aboutsummaryrefslogblamecommitdiffstats
path: root/scripts/queue_runner.py
blob: fd886002684b3d19f771abc7db7e6ed46b42f5a1 (plain) (tree)
1
2
3
4
5
6
7
8
9
                      
                                         

         

              
           
 
                               
                              
 





                                                        

            
                               





                                                           
                                    

         




           

            
                                  
                         
                               
                

                 
                  

                                       








                                                                            


                                                             
                                                   




                                                            
                    
 

                        
 
                                     
                                                         
                                                                           



                                                                   
         

            
#!/usr/bin/env python3
""" move items from queue to database """

import os
import pickle
import sqlite3
import time

from common.postgres import dbi
from litequeue import SQLQueue

# Queue database location: "<el_QUEUE_dir>/<el_QUEUE_db>", each part
# overridable through the environment.
QUEUE_DIR = os.environ.get("el_QUEUE_dir", "queue")
QUEUE_DB = "/".join((QUEUE_DIR, os.environ.get("el_QUEUE_db", "litequeue.db")))

# Seconds to sleep between polls while the queue is empty (default 15).
QUEUE_SLEEP = int(os.environ.get("el_QUEUE_sleep", 15))

# Unlock all: reset every row with status 1 back to status 0 before the
# queue is opened.
# NOTE(review): presumably 1 = locked (popped but never marked done, e.g. by
# a run that died mid-batch) and 0 = ready in litequeue's schema - confirm.
con = sqlite3.connect(QUEUE_DB)
try:
    cur = con.cursor()
    cur.execute("UPDATE Queue SET status = 0 WHERE status = 1")
    con.commit()
finally:
    # Close the connection even when the UPDATE/commit raises, so the
    # database file handle is never left open on the error path.
    con.close()

# Attach to the on-disk queue with no size limit, then prune it.
# NOTE(review): prune() presumably deletes messages already marked done -
# confirm against the installed litequeue version.
q = SQLQueue(QUEUE_DB, maxsize=None)
q.prune()

# State of the batch currently being assembled by the event loop:
#   msgids - ids of the messages popped for this batch
#   values - parameter rows collected for the batch's SQL statement
#   i      - number of messages merged into the batch so far
msgids, values = [], []
i = 0

# Upper bound on how many queue messages are merged into a single dbi() call.
BATCH_MAX = 10

# Event loop
#
# Each pass pops one message, optionally merges the directly following
# messages into the same batch, writes the batch with dbi() and only then
# marks the messages done. A message unpickles to a sequence whose first
# element is the SQL statement and whose second element is either one
# parameter row or a list of rows (an already-batched job).
#
# NOTE(review): batching assumes this script is the only consumer, so that the
# message returned by peek() is the one the following pop() hands out.
# NOTE(review): pickle.loads can execute arbitrary code embedded in the
# payload; this is only safe while every producer writing to the queue is
# trusted.
try:
    while True:
        # Just sleep if queue is empty
        if q.empty():
            time.sleep(QUEUE_SLEEP)
            continue

        # get message
        task = q.pop()
        if task is None:
            # Nothing was handed out after all (queue drained in between)
            continue

        raw = pickle.loads(task["message"])
        sql = raw[0]
        msgids = [task["message_id"]]

        if isinstance(raw[1], list):
            # if the queue-item already is a batch-job, don't do any more
            # batch-work
            values = raw[1]
        else:
            values = [raw[1]]

            # Keep merging while the next message uses the same statement and
            # carries a single row (a tuple); stop at BATCH_MAX messages.
            while len(msgids) < BATCH_MAX:
                upcoming = q.peek()
                if upcoming is None:
                    break

                nextraw = pickle.loads(upcoming["message"])
                if nextraw[0] != sql or not isinstance(nextraw[1], tuple):
                    break

                task = q.pop()
                if task is None:
                    break
                msgids.append(task["message_id"])
                values.append(pickle.loads(task["message"])[1])

        dbi(sql, values)

        # Mark the messages done only after the database write succeeded
        for msgid in msgids:
            q.done(msgid)

        # The table name is only used for the log line; never let a short or
        # oddly formatted statement crash the loop after the write is done.
        words = sql.split()
        table = words[2] if len(words) > 2 else "unknown"
        remaining = q.qsize()
        left = str(remaining) + " items left in queue" if remaining > 0 else ""
        print("Processed", len(values), "item(s) for table", table + ".", left)
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop; fall through to the END marker.
    # Messages popped but not yet done are unlocked again on the next start.
    pass

print("END")