diff options
Diffstat (limited to 'scripts/queue_runner.py')
-rw-r--r-- | scripts/queue_runner.py | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/scripts/queue_runner.py b/scripts/queue_runner.py new file mode 100644 index 0000000..436dcb8 --- /dev/null +++ b/scripts/queue_runner.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +''' move items from queue to database ''' + +import os +import time +import pickle +import sqlite3 +from litequeue import SQLQueue + +from common.postgres import dbi + +QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db') + +# Unlock all +con = sqlite3.connect(QUEUE_DB) +cur = con.cursor() +cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") +con.commit() +con.close() + +# Open Queue +q = SQLQueue(QUEUE_DB, maxsize=None) +q.prune() + +msgids = [] +values = [] + +i = 0 + +# Event loop +while True: + # Just sleep if queue is empty + if q.empty() == True: + time.sleep(10) + continue + + # get message + task = q.pop() + raw = pickle.loads(task['message']) + msgids.append(task['message_id']) + sql = raw[0] + + # if the queue-item already is a batch-job, don't do any more batch-work + if isinstance(raw[1], list): + values = raw[1] + i = 10 + else: + values.append(raw[1]) + + + # Check if we can batch up with the next message in queue + i += 1 + if i < 10 and q.qsize() - len(msgids) >= 1: + nextraw = pickle.loads(q.peek()['message']) + nextsql = nextraw[0] + nextvalues = nextraw[1] + if sql == nextsql and isinstance(nextvalues, tuple): + continue + + dbi(sql,values) + + for id in msgids: + q.done(id) + + table = sql.split(' ')[2] + num = 1 if isinstance(values, tuple) else len(values) + left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' + print("Processed", num, "item(s) for table", table + ".", left) + + msgids = [] + values = [] + i=0 + +print("END") |