Как получать данные одновременно из нескольких очередей asyncio.Queue?

У меня есть две очереди, условно:

queue1 = asyncio.LifoQueue()
queue2 = asyncio.LifoQueue()

В них асинхронно из двух задач добавляются данные через put. Я хочу получать в главном потоке данные из очереди. Я делаю это так и все работает:

#главный поток
while True:
    res1 = await self.queue1.get()
    res2 = await self.queue2.get()

Проблема в том, что первая или вторая задача может класть данные в очередь с разным промежутком времени и из-за этого главный поток стопорится, я бы хотел получать none, если данных нет и идти к следующей очереди без ожидания.

Я пробовал получать данные через queue.no_wait() и при except asyncio.queues.QueueEmpty, возвращать None, но так почему-то постоянно выводятся None и данные так и не приходят


Ответы (1 шт):

Автор решения: insolor

Более "правильный" вариант: создаем таски для ожидания объектов из каждой очереди, ждем результата из любой очереди, в которой что-то появится, с помощью функции asyncio.wait с параметром return_when=asyncio.FIRST_COMPLETED (завершить ожидание, когда любой таск завершится).

import asyncio
import random

queue1 = asyncio.LifoQueue()
queue2 = asyncio.LifoQueue()
queues = [queue1, queue2]


async def queue_getter(queue_number: int, queue):
    """
    Вспомогательная функция - получает значение из очереди
    и возвращает его вместе с номером очереди,
    чтобы мы понимали, откуда оно пришло
    """
    return queue_number, await queue.get()


async def queue_processer():
    waiting_tasks = {i: asyncio.create_task(queue_getter(i, queue)) for i, queue in enumerate(queues)}
    while True:
        done, _pending = await asyncio.wait(waiting_tasks.values(), return_when=asyncio.FIRST_COMPLETED)
        for finished_task in done:
            i, result = await finished_task
            print(f"Got item {result} from queue {i}")
            waiting_tasks[i] = asyncio.create_task(queue_getter(i, queues[i]))


async def producer(queue, delay):
    while True:
        await asyncio.sleep(delay)
        await queue.put(random.randint(0, 100))


async def main():
    producers = [asyncio.create_task(producer(queue1, 1)), asyncio.create_task(producer(queue2, 1.5))]
    processor = asyncio.create_task(queue_processer())
    tasks = producers + [processor]
    await asyncio.gather(*tasks)


asyncio.run(main())

Более простой и костыльный способ — пытаться получать значения с помощью метода queue.get_nowait() и добавить asyncio.sleep с небольшой задержкой (без него не будет переключения на другие корутины):

import asyncio
import random

queue1 = asyncio.LifoQueue()
queue2 = asyncio.LifoQueue()
queues = [queue1, queue2]


async def queue_processer():
    while True:
        for i, queue in enumerate(queues):
            try:
                result = queue.get_nowait()
                print(f"Got item {result} from queue {i}")
            except asyncio.QueueEmpty:
                pass
        
        await asyncio.sleep(0.1)


...  # Код ниже без изменений


asyncio.run(main())
→ Ссылка