diff options
author | Dennis Eriksen <d@ennis.no> | 2023-02-01 20:32:11 +0100 |
---|---|---|
committer | Dennis Eriksen <d@ennis.no> | 2023-02-01 20:32:11 +0100 |
commit | 8d186d39483beff64a1c11f80c6ca5e56dd7bbc5 (patch) | |
tree | 2c5a64ace4bd8eabd4d65014c5313bd7edd76191 /queue2pgsql.py | |
parent | run queue in batches (diff) | |
download | energyscripts-8d186d39483beff64a1c11f80c6ca5e56dd7bbc5.tar.gz |
moving and renaming/breaking everything
Diffstat (limited to 'queue2pgsql.py')
-rw-r--r-- | queue2pgsql.py | 74 |
1 files changed, 0 insertions, 74 deletions
diff --git a/queue2pgsql.py b/queue2pgsql.py deleted file mode 100644 index 436dcb8..0000000 --- a/queue2pgsql.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env python3 -''' move items from queue to database ''' - -import os -import time -import pickle -import sqlite3 -from litequeue import SQLQueue - -from common.postgres import dbi - -QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db') - -# Unlock all -con = sqlite3.connect(QUEUE_DB) -cur = con.cursor() -cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") -con.commit() -con.close() - -# Open Queue -q = SQLQueue(QUEUE_DB, maxsize=None) -q.prune() - -msgids = [] -values = [] - -i = 0 - -# Event loop -while True: - # Just sleep if queue is empty - if q.empty() == True: - time.sleep(10) - continue - - # get message - task = q.pop() - raw = pickle.loads(task['message']) - msgids.append(task['message_id']) - sql = raw[0] - - # if the queue-item already is a batch-job, don't do any more batch-work - if isinstance(raw[1], list): - values = raw[1] - i = 10 - else: - values.append(raw[1]) - - - # Check if we can batch up with the next message in queue - i += 1 - if i < 10 and q.qsize() - len(msgids) >= 1: - nextraw = pickle.loads(q.peek()['message']) - nextsql = nextraw[0] - nextvalues = nextraw[1] - if sql == nextsql and isinstance(nextvalues, tuple): - continue - - dbi(sql,values) - - for id in msgids: - q.done(id) - - table = sql.split(' ')[2] - num = 1 if isinstance(values, tuple) else len(values) - left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' - print("Processed", num, "item(s) for table", table + ".", left) - - msgids = [] - values = [] - i=0 - -print("END") |