diff --git a/.gitignore b/.gitignore
index af68312..cc50e13 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,9 @@ litequeue.db
+# test
# Environments
diff --git a/common/__init__.py b/common/__init__.py
new file mode 100644
index 0000000..89977d2
--- /dev/null
+++ b/common/__init__.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python3
+''' common functions and stuff '''
+import os
+QUEUE = bool(os.environ.get('el_QUEUE', False))
+# Initialize queue
+if QUEUE is True:
+ from .queue import dbi
+ from .postgres import dbi
diff --git a/common/postgres.py b/common/postgres.py
new file mode 100644
index 0000000..bff0720
--- /dev/null
+++ b/common/postgres.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python3
+''' common functions and stuff '''
+import os
+import sys
+import psycopg
+pg_db = os.environ['el_pg_db']
+pg_host = os.environ['el_pg_host']
+pg_user = os.environ.get('el_pg_user','')
+pg_pass = os.environ.get('el_pg_pass','')
+def dbi(sql, values, **kwargs):
+ ''' insert into db '''
+ verbose = bool(kwargs['verbose']) if 'verbose' in kwargs else False
+ # pylint: disable=E1129
+ with psycopg.connect(dbname=pg_db, host=pg_host, user=pg_user, password=pg_pass) as conn:
+ cur = conn.cursor()
+ if isinstance(values, list):
+ cur.executemany(sql, values)
+ elif isinstance(values, tuple):
+ cur.execute(sql, values)
+ else:
+ print('`values` is a', type(values), 'but it needs to be tuple or list')
+ sys.exit(1)
+ if verbose is True:
+ print("Inserted and/or changed", cur.rowcount, "rows into db")
+ return True
diff --git a/common/queue.py b/common/queue.py
new file mode 100644
index 0000000..7804c8d
--- /dev/null
+++ b/common/queue.py
@@ -0,0 +1,21 @@
+#!/usr/bin/env python3
+''' common functions and stuff '''
+import os
+import sys
+import pickle
+from litequeue import SQLQueue
+QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db')
+# Initialize queue
+q = SQLQueue(QUEUE_DB, maxsize=None)
+def dbi(sql,values,**kwargs):
+ print("lolqu")
+ verbose = bool(kwargs['verbose']) if 'verbose' in kwargs else False
+ q.put(pickle.dumps([sql, values]))
+ if verbose is True:
+ print("Inserted into queue")
+ return True
diff --git a/elvia2pgsql.py b/elvia2pgsql.py
index 4ed2430..be5eaae 100755
--- a/elvia2pgsql.py
+++ b/elvia2pgsql.py
@@ -1,26 +1,23 @@
+#!/usr/bin/env python3
+''' elvia2pgsql '''
import os
import sys
-import json
-import psycopg2
import requests
from datetime import datetime
from datetime import timedelta
from tzlocal import get_localzone
+from common import dbi
apiKey = os.environ['el_elvia_token']
apiUrl = "https://elvia.azure-api.net/customer/metervalues/api/v1/metervalues"
-pg_db = os.environ['el_pg_db']
-pg_host = os.environ['el_pg_host']
pg_table = "elvia"
startTime = datetime.now(get_localzone()) - timedelta(days = 2)
startTime = startTime.isoformat('T')
endTime = datetime.now(get_localzone()).isoformat('T')
@@ -55,30 +52,6 @@ for item in data['meteringpoints'][0]['metervalue']['timeSeries']:
if item['verified']:
values.append((data['meteringpoints'][0]['meteringPointId'], item['startTime'], item['endTime'], item['value'], item['uom'], item['production']))
-# connect to database
-conn = psycopg2.connect(database=pg_db, host=pg_host)
-cur = conn.cursor()
-# count rows before we start
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-before = cur.fetchone()
-# insert data
- cur.executemany("INSERT INTO " + pg_table + " VALUES(%s,%s,%s,%s,%s,%s) ON CONFLICT (startTime,endTime) DO NOTHING", values)
- conn.commit()
-except Exception as e:
- conn.rollback()
- raise e
-# count rows after we finnish
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-after = cur.fetchone()
-# count *new* rows
-newRows = after[0] - before[0]
-# close connection
-print("Successfully inserted " + str(newRows) + " records into the database")
+# Count new rows and insert
+dbi("INSERT INTO " + pg_table + " VALUES(%s,%s,%s,%s,%s,%s) ON CONFLICT (startTime,endTime) DO NOTHING", values, verbose=True)
diff --git a/elvia_gridtariff2pgsql.py b/elvia_gridtariff2pgsql.py
index b6b423b..bcf4bd7 100755
--- a/elvia_gridtariff2pgsql.py
+++ b/elvia_gridtariff2pgsql.py
@@ -1,15 +1,15 @@
+''' get grid tariffs'''
import os
import sys
-import json
-import psycopg2
-import requests
from datetime import datetime
from datetime import timedelta
+import requests
from tzlocal import get_localzone
+from common import dbi
# API documentation: https://elvia.portal.azure-api.net/docs/services/gridtariffapi/operations/post-digin-api-v-tariffquery-meteringpointsgridtariffs?
apiKey = os.environ['el_elvia_grid_api_key']
@@ -66,30 +66,4 @@ values = []
for item in data['gridTariffCollections'][0]['gridTariff']['tariffPrice']['hours']:
values.append((meteringPointId, item['startTime'], item['expiredAt'], item['shortName'], item['isPublicHoliday'], item['energyPrice']['total'], item['energyPrice']['totalExVat']))
-# connect to database
-conn = psycopg2.connect(database=pg_db, host=pg_host)
-cur = conn.cursor()
-# count rows before we start
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-before = cur.fetchone()
-# insert data
- cur.executemany("INSERT INTO " + pg_table + " VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (meteringPointId,startTime,endTime) DO NOTHING", values)
- conn.commit()
-except Exception as e:
- conn.rollback()
- raise e
-# count rows after we finnish
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-after = cur.fetchone()
-# count *new* rows
-newRows = after[0] - before[0]
-# close connection
-print("Successfully inserted " + str(newRows) + " records into the database")
+dbi("INSERT INTO " + pg_table + " VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (meteringPointId,startTime,endTime) DO NOTHING", values, verbose=True)
diff --git a/entsoe2pgsql.py b/entsoe2pgsql.py
index 3fdb6b6..2597a98 100755
--- a/entsoe2pgsql.py
+++ b/entsoe2pgsql.py
@@ -2,21 +2,20 @@
import os
import sys
-import json
-import psycopg2
-import requests
-import xmltodict
from datetime import datetime
from datetime import timedelta
+import requests
+import xmltodict
from tzlocal import get_localzone
from dateutil import tz
+from common import dbi
# variables
# Getting an api-key isn't very well documented. The documentation [1] points
-# to a pdf [2], which says the following:
+# to a pdf [2], which says the following:
# > In order to request the access to the Restful API, please register on the
# > Transparency Platform and send an email to transparency@entsoe.eu with
# > “Restful API access” in the subject line. Indicate the email address you
@@ -45,7 +44,7 @@ areas = [ {"name": "NO-0", "code": "10YNO-0--------C"},
{"name": "NO-2", "code": "10YNO-2--------T"},
{"name": "NO-3", "code": "10YNO-3--------J"},
{"name": "NO-4", "code": "10YNO-4--------9"} ]
UTC = tz.gettz('UTC')
CET = tz.gettz('Europe/Oslo')
@@ -55,20 +54,20 @@ values=[]
for area in areas:
url = apiUrl + "&documentType=A44&in_Domain=" + area["code"] + "&out_Domain=" + area["code"] + "&periodStart=" + startTime + "0000&periodEnd=" + endTime + "0000"
print("Getting data for " + area["code"])
response = requests.get(url)
if response.status_code != 200:
print("Oh shit")
except Exception as e:
print("oh lol")
data_dict = xmltodict.parse(response.content)
items = 0
if "Publication_MarketDocument" in data_dict:
for lista in data_dict["Publication_MarketDocument"]["TimeSeries"]:
@@ -84,34 +83,5 @@ for area in areas:
# append values
values.append((time, area["name"], item["price.amount"]))
print("Got " + str(items) + " records")
-# connect to db
-conn = psycopg2.connect(database=pg_db, host=pg_host)
-cur = conn.cursor()
-# count rows before we start
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-before = cur.fetchone()
-# insert data
-print("Inserting into database")
- cur.executemany("INSERT INTO " + pg_table + " VALUES(%s,%s,%s) ON CONFLICT (starttime, zone) DO NOTHING", values)
- conn.commit()
-except Exception as e:
- conn.rollback()
- raise e
-# count rows after we finnish
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-after = cur.fetchone()
-# count *new* rows
-newRows = after[0] - before[0]
-# close connection
-print("Successfully inserted " + str(newRows) + " records into the database")
+dbi("INSERT INTO " + pg_table + " VALUES(%s,%s,%s) ON CONFLICT (starttime, zone) DO NOTHING", values, verbose=True)
diff --git a/env.sample b/env.sample
index e40aa42..6632904 100644
--- a/env.sample
+++ b/env.sample
@@ -16,3 +16,6 @@ export el_entsoe_token=XXX
export el_neohub_ip=XXX
export el_neohub_port=4242
+export el_QUEUE=False
+export el_QUEUE_db='litequeue.db'
diff --git a/mqtt2queue.py b/mqtt2queue.py
index 102b1eb..24ee207 100644
--- a/mqtt2queue.py
+++ b/mqtt2queue.py
@@ -2,18 +2,16 @@
import os
import json
-import pickle
from datetime import datetime
-from litequeue import SQLQueue
import paho.mqtt.client as mqtt
+from common import dbi
mqtt_server = os.environ['el_mqtt_server']
mqtt_port = int(os.environ['el_mqtt_port'])
keepalive = int(os.environ['el_mqtt_keepalive'])
mqtt_topic = os.environ['el_mqtt_topic']
-q = SQLQueue("litequeue.db", maxsize=None)
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
@@ -30,12 +28,15 @@ def on_message(client, userdata, msg):
if name.startswith('tmp') and 'temperature' in data and 'humidity' in data:
sql = "INSERT INTO mqtt_temps (name, temperature, humidity, battery, linkquality, voltage, time) VALUES(%s,%s,%s,%s,%s,%s,%s)"
values = (name, data['temperature'], data['humidity'], data['battery'], data['linkquality'], data['voltage'], datetime.utcnow())
- q.put(pickle.dumps([sql, values]))
elif name == 'HAN' and 'current' in data:
sql = "INSERT INTO mqtt_han (name, current, power, voltage, linkquality, time) VALUES(%s,%s,%s,%s,%s,%s)"
values = (name, data['current'], data['power'], data['voltage'], data['linkquality'], datetime.utcnow())
- q.put(pickle.dumps([sql, values]))
+ else:
+ return
+ dbi(sql, values, verbose=True)
diff --git a/nb2pgsql.py b/nb2pgsql.py
index 3993c41..1901182 100755
--- a/nb2pgsql.py
+++ b/nb2pgsql.py
@@ -4,7 +4,6 @@ import os
import sys
import csv
import json
-import psycopg2
import requests
import tempfile
@@ -12,6 +11,8 @@ from datetime import datetime
from datetime import timedelta
from tzlocal import get_localzone
+from common import dbi
# I'm not sure I understand Norges Banks json-model. It seems a lot easier to just get the CSV, and convert it to JSON.
apiUrl = "https://data.norges-bank.no/api/data/EXR/B.EUR.NOK.SP?format=csv&locale=en"
@@ -55,30 +56,4 @@ with open(temp.name) as csvfile:
-# connect to database
-conn = psycopg2.connect(database=pg_db, host=pg_host)
-cur = conn.cursor()
-# count rows before we start
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-before = cur.fetchone()
-# insert data
- cur.executemany("INSERT INTO " + pg_table + " VALUES(%s,%s,%s,%s) ON CONFLICT (startdate,base_cur,quote_cur) DO NOTHING", values)
- conn.commit()
-except Exception as e:
- conn.rollback()
- raise e
-# count rows after we finnish
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-after = cur.fetchone()
-# count *new* rows
-newRows = after[0] - before[0]
-# close connection
-print("Successfully inserted " + str(newRows) + " records into the database")
+dbi("INSERT INTO " + pg_table + " VALUES(%s,%s,%s,%s) ON CONFLICT (startdate,base_cur,quote_cur) DO NOTHING", values,verbose=True)
diff --git a/queue2pgsql.py b/queue2pgsql.py
index 0c95f6d..253f5b4 100644
--- a/queue2pgsql.py
+++ b/queue2pgsql.py
@@ -1,23 +1,25 @@
#!/usr/bin/env python3
+''' move items from queue to database '''
import os
import time
import pickle
import sqlite3
from litequeue import SQLQueue
-from datetime import datetime
-from todb import todb
+from common import dbi
+QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db')
# Unlock all
-con = sqlite3.connect("litequeue.db")
+con = sqlite3.connect(QUEUE_DB)
cur = con.cursor()
cur.execute("UPDATE Queue SET status = 0 WHERE status = 1")
# Open Queue
-q = SQLQueue("litequeue.db", maxsize=None)
+q = SQLQueue(QUEUE_DB, maxsize=None)
# Event loop
@@ -28,7 +30,7 @@ while True:
task = q.pop()
raw = pickle.loads(task['message'])
- todb(raw[0],raw[1])
+ dbi(raw[0],raw[1])
print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left")
diff --git a/todb.py b/todb.py
index e5988cd..7426ace 100644
--- a/todb.py
+++ b/todb.py
@@ -13,8 +13,10 @@ def todb(sql, values):
if type(values) == list:
cur = conn.cursor()
cur.executemany(sql, values)
+ return cur.rowcount
elif type(values) == tuple:
conn.execute(sql, values)
+ return 1
print("OH SHIT")
diff --git a/yr2pgsql.py b/yr2pgsql.py
index 28bb06e..9c3ae5e 100755
--- a/yr2pgsql.py
+++ b/yr2pgsql.py
@@ -1,11 +1,12 @@
+#!/usr/bin/env python3
+''' Get weatherdata from yr.no '''
import os
import sys
-import json
-import psycopg2
import requests
+from common import dbi
lat = str(os.environ['el_yr_lat'])
lon = str(os.environ['el_yr_lon'])
@@ -38,10 +39,6 @@ except Exception as e:
data = response.json()
### insert data into database
values = []
@@ -49,38 +46,13 @@ for item in data["properties"]["timeseries"]:
details = item["data"]["instant"]["details"]
-# connect to database
-conn = psycopg2.connect(database=pg_db, host=pg_host)
-cur = conn.cursor()
-# count rows before we start
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-before = cur.fetchone()
-# insert data
- cur.executemany("INSERT INTO " + pg_table + """ VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (time) DO UPDATE SET
- air_temperature=EXCLUDED.air_temperature,
- air_pressure_at_sea_level=EXCLUDED.air_pressure_at_sea_level,
- cloud_area_fraction=EXCLUDED.cloud_area_fraction,
- relative_humidity=EXCLUDED.relative_humidity,
- wind_from_direction=EXCLUDED.wind_from_direction,
- wind_speed=EXCLUDED.wind_speed,
- updated=now()
- """, values)
- conn.commit()
-except Exception as e:
- conn.rollback()
- raise e
-# count rows after we finnish
-cur.execute("SELECT COUNT(*) FROM " + pg_table)
-after = cur.fetchone()
-# count *new* rows
-newRows = after[0] - before[0]
-# close connection
+sql = "INSERT INTO " + pg_table + """ VALUES(%s,%s,%s,%s,%s,%s,%s) ON CONFLICT (time) DO UPDATE SET
+ air_temperature=EXCLUDED.air_temperature,
+ air_pressure_at_sea_level=EXCLUDED.air_pressure_at_sea_level,
+ cloud_area_fraction=EXCLUDED.cloud_area_fraction,
+ relative_humidity=EXCLUDED.relative_humidity,
+ wind_from_direction=EXCLUDED.wind_from_direction,
+ wind_speed=EXCLUDED.wind_speed,
+ updated=now()"""
-print("Successfully inserted " + str(newRows) + " records into the database. Might have updated a bunch more.")
+dbi(sql, values, verbose=True)