aboutsummaryrefslogtreecommitdiffstats
path: root/queue2pgsql.py
diff options
context:
space:
mode:
authorDennis Eriksen <d@ennis.no>2023-02-01 20:32:11 +0100
committerDennis Eriksen <d@ennis.no>2023-02-01 20:32:11 +0100
commit8d186d39483beff64a1c11f80c6ca5e56dd7bbc5 (patch)
tree2c5a64ace4bd8eabd4d65014c5313bd7edd76191 /queue2pgsql.py
parentrun queue in batches (diff)
downloadenergyscripts-8d186d39483beff64a1c11f80c6ca5e56dd7bbc5.tar.gz
moving and renaming/breaking everything
Diffstat (limited to 'queue2pgsql.py')
-rw-r--r--queue2pgsql.py74
1 files changed, 0 insertions, 74 deletions
diff --git a/queue2pgsql.py b/queue2pgsql.py
deleted file mode 100644
index 436dcb8..0000000
--- a/queue2pgsql.py
+++ /dev/null
@@ -1,74 +0,0 @@
-#!/usr/bin/env python3
-''' move items from queue to database '''
-
-import os
-import time
-import pickle
-import sqlite3
-from litequeue import SQLQueue
-
-from common.postgres import dbi
-
-QUEUE_DB = os.environ.get('el_QUEUE_db', 'litequeue.db')
-
-# Unlock all
-con = sqlite3.connect(QUEUE_DB)
-cur = con.cursor()
-cur.execute("UPDATE Queue SET status = 0 WHERE status = 1")
-con.commit()
-con.close()
-
-# Open Queue
-q = SQLQueue(QUEUE_DB, maxsize=None)
-q.prune()
-
-msgids = []
-values = []
-
-i = 0
-
-# Event loop
-while True:
- # Just sleep if queue is empty
- if q.empty() == True:
- time.sleep(10)
- continue
-
- # get message
- task = q.pop()
- raw = pickle.loads(task['message'])
- msgids.append(task['message_id'])
- sql = raw[0]
-
- # if the queue-item already is a batch-job, don't do any more batch-work
- if isinstance(raw[1], list):
- values = raw[1]
- i = 10
- else:
- values.append(raw[1])
-
-
- # Check if we can batch up with the next message in queue
- i += 1
- if i < 10 and q.qsize() - len(msgids) >= 1:
- nextraw = pickle.loads(q.peek()['message'])
- nextsql = nextraw[0]
- nextvalues = nextraw[1]
- if sql == nextsql and isinstance(nextvalues, tuple):
- continue
-
- dbi(sql,values)
-
- for id in msgids:
- q.done(id)
-
- table = sql.split(' ')[2]
- num = 1 if isinstance(values, tuple) else len(values)
- left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else ''
- print("Processed", num, "item(s) for table", table + ".", left)
-
- msgids = []
- values = []
- i=0
-
-print("END")