diff options
author | Dennis Eriksen <d@ennis.no> | 2023-02-01 20:32:11 +0100 |
---|---|---|
committer | Dennis Eriksen <d@ennis.no> | 2023-02-01 20:32:11 +0100 |
commit | 8d186d39483beff64a1c11f80c6ca5e56dd7bbc5 (patch) | |
tree | 2c5a64ace4bd8eabd4d65014c5313bd7edd76191 /scripts/queue_runner.py | |
parent | run queue in batches (diff) | |
download | energyscripts-8d186d39483beff64a1c11f80c6ca5e56dd7bbc5.tar.gz |
moving and renaming/breaking everything
Diffstat (limited to 'scripts/queue_runner.py')
-rw-r--r-- | scripts/queue_runner.py | 74 |
1 files changed, 74 insertions, 0 deletions
diff --git a/scripts/queue_runner.py b/scripts/queue_runner.py new file mode 100644 index 0000000..436dcb8 --- /dev/null +++ b/scripts/queue_runner.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +''' move items from queue to database ''' + +import os +import time +import pickle +import sqlite3 +from litequeue import SQLQueue + +from common.postgres import dbi + +QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db') + +# Unlock all +con = sqlite3.connect(QUEUE_DB) +cur = con.cursor() +cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") +con.commit() +con.close() + +# Open Queue +q = SQLQueue(QUEUE_DB, maxsize=None) +q.prune() + +msgids = [] +values = [] + +i = 0 + +# Event loop +while True: + # Just sleep if queue is empty + if q.empty() == True: + time.sleep(10) + continue + + # get message + task = q.pop() + raw = pickle.loads(task['message']) + msgids.append(task['message_id']) + sql = raw[0] + + # if the queue-item already is a batch-job, don't do any more batch-work + if isinstance(raw[1], list): + values = raw[1] + i = 10 + else: + values.append(raw[1]) + + + # Check if we can batch up with the next message in queue + i += 1 + if i < 10 and q.qsize() - len(msgids) >= 1: + nextraw = pickle.loads(q.peek()['message']) + nextsql = nextraw[0] + nextvalues = nextraw[1] + if sql == nextsql and isinstance(nextvalues, tuple): + continue + + dbi(sql,values) + + for id in msgids: + q.done(id) + + table = sql.split(' ')[2] + num = 1 if isinstance(values, tuple) else len(values) + left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' + print("Processed", num, "item(s) for table", table + ".", left) + + msgids = [] + values = [] + i=0 + +print("END") |