Как запустить 2 потока одновременно?
Consumer из rabbitmq считывает данные из записывает его в jobs. В данной реализации consumer работает, а метод do_stuff никак не реагирует
import threading
jobs = Queue()
def do_stuff(q):
while not q.empty():
logger.info('received new task')
value = q.get()
url = value['download_link']
driver = uc.Chrome(headless = False)
driver = webdriver.Chrome()
driver.execute_cdp_cmd("Page.setDownloadBehavior", params)
driver.set_window_size(200, 400)
driver.get(url)
wait_page_download_finished(driver)
waiting = WebDriverWait(driver, 300, 1).until(every_downloads_chrome)
table.insert(q)
q.task_done()
for i in range(3):
worker = threading.Thread(target=do_stuff, args=(jobs,))
worker.daemon = True
worker.start()
worker.join(0)
def callbackFunctionForQueueC(ch,method,properties,body):
logger.info('received message')
message = json.loads(body.decode('utf-8'))
jobs.put(message)
connection= pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel= connection.channel()
channel.basic_consume(queue='item', on_message_callback=callbackFunctionForQueueC, auto_ack=True)
t1 = Thread(target= channel.start_consuming)
t1.start()
t1.join(0)
Ответы (2 шт):
while not q.empty():
выход из потока сразу, ведь поток запустился с пустой очередью. используйте пустую посылку для завершения тредов например
def do_stuff(q):
while True:
logger.info('received new task')
value = q.get()
if not value:
q.task_done()
break
url = value['download_link']
driver = uc.Chrome(headless = False)
driver = webdriver.Chrome()
driver.execute_cdp_cmd("Page.setDownloadBehavior", params)
driver.set_window_size(200, 400)
driver.get(url)
wait_page_download_finished(driver)
waiting = WebDriverWait(driver, 300, 1).until(every_downloads_chrome)
table.insert(q)
q.task_done()
Но так как у вас сообщения берутся с внешнего источника, то эти пустые сообщения вставляешь через обработчик для модуля signal для плавного завершения работы
Но ведь в документации на Queue есть пример, как правильно работать с очередью:
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
# Block until all tasks are done.
q.join()
print('All work completed')
Основной смысл тут в том, что не нужно ориентироваться на empty(), а нужно делать task_done(), а в основном коде сделать join() к очереди и тогда, пока очередь не разгребётся, это всё будет работать. Хотя если у вас вдруг очередь разгребается быстрее, чем пополняется, то и этот вариант не сработает как нужно. Возможно, тогда вам нужно договориться с собой о том, что когда пополнение очереди закончится, вы положите в неё специальный элемент, например None, за наличием которого и будете следить при разгребании очереди, чтобы закончить её обработку, когда достанете этот элемент из очереди.