From 8d186d39483beff64a1c11f80c6ca5e56dd7bbc5 Mon Sep 17 00:00:00 2001 From: Dennis Eriksen Date: Wed, 1 Feb 2023 20:32:11 +0100 Subject: moving and renaming/breaking everything --- queue2pgsql.py | 74 ---------------------------------------------------------- 1 file changed, 74 deletions(-) delete mode 100644 queue2pgsql.py (limited to 'queue2pgsql.py') diff --git a/queue2pgsql.py b/queue2pgsql.py deleted file mode 100644 index 436dcb8..0000000 --- a/queue2pgsql.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env python3 -''' move items from queue to database ''' - -import os -import time -import pickle -import sqlite3 -from litequeue import SQLQueue - -from common.postgres import dbi - -QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db') - -# Unlock all -con = sqlite3.connect(QUEUE_DB) -cur = con.cursor() -cur.execute("UPDATE Queue SET status = 0 WHERE status = 1") -con.commit() -con.close() - -# Open Queue -q = SQLQueue(QUEUE_DB, maxsize=None) -q.prune() - -msgids = [] -values = [] - -i = 0 - -# Event loop -while True: - # Just sleep if queue is empty - if q.empty() == True: - time.sleep(10) - continue - - # get message - task = q.pop() - raw = pickle.loads(task['message']) - msgids.append(task['message_id']) - sql = raw[0] - - # if the queue-item already is a batch-job, don't do any more batch-work - if isinstance(raw[1], list): - values = raw[1] - i = 10 - else: - values.append(raw[1]) - - - # Check if we can batch up with the next message in queue - i += 1 - if i < 10 and q.qsize() - len(msgids) >= 1: - nextraw = pickle.loads(q.peek()['message']) - nextsql = nextraw[0] - nextvalues = nextraw[1] - if sql == nextsql and isinstance(nextvalues, tuple): - continue - - dbi(sql,values) - - for id in msgids: - q.done(id) - - table = sql.split(' ')[2] - num = 1 if isinstance(values, tuple) else len(values) - left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' - print("Processed", num, "item(s) for table", table + ".", left) - - msgids = [] - values = [] - i=0 - -print("END") -- cgit v1.2.3