#!/usr/bin/env python3
"""Move items from the litequeue queue to the database.

Each queue message is a pickled pair ``(sql, payload)``:

* if ``payload`` is a ``list`` the message is treated as a ready-made batch
  and is handed to ``dbi`` on its own;
* otherwise ``payload`` is one row, and up to ``MAX_BATCH`` consecutive
  messages that carry the same ``sql`` are collected into a single ``dbi``
  call.

Messages are marked done in the queue only after ``dbi`` has returned.
"""
import contextlib
import pickle
import sqlite3
import time

import common
from common.postgres import dbi
from litequeue import SQLQueue

# Queue location; presumably common.env(name, default) reads an environment
# setting and falls back to the default -- confirm in the common package.
QUEUE_DB = common.env("el_QUEUE_db", "litequeue.db")
QUEUE_DIR = common.env("el_QUEUE_dir", "queue")
QUEUE_DB = QUEUE_DIR + "/" + QUEUE_DB
# Seconds to sleep when the queue is empty (default 15).
QUEUE_SLEEP = int(common.env("el_QUEUE_sleep", 15))
# Maximum number of single-row messages merged into one dbi() call.
MAX_BATCH = 10


def _unlock_all(db_path: str) -> None:
    """Reset every queue row with status 1 back to status 0.

    Presumably 1 means "locked by a worker" and 0 means "ready" in
    litequeue, so this re-queues messages a previous run popped but never
    marked done -- confirm against the litequeue version in use.
    """
    # closing() guarantees the connection is released even if the UPDATE or
    # the commit raises.
    with contextlib.closing(sqlite3.connect(db_path)) as con:
        con.execute("UPDATE Queue SET status = 0 WHERE status = 1")
        con.commit()


def _table_name(sql: str) -> str:
    """Return the third space-separated token of *sql* for the log line.

    Presumably the statements look like ``INSERT INTO <table> ...``. If the
    statement has fewer than three tokens the whole statement is returned,
    so the log line cannot crash the worker after the rows were written.
    """
    parts = sql.split(" ")
    if len(parts) > 2:
        return parts[2].strip()
    return sql.strip()


def main() -> None:
    """Unlock stale messages, then drain the queue into the database forever."""
    _unlock_all(QUEUE_DB)

    # Open Queue
    q = SQLQueue(QUEUE_DB, maxsize=None)
    q.prune()

    msgids = []  # ids of the messages that make up the batch being built
    values = []  # rows collected so far for the current batch

    # Event loop
    while True:
        # Just sleep if queue is empty
        if q.empty():
            time.sleep(QUEUE_SLEEP)
            continue

        # Get message.
        # NOTE(security): pickle.loads can execute arbitrary code; this is
        # only safe while nothing untrusted can write to the queue database.
        task = q.pop()
        raw = pickle.loads(task["message"])
        msgids.append(task["message_id"])
        sql = raw[0]
        payload = raw[1]

        if isinstance(payload, list):
            # The queue item already is a batch job: send it as it is and do
            # not try to merge further messages into it.
            values = payload
            batch_closed = True
        else:
            values.append(payload)
            batch_closed = len(msgids) >= MAX_BATCH

        # Check if we can batch up with the next message in the queue.
        # len(msgids) is subtracted because qsize() presumably still counts
        # the messages this worker popped but has not marked done yet.
        if not batch_closed and q.qsize() - len(msgids) >= 1:
            upcoming = pickle.loads(q.peek()["message"])
            if upcoming[0] == sql and isinstance(upcoming[1], tuple):
                continue

        dbi(sql, values)

        # Acknowledge only after dbi() returned, so a failed write leaves the
        # messages locked and they are re-queued by _unlock_all on restart.
        for msgid in msgids:
            q.done(msgid)

        remaining = q.qsize()
        left = str(remaining) + " items left in queue" if remaining > 0 else ""
        print(
            "Processed",
            len(values),
            "item(s) for table",
            _table_name(sql) + ".",
            left,
        )
        msgids = []
        values = []


# Start the worker at module load (no __main__ guard), so both direct
# execution and a plain import launch it.
main()