diff options
Diffstat (limited to '')
-rw-r--r-- | common/queue.py | 4 | ||||
-rw-r--r-- | queue2pgsql.py | 44 |
2 files changed, 44 insertions, 4 deletions
diff --git a/common/queue.py b/common/queue.py index abdfb96..e567fb4 100644 --- a/common/queue.py +++ b/common/queue.py @@ -16,5 +16,7 @@ def dbi(sql,values,**kwargs): q.put(pickle.dumps([sql, values])) if verbose is True: - print("Inserted into queue") + table = sql.split(' ')[2] + num = 1 if isinstance(values, tuple) else len(values) + print("Inserted", num, "item(s) into queue for", table) return True diff --git a/queue2pgsql.py b/queue2pgsql.py index f5be1d9..436dcb8 100644 --- a/queue2pgsql.py +++ b/queue2pgsql.py @@ -22,15 +22,53 @@ con.close() q = SQLQueue(QUEUE_DB, maxsize=None) q.prune() +msgids = [] +values = [] + +i = 0 + # Event loop while True: + # Just sleep if queue is empty if q.empty() == True: time.sleep(10) continue + + # get message task = q.pop() raw = pickle.loads(task['message']) - dbi(raw[0],raw[1]) - print("Processed " + str(task['message_id']) + ". " + str(q.qsize() - 1) + " left") - q.done(task['message_id']) + msgids.append(task['message_id']) + sql = raw[0] + + # if the queue-item already is a batch-job, don't do any more batch-work + if isinstance(raw[1], list): + values = raw[1] + i = 10 + else: + values.append(raw[1]) + + + # Check if we can batch up with the next message in queue + i += 1 + if i < 10 and q.qsize() - len(msgids) >= 1: + nextraw = pickle.loads(q.peek()['message']) + nextsql = nextraw[0] + nextvalues = nextraw[1] + if sql == nextsql and isinstance(nextvalues, tuple): + continue + + dbi(sql,values) + + for id in msgids: + q.done(id) + + table = sql.split(' ')[2] + num = 1 if isinstance(values, tuple) else len(values) + left = str(q.qsize()) + " items left in queue" if q.qsize() > 0 else '' + print("Processed", num, "item(s) for table", table + ".", left) + + msgids = [] + values = [] + i=0 print("END") |