Python Падение производительности обработки данных в потоках. При выставлении проверки выполнения работы потоками

Дорогие форумчане, есть следующий код обрабатывающий данные получаемые из SQL, в основном потоке делаются запросы к базе, полученные данные добавляются в очередь1, из которой поток первичной обработки забирает данные, производит первичную обработку и складывает в очередь2 для потока дополнительной обработки, который в свою очередь забирает данные оттуда обрабатывает их и структурирует для импорта в базу SQL, при накоплении определенного объёма происходит импорт в базу и ставиться метка последнего обработанного идентификатора данных которая помещается в очередь откуда эту метку забирает главный поток. Все работает, но поскольку главный поток делает запросы к базе в непрерывном цикле while True, то количество запросов к базе большое и следующий запрос не успевает получить данные последнего идентификатора обработанных данных и запрашивает старые данные, что создает бессмысленную повторную обработку данных, а в случае отсутствия новых первичных данных для обработки эти запросы становятся безрезультативными, возникает вопрос как оптимизировать главный поток, что-бы он не лез просто так в базу, т.е. если там нет данных главный поток становился на паузу. Этот код с выполнением запроса только если очередь не пустая, но данная проверка очень существенно замедляет выполнения всего процесса, скорость обработки падает на порядок. Посоветуйте как нейтрализовать данную проблему, думал установить time.sleep(х), но судя по логам усыпляет в том числе потоки обработки, а не только главный поток, вопрос что я делаю не так и как сделать правильно.

import queue 
import pyodbc
import keyboard
from threading import Thread, Lock
from time import sleep
start_max_id = 0
stop_thread = False
lock = Lock()

def exit_program():
    Lock.acquire()
    try:
        global stop_thread
        stop_thread = True
    finally:
        Lock.release()

class Worker1(Thread):
    def __init__(self, queue1, queue2):
        super().__init__()
        self.queue1 = queue1
        self.queue1 = queue2



    def run(self):
        while True:
            try:
                id, data = self.work_queue.get()
                self.process(id, data)
            finally:
                self.work_queue.task_done()
                Lock.acquire()
                try:
                    if not stop_thread:
                        pass
                    else:
                        break
                finally:
                    Lock.release()

    def process(self, id, data):
        data = data + 1
        self.queue2.put((id, data))

class Worker2(Thread):
    def __init__(self, queue2, queue3=None, base_config=None, insertchunk=50):
    self.queue2 = queue2
    self.queue3 = queue3
    self.base_config = base_config
    self.insertchunk = insertchunk
    self.datastore = []
    self.datacount = 0
    self.max_id = None

    def run(self):
        while True:
            try:
                id, data = self.queue2.get()
                self.process(id, data)
                if self.datacount % self.insertchunk == 0:
                    self.input_to_base()
            finally:
                self.queue2.task_done()
                Lock.acquire()
                try:
                    if not stop_thread:
                        pass
                    else:
                        break
                finally:
                    Lock.release()

    def process(self, id, data):
        data *= 10
        self.datastore.append((id, data))
        self.max_id = id
        self.datacount +=1

    def input_to_base(self)
        out_db = pyodbc.connect("DRIVER={" + self.base_config['driver'] +
                                "};SERVER=" + self.base_config['servername'] +
                                ";DATABASE=" + self.base_config['basename'] +
                                ";UID=" + self.base_config['username'] +
                                ";PWD=" + self.base_config['password']
                                )
        with out_db.cursor() as cursor:
            cursor.fast_executemany = True
            cursor.execute("INSERT INTO tablename (id, data) values (?,?)", self.datastore)
            cursor.commit()
            self.queue3.put(max_id)
def main():
    chunksize = 100
    primary_base_config = {'driver': 'driver',
                           'servername': 'servername',        
                           'basename':'basename',
                           'username':'username',
                           'password':'password',
                          }
    out_base_config = {'driver': 'driver',
                       'servername': 'servername',        
                       'basename':'basename',
                       'username':'username',
                       'password':'password',
                      }
    queue1 = queue.Queue()
    queue2 = queue.Queue()
    queue3 = queue.Queue()

    worker1 = Worker1(queue1, queue2)
    worker1.daemon = True
    worker1.start()

    worker2 = Worker2(queue2, queue3, base_config=out_base_config)
    worker1.daemon = True
    worker1.start()
    queue3.put(start_max_id)
    keyboard.add_hotkey('ctrl+q', lambda: exit_program())
    primary_base = pyodbc.connect("DRIVER={" + self.base_config['driver'] +
                                           "};SERVER=" + self.base_config['servername']+
                                           ";DATABASE=" + self.base_config['basename'] +
                                           ";UID=" + self.base_config['username'] +
                                           ";PWD=" + self.base_config['password']
                                              )
    while True:
        while not queue3.empty():
            try:
                start_max_id = queue3.get()
                with primary_base.cursor() as primary_cursor:
                    primary_cursor.execute("SELECT TOP (" + str(chunksize) +
                                           ") id, data from table where id >= "                               
                                           + str(start_max_id) + " ;")
                    catch_data = primary_cursor.fetchall()
                    for row in catch_data:
                        queue1.put(row)
            finally:
                queue3.task_done()
        if stop_thread:
            sys.exit("...)
        "sleep(10)"

    queue3.join()
    queue2.join()
    queue1.join()

if __name__ == "__main__":
    main()

Так же было бы хорошо, что-бы кто-нибудь объяснил на моём примере с методами join очередей, порядок их постановки какое значение имеет, т.к в этом плане я плаваю и хотелось бы разобраться.


Ответы (0 шт):