#!/usr/bin/env python3 ''' move items from queue to database ''' import os import time import pickle import sqlite3 from litequeue import SQLQueue from common.postgres import dbi QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db') QUEUE_SLEEP = int(os.environ.get('el_QUEUE_sleep', 15)) # Default sleep 15 seconds when queue empty # Unlock all con = sqlite3.connect(QUEUE_DB) cur = con.cursor() cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") con.commit() con.close() # Open Queue q = SQLQueue(QUEUE_DB, maxsize=None) q.prune() msgids = [] values = [] i = 0 # Event loop while True: # Just sleep if queue is empty if q.empty() is True: time.sleep(QUEUE_SLEEP) continue # get message task = q.pop() raw = pickle.loads(task['message']) msgids.append(task['message_id']) sql = raw[0] # if the queue-item already is a batch-job, don't do any more batch-work if isinstance(raw[1], list): values = raw[1] i = 10 else: values.append(raw[1]) # Check if we can batch up with the next message in queue i += 1 if i < 10 and q.qsize() - len(msgids) >= 1: nextraw = pickle.loads(q.peek()['message']) nextsql = nextraw[0] nextvalues = nextraw[1] if sql == nextsql and isinstance(nextvalues, tuple): continue dbi(sql,values) for msgid in msgids: q.done(msgid) table = sql.split(' ')[2] num = 1 if isinstance(values, tuple) else len(values) left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' print("Processed", num, "item(s) for table", table + ".", left) msgids = [] values = [] i=0 print("END")