Как получать данные одновременно из нескольких очередей asyncio.Queue?
У меня есть две очереди, условно:
queue1 = asyncio.LifoQueue()
queue2 = asyncio.LifoQueue()
В них асинхронно из двух задач добавляются данные через put
.
Я хочу получать в главном потоке данные из очереди.
Я делаю это так и все работает:
#главный поток
while True:
res1 = await self.queue1.get()
res2 = await self.queue2.get()
Проблема в том, что первая или вторая задача может класть данные в очередь с разным промежутком времени и из-за этого главный поток стопорится, я бы хотел получать none, если данных нет и идти к следующей очереди без ожидания.
Я пробовал получать данные через queue.no_wait()
и при except asyncio.queues.QueueEmpty
, возвращать None
, но так почему-то постоянно выводятся None
и данные так и не приходят
Ответы (1 шт):
Более "правильный" вариант: создаем таски для ожидания объектов из каждой очереди, ждем результата из любой очереди, в которой что-то появится, с помощью функции asyncio.wait
с параметром return_when=asyncio.FIRST_COMPLETED
(завершить ожидание, когда любой таск завершится).
import asyncio
import random
queue1 = asyncio.LifoQueue()
queue2 = asyncio.LifoQueue()
queues = [queue1, queue2]
async def queue_getter(queue_number: int, queue):
"""
Вспомогательная функция - получает значение из очереди
и возвращает его вместе с номером очереди,
чтобы мы понимали, откуда оно пришло
"""
return queue_number, await queue.get()
async def queue_processer():
waiting_tasks = {i: asyncio.create_task(queue_getter(i, queue)) for i, queue in enumerate(queues)}
while True:
done, _pending = await asyncio.wait(waiting_tasks.values(), return_when=asyncio.FIRST_COMPLETED)
for finished_task in done:
i, result = await finished_task
print(f"Got item {result} from queue {i}")
waiting_tasks[i] = asyncio.create_task(queue_getter(i, queues[i]))
async def producer(queue, delay):
while True:
await asyncio.sleep(delay)
await queue.put(random.randint(0, 100))
async def main():
producers = [asyncio.create_task(producer(queue1, 1)), asyncio.create_task(producer(queue2, 1.5))]
processor = asyncio.create_task(queue_processer())
tasks = producers + [processor]
await asyncio.gather(*tasks)
asyncio.run(main())
Более простой и костыльный способ — пытаться получать значения с помощью метода queue.get_nowait()
и добавить asyncio.sleep
с небольшой задержкой (без него не будет переключения на другие корутины):
import asyncio
import random
queue1 = asyncio.LifoQueue()
queue2 = asyncio.LifoQueue()
queues = [queue1, queue2]
async def queue_processer():
while True:
for i, queue in enumerate(queues):
try:
result = queue.get_nowait()
print(f"Got item {result} from queue {i}")
except asyncio.QueueEmpty:
pass
await asyncio.sleep(0.1)
... # Код ниже без изменений
asyncio.run(main())