Python Падение производительности обработки данных в потоках. При выставлении проверки выполнения работы потоками
Дорогие форумчане, есть следующий код обрабатывающий данные получаемые из SQL, в основном потоке делаются запросы к базе, полученные данные добавляются в очередь1, из которой поток первичной обработки забирает данные, производит первичную обработку и складывает в очередь2 для потока дополнительной обработки, который в свою очередь забирает данные оттуда обрабатывает их и структурирует для импорта в базу SQL, при накоплении определенного объёма происходит импорт в базу и ставиться метка последнего обработанного идентификатора данных которая помещается в очередь откуда эту метку забирает главный поток. Все работает, но поскольку главный поток делает запросы к базе в непрерывном цикле while True, то количество запросов к базе большое и следующий запрос не успевает получить данные последнего идентификатора обработанных данных и запрашивает старые данные, что создает бессмысленную повторную обработку данных, а в случае отсутствия новых первичных данных для обработки эти запросы становятся безрезультативными, возникает вопрос как оптимизировать главный поток, что-бы он не лез просто так в базу, т.е. если там нет данных главный поток становился на паузу. Этот код с выполнением запроса только если очередь не пустая, но данная проверка очень существенно замедляет выполнения всего процесса, скорость обработки падает на порядок. Посоветуйте как нейтрализовать данную проблему, думал установить time.sleep(х), но судя по логам усыпляет в том числе потоки обработки, а не только главный поток, вопрос что я делаю не так и как сделать правильно.
import queue
import pyodbc
import keyboard
from threading import Thread, Lock
from time import sleep
start_max_id = 0
stop_thread = False
lock = Lock()
def exit_program():
Lock.acquire()
try:
global stop_thread
stop_thread = True
finally:
Lock.release()
class Worker1(Thread):
def __init__(self, queue1, queue2):
super().__init__()
self.queue1 = queue1
self.queue1 = queue2
def run(self):
while True:
try:
id, data = self.work_queue.get()
self.process(id, data)
finally:
self.work_queue.task_done()
Lock.acquire()
try:
if not stop_thread:
pass
else:
break
finally:
Lock.release()
def process(self, id, data):
data = data + 1
self.queue2.put((id, data))
class Worker2(Thread):
def __init__(self, queue2, queue3=None, base_config=None, insertchunk=50):
self.queue2 = queue2
self.queue3 = queue3
self.base_config = base_config
self.insertchunk = insertchunk
self.datastore = []
self.datacount = 0
self.max_id = None
def run(self):
while True:
try:
id, data = self.queue2.get()
self.process(id, data)
if self.datacount % self.insertchunk == 0:
self.input_to_base()
finally:
self.queue2.task_done()
Lock.acquire()
try:
if not stop_thread:
pass
else:
break
finally:
Lock.release()
def process(self, id, data):
data *= 10
self.datastore.append((id, data))
self.max_id = id
self.datacount +=1
def input_to_base(self)
out_db = pyodbc.connect("DRIVER={" + self.base_config['driver'] +
"};SERVER=" + self.base_config['servername'] +
";DATABASE=" + self.base_config['basename'] +
";UID=" + self.base_config['username'] +
";PWD=" + self.base_config['password']
)
with out_db.cursor() as cursor:
cursor.fast_executemany = True
cursor.execute("INSERT INTO tablename (id, data) values (?,?)", self.datastore)
cursor.commit()
self.queue3.put(max_id)
def main():
chunksize = 100
primary_base_config = {'driver': 'driver',
'servername': 'servername',
'basename':'basename',
'username':'username',
'password':'password',
}
out_base_config = {'driver': 'driver',
'servername': 'servername',
'basename':'basename',
'username':'username',
'password':'password',
}
queue1 = queue.Queue()
queue2 = queue.Queue()
queue3 = queue.Queue()
worker1 = Worker1(queue1, queue2)
worker1.daemon = True
worker1.start()
worker2 = Worker2(queue2, queue3, base_config=out_base_config)
worker1.daemon = True
worker1.start()
queue3.put(start_max_id)
keyboard.add_hotkey('ctrl+q', lambda: exit_program())
primary_base = pyodbc.connect("DRIVER={" + self.base_config['driver'] +
"};SERVER=" + self.base_config['servername']+
";DATABASE=" + self.base_config['basename'] +
";UID=" + self.base_config['username'] +
";PWD=" + self.base_config['password']
)
while True:
while not queue3.empty():
try:
start_max_id = queue3.get()
with primary_base.cursor() as primary_cursor:
primary_cursor.execute("SELECT TOP (" + str(chunksize) +
") id, data from table where id >= "
+ str(start_max_id) + " ;")
catch_data = primary_cursor.fetchall()
for row in catch_data:
queue1.put(row)
finally:
queue3.task_done()
if stop_thread:
sys.exit("...)
"sleep(10)"
queue3.join()
queue2.join()
queue1.join()
if __name__ == "__main__":
main()
Так же было бы хорошо, что-бы кто-нибудь объяснил на моём примере с методами join очередей, порядок их постановки какое значение имеет, т.к в этом плане я плаваю и хотелось бы разобраться.