blob: fd886002684b3d19f771abc7db7e6ed46b42f5a1 (
plain) (
tree)
|
|
#!/usr/bin/env python3
""" move items from queue to database """
import os
import pickle
import sqlite3
import time
from common.postgres import dbi
from litequeue import SQLQueue
# Queue settings, each overridable through an environment variable.
# Directory that holds the queue database file.
QUEUE_DIR = os.getenv("el_QUEUE_dir", "queue")
# Path of the queue database: "<directory>/<file name>".
QUEUE_DB = "/".join((QUEUE_DIR, os.getenv("el_QUEUE_db", "litequeue.db")))
# Default sleep 15 seconds when queue empty
QUEUE_SLEEP = int(os.getenv("el_QUEUE_sleep", 15))
# Unlock all: reset every row of the Queue table that has status 1 back to
# status 0 (litequeue encodes READY as 0 and LOCKED as 1). Presumably this
# releases messages that a previous run popped but never marked as done, so
# they get processed again -- confirm against the producer side.
con = sqlite3.connect(QUEUE_DB)
try:
    cur = con.cursor()
    cur.execute("UPDATE Queue SET status = 0 WHERE status = 1")
    con.commit()
finally:
    # Always release the database handle, even if the UPDATE or the commit
    # fails (e.g. the database is locked or the table is missing).
    con.close()
# Open Queue
q = SQLQueue(QUEUE_DB, maxsize=None)
q.prune()

# Upper bound on how many single-row messages are merged into one dbi() call.
BATCH_MAX = 10

msgids = []  # ids of the messages popped for the batch being assembled
values = []  # parameter tuples collected for the batch being assembled
i = 0  # number of messages merged into the current batch so far

# Event loop: pop messages, merge consecutive messages that carry the same
# SQL statement into one batch (at most BATCH_MAX), hand the batch to dbi()
# and only then mark the popped messages as done.
while True:
    # Just sleep if queue is empty
    if q.empty():
        time.sleep(QUEUE_SLEEP)
        continue

    # Get message. Each message is a pickled sequence: element 0 is the SQL
    # text, element 1 is either one parameter tuple or a list of them.
    # NOTE(security): pickle.loads() can execute arbitrary code; this is only
    # acceptable as long as nothing untrusted can write to the queue database.
    task = q.pop()
    raw = pickle.loads(task["message"])
    msgids.append(task["message_id"])
    sql = raw[0]

    if isinstance(raw[1], list):
        # The queue item already is a batch job: use it as-is and push the
        # counter to the limit so no further batching is attempted.
        values = raw[1]
        i = BATCH_MAX
    else:
        values.append(raw[1])
    i += 1

    # Check if we can batch up with the next message in queue. The messages
    # popped above are subtracted from qsize() -- presumably qsize() still
    # counts them until they are marked done; confirm against litequeue.
    if i < BATCH_MAX and q.qsize() - len(msgids) >= 1:
        nextraw = pickle.loads(q.peek()["message"])
        if sql == nextraw[0] and isinstance(nextraw[1], tuple):
            # Same statement, single row: pop it in the next iteration and
            # append it to the current batch.
            continue

    # Flush the batch, then acknowledge every message that went into it.
    dbi(sql, values)
    for msgid in msgids:
        q.done(msgid)

    # Report progress. The table name is taken to be the third word of the
    # statement (as in "INSERT INTO <table> ..."). Splitting on any
    # whitespace and falling back to a placeholder keeps an unusual statement
    # from crashing the worker after its rows were already written.
    words = sql.split()
    table = words[2] if len(words) > 2 else "<unknown>"
    # values is always a list at this point, so its length is the row count.
    num = len(values)
    # Query the size once so the test and the printed number agree.
    remaining = q.qsize()
    left = str(remaining) + " items left in queue" if remaining > 0 else ""
    print("Processed", num, "item(s) for table", table + ".", left)

    # Start a fresh batch.
    msgids = []
    values = []
    i = 0

# NOTE: unreachable -- the loop above has no break.
print("END")
|