diff options
author | dennis <d@ennis.no> | 2023-02-01 19:38:35 +0100 |
---|---|---|
committer | dennis <d@ennis.no> | 2023-02-01 19:38:35 +0100 |
commit | 20c83cc935c1ddd808b82d985531cb694c1598f2 (patch) | |
tree | 694c49aa6c5d5e2dfbc8d47dac234d1d151a5f75 /queue2pgsql.py | |
parent | use new dbi (diff) | |
download | energyscripts-20c83cc935c1ddd808b82d985531cb694c1598f2.tar.gz |
run queue in batches
Diffstat (limited to '')
-rw-r--r-- | queue2pgsql.py | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/queue2pgsql.py b/queue2pgsql.py index f5be1d9..436dcb8 100644 --- a/queue2pgsql.py +++ b/queue2pgsql.py @@ -22,15 +22,53 @@ con.close() q = SQLQueue(QUEUE_DB, maxsize=None) q.prune() +msgids = [] +values = [] + +i = 0 + # Event loop while True: + # Just sleep if queue is empty if q.empty() == True: time.sleep(10) continue + + # get message task = q.pop() raw = pickle.loads(task['message']) - dbi(raw[0],raw[1]) - print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left") - q.done(task['message_id']) + msgids.append(task['message_id']) + sql = raw[0] + + # if the queue-item already is a batch-job, don't do any more batch-work + if isinstance(raw[1], list): + values = raw[1] + i = 10 + else: + values.append(raw[1]) + + + # Check if we can batch up with the next message in queue + i += 1 + if i < 10 and q.qsize() - len(msgids) >= 1: + nextraw = pickle.loads(q.peek()['message']) + nextsql = nextraw[0] + nextvalues = nextraw[1] + if sql == nextsql and isinstance(nextvalues, tuple): + continue + + dbi(sql,values) + + for id in msgids: + q.done(id) + + table = sql.split(' ')[2] + num = 1 if isinstance(values, tuple) else len(values) + left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' + print("Processed", num, "item(s) for table", table + ".", left) + + msgids = [] + values = [] + i=0 print("END") |