aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--common/queue.py4
-rw-r--r--queue2pgsql.py44
2 files changed, 44 insertions, 4 deletions
diff --git a/common/queue.py b/common/queue.py
index abdfb96..e567fb4 100644
--- a/common/queue.py
+++ b/common/queue.py
@@ -16,5 +16,7 @@ def dbi(sql,values,**kwargs):
q.put(pickle.dumps([sql, values]))
if verbose is True:
- print("Inserted into queue")
+ table = sql.split(' ')[2]
+ num = 1 if isinstance(values, tuple) else len(values)
+ print("Inserted", num, "item(s) into queue for", table)
return True
diff --git a/queue2pgsql.py b/queue2pgsql.py
index f5be1d9..436dcb8 100644
--- a/queue2pgsql.py
+++ b/queue2pgsql.py
@@ -22,15 +22,53 @@ con.close()
q = SQLQueue(QUEUE_DB, maxsize=None)
q.prune()
+msgids = []
+values = []
+
+i = 0
+
# Event loop
while True:
+ # Just sleep if queue is empty
if q.empty() == True:
time.sleep(10)
continue
+
+ # get message
task = q.pop()
raw = pickle.loads(task['message'])
- dbi(raw[0],raw[1])
- print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left")
- q.done(task['message_id'])
+ msgids.append(task['message_id'])
+ sql = raw[0]
+
+ # if the queue-item already is a batch-job, don't do any more batch-work
+ if isinstance(raw[1], list):
+ values = raw[1]
+ i = 10
+ else:
+ values.append(raw[1])
+
+
+ # Check if we can batch up with the next message in queue
+ i += 1
+ if i < 10 and q.qsize() - len(msgids) >= 1:
+ nextraw = pickle.loads(q.peek()['message'])
+ nextsql = nextraw[0]
+ nextvalues = nextraw[1]
+ if sql == nextsql and isinstance(nextvalues, tuple):
+ continue
+
+ dbi(sql,values)
+
+ for id in msgids:
+ q.done(id)
+
+ table = sql.split(' ')[2]
+ num = 1 if isinstance(values, tuple) else len(values)
+ left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else ''
+ print("Processed", num, "item(s) for table", table + ".", left)
+
+ msgids = []
+ values = []
+ i=0
print("END")