Asyncio.sleep() после, например, 10 или 50 запросов aiohttp в asyncio.gather()

Нужна возможность делать таймауты после определенного количества запросов в asyncio.gather() Пытался обойтись таким решением -

async def delay_wrapper(delay, task):
    print(f'Resp with delay {delay}')
    await asyncio.sleep(delay)
    return await task


async def async_prepare(source: str):
    while True:
        try:
            async with aiohttp.ClientSession(
                    connector=aiohttp.TCPConnector(force_close=True, verify_ssl=False)) as session:
                async with session.get(source, headers=HEADEARS) as response:
                    response_result = await response.read()
                    return response_result
        except aiohttp.ClientConnectionError:
            print("Oops - ClientConnectionError - the connection was dropped before we finished")

await asyncio.gather(*[delay_wrapper(delay, async_prepare(source=url)) for url in self.urls], return_exceptions=True)

Но в итоге сначала отрабабывает sleep, а уже после них скопом идут выполняться запросы... А мне нужна поочердность... А еще лучше. чтобы sleep() выполнялся после некоторого количества запросов, например после 30 или 50... Понимаю, что в лоб это можно реализовать разбив self.urls на чанки и отправлять в .gather() по частям со sleep() после каждой порции. Но хотелось бы более элегантное решение...


Ответы (1 шт):

Автор решения: eri

Ограничение на количество одновременных запросов, полезно в этом плане:

para_sem = asyncio.Semaphore(10) 
async def delay_wrapper(delay, task):
    async with para_sem:
        return await task

Вроде бы не то что нужно автору, но отпускать семафор вручную... Покажу на простом примере, чтоб не путать.

RATE = 50
rate_sem = asyncio.BoundedSemaphore(10)
async def delay_wrapper(delay, task):
    await rate_sem.acquire()
    return await task

async def releaser():
    while True:
        await asyncio.sleep(1/RATE)
        try:
            rate_sem.release() 
        except ValueError:
            pass 

rt = asyncio.create_task(releaser())
await asyncio.gather(*[delay_wrapper(delay, async_prepare(source=url)) for url in self.urls], return_exceptions=True)
rt.cancel() 

Запускается одновременно 10 запросов и каждые 1/50 секунды разрешается ещё один.

П.С. тут теряется обратная связь на окончание запросов. Нужно совместить 2 семафора получить 2 эффекта.

А по "флагам" посмотри другие примитивы из раздела https://docs.python.org/3/library/asyncio-sync.html

Можно подумать реализацию на очереди и воркерах.

→ Ссылка