aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/queue_runner.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/queue_runner.py')
-rw-r--r--scripts/queue_runner.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/scripts/queue_runner.py b/scripts/queue_runner.py
new file mode 100644
index 0000000..436dcb8
--- /dev/null
+++ b/scripts/queue_runner.py
@@ -0,0 +1,74 @@
+#!/usr/bin/env python3
+''' move items from queue to database '''
+
+import os
+import time
+import pickle
+import sqlite3
+from litequeue import SQLQueue
+
+from common.postgres import dbi
+
+QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db')
+
+# Unlock all
+con = sqlite3.connect(QUEUE_DB)
+cur = con.cursor()
+cur.execute("UPDATE Queue SET status = 0 WHERE status = 1")
+con.commit()
+con.close()
+
+# Open Queue
+q = SQLQueue(QUEUE_DB, maxsize=None)
+q.prune()
+
+msgids = []
+values = []
+
+i = 0
+
+# Event loop
+while True:
+ # Just sleep if queue is empty
+ if q.empty() == True:
+ time.sleep(10)
+ continue
+
+ # get message
+ task = q.pop()
+ raw = pickle.loads(task['message'])
+ msgids.append(task['message_id'])
+ sql = raw[0]
+
+ # if the queue-item already is a batch-job, don't do any more batch-work
+ if isinstance(raw[1], list):
+ values = raw[1]
+ i = 10
+ else:
+ values.append(raw[1])
+
+
+ # Check if we can batch up with the next message in queue
+ i += 1
+ if i < 10 and q.qsize() - len(msgids) >= 1:
+ nextraw = pickle.loads(q.peek()['message'])
+ nextsql = nextraw[0]
+ nextvalues = nextraw[1]
+ if sql == nextsql and isinstance(nextvalues, tuple):
+ continue
+
+ dbi(sql,values)
+
+ for id in msgids:
+ q.done(id)
+
+ table = sql.split(' ')[2]
+ num = 1 if isinstance(values, tuple) else len(values)
+ left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else ''
+ print("Processed", num, "item(s) for table", table + ".", left)
+
+ msgids = []
+ values = []
+ i=0
+
+print("END")