Можно ли в asyncio сделать два ожидания по timeout

Изучаю asyncio , и понадобилось решить следующую задачу. Если время работы корутины более 3 сек, нужно завершить задачу. Остальные задачи нужно завершить после 10 сек (общее время работы программы 10 сек). Примерно такой код.

async def some_coro(time):
    print(f"Корутина {asyncio.current_task().get_name()} запустилась")
    try:
       await asyncio.sleep(time)
       print(f"Корутина {asyncio.current_task().get_name()} завершилась")
    except asyncio.CancelledError:
       print(f"Корутина {asyncio.current_task().get_name()} отменена")
    

async def wait_10():
    print('начало отсчета 10 сек')
    await asyncio.sleep(10)
    print ('10 сек прошло. Завершаем все задачи')
    for task in asyncio.all_tasks():
        try:
            task.cancel()
        except asyncio.CancelledError:
            pass

async def main():
   tasks = [asyncio.create_task(some_coro(12)), asyncio.create_task(some_coro(2)), asyncio.create_task(some_coro(15))]
    #await asyncio.create_task(wait_10())
   await asyncio.wait(tasks, timeout=5)

asyncio.run(main())  

Если я просто запускаю один asyncio.wait(tasks, timeout=5). то просто незавершенные задачи отменяются через 5 сек (но нет общего счетчика в 10 сек) Если я добавляю await asyncio.create_task(wait_10()), то такое ощущение, что timeout в asyncio.wait "уже не работает", и у меня все задачи завершаются после 10 сек, а не по таймауту 5 сек. Первое, хотелось бы понять для себя, почему так происходит? А второе, можно ли сделать "два отдельных" счетчика? Чувствую, что упускаю какие-то основы asyncio, и хотелось бы разобраться. Заранее спасибо.


Ответы (1 шт):

Автор решения: CrazyElf

Если я просто запускаю один asyncio.wait(tasks, timeout=5), то просто незавершенные задачи отменяются через 5 сек

Нет, отменяются не задачи, отменяется их ожидание через asyncio.wait. И возвращаются два списка: завершённых задач и не завершённых.

Если хотите, чтобы после timeout произошла отмена задач, используйте asyncio.wait_for, вот эта функция отменит задачи, которые будут работать дольше, чем timeout.

Если я добавляю await asyncio.create_task(wait_10()), то такое ощущение, что timeout в asyncio.wait "уже не работает", и у меня все задачи завершаются после 10 сек, а не по таймауту 5 сек.

Конечно, так и есть, потому что вы сначала ожидаете окончания работы задачи wait_10 и только после этого начинает работать следующие ожидание задач, а на тот момент все задачи вы уже убили-с в задаче wait_10. Кстати, вы там убиваете все задачи без разбора, включая саму задачу wait_10 и любые задачи в активном статусе, в том числе текущий await.

Вот вам пример кода, из которого может что-то будет более понятно. Только я тут main() запускаю немного по-другому, потому что работаю через Jupyter Notebook, там ваш способ запуска не работает.

Да, и заодно я исправил ошибки, неправильные названия вызываемых функций и т.п.

import asyncio

async def main_task(time):
    print(f"Корутина {asyncio.current_task().get_name()} запустилась")
    try:
       await asyncio.sleep(time)
       print(f"Корутина {asyncio.current_task().get_name()} завершилась")
    except asyncio.CancelledError:
       print(f"Корутина {asyncio.current_task().get_name()} отменена")
    

async def wait_10():
    print('начало отсчета 10 сек')
    await asyncio.sleep(10)
    print ('10 сек прошло. Завершаем все задачи')
    for task in asyncio.all_tasks():
        try:
            name = task.get_name()
            if task.get_name().startswith('MT'):
                print(f'Отменяем задачу {name}')
                task.cancel()
        except asyncio.CancelledError:
            pass

def show_stats(tasks):
    print(f'Запущено задач: {sum(not t.done() for t in tasks)}')

async def main():
    tasks = [asyncio.create_task(main_task(12), name='MT12'), 
             asyncio.create_task(main_task(2), name='MT2'), 
             asyncio.create_task(main_task(15), name='MT15')]
    kill_task = asyncio.create_task(wait_10())
    show_stats(tasks)
    await kill_task
    show_stats(tasks)
    await asyncio.wait(tasks)
    print('Завершаем работу')

await main()

Вывод:

Запущено задач: 3
Корутина MT12 запустилась
Корутина MT2 запустилась
Корутина MT15 запустилась
начало отсчета 10 сек
Корутина MT2 завершилась
10 сек прошло. Завершаем все задачи
Отменяем задачу MT15
Отменяем задачу MT12
Корутина MT15 отменена
Корутина MT12 отменена
Запущено задач: 0
Завершаем работу

Я дал задачам имена, чтобы отличать те задачи, которые нужно принудительно завершать, от других задач, которые лучше не трогать.

Добавил статистику запущенных задач. В конце ожидание задач уже так, на всякий случай. На этот момент все задачи уже принудительно завершены.

→ Ссылка