Как запустить aioschedule в несколько потоков

import asyncio
import time
import aioschedule
from datetime import datetime


async def work_1():
    print('work1', datetime.now())
    await asyncio.sleep(30)
    
    
async def work_2():
    print('work2', datetime.now())
                

def main():    
    aioschedule.every(5).seconds.do(work_1)
    aioschedule.every(10).seconds.do(work_2)

    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(aioschedule.run_pending())
        time.sleep(1)
        
        
if __name__ == "__main__":
    main()

Вот код, в теории он должен запускать функции параллельно, т.е. work_1 и work_2 должны работать независимо друг от друга. Но работает он так, что запуская work_1 работа скрипта блокируется как раз на эти 30 секунд, и только после них work_2 и work_1 начинают работать

Это вывод терминала, если запустить скрипт

work1 2024-05-28 18:12:34.051410
work2 2024-05-28 18:13:05.055715
work1 2024-05-28 18:13:09.072192

При этом всем, нужно чтобы work1 и work2 не ждали завершения друг друга, и себя в том числе. Условно work_1 должен запускаться every(5).seconds А так он будет запускаться каждые 5 секунд и еще 30 секунд сверху внутри функции


Ответы (1 шт):

Автор решения: insolor

Похоже, что aioschedule в подобных случаях работает некорректно. Это не в вашем коде проблема, а баг в этом модуле.

Тут не нужны отдельные потоки. Можно или использовать другой модуль (в котором нет такого бага), или добавить костыль для текущего модуля, чтобы работало нормально (создавать отдельный asyncio таск, который будет работать в фоне, см. вторую часть ответа).

Как альтернативу можно использовать другой модуль - aioscheduler, но каждый таск нужно будет запускать заново вручную, там нет функциональности "повторять каждые N секунд".

Пример кода:

import asyncio
from aioscheduler import TimedScheduler
from datetime import datetime, timedelta

start_time = datetime.now()


async def work_1(scheduler):
    print('work1', (datetime.now() - start_time).total_seconds())
    scheduler.schedule(work_1(scheduler), datetime.now() + timedelta(seconds=5))
    await asyncio.sleep(30)


async def work_2(scheduler):
    print('work2', (datetime.now() - start_time).total_seconds())
    scheduler.schedule(work_2(scheduler), datetime.now() + timedelta(seconds=10))


async def main():
    scheduler = TimedScheduler(prefer_utc=False)
    scheduler.start()
    
    scheduler.schedule(work_1(scheduler), datetime.now() + timedelta(seconds=5))
    scheduler.schedule(work_2(scheduler), datetime.now() + timedelta(seconds=10))

    await scheduler.loop()


if __name__ == "__main__":
    asyncio.run(main())

Вывод:

work1 5.005256
work2 10.006278
work1 10.006345
work1 15.012025
work2 20.011543
work1 20.013419

Как костыль для aioschedule (который у вас в вопросе используется) - можно запускать длительные асинхронные задачи в отдельных asyncio тасках (не в потоках, если у вас действительно асинхронный код, то потоки не нужны).

Пример кода:

import asyncio
import time
import aioschedule
from datetime import datetime


start_time = datetime.now()


async def long_task():
    await asyncio.sleep(30)
    print('long task finished', (datetime.now() - start_time).total_seconds())


async def work_1():
    print('work1', (datetime.now() - start_time).total_seconds())
    asyncio.create_task(long_task())


async def work_2():
    print('work2', (datetime.now() - start_time).total_seconds())


def main():
    aioschedule.every(5).seconds.do(work_1)
    aioschedule.every(10).seconds.do(work_2)

    loop = asyncio.get_event_loop()
    while True:
        loop.run_until_complete(aioschedule.run_pending())
        time.sleep(1)


if __name__ == "__main__":
    main()

Вывод:

work1 5.006532
work1 10.013955
work2 10.014098
work1 15.02036
work1 20.027277
work2 20.027417
work1 25.03384
work2 30.04056
work1 30.04067
work1 35.047351
long task finished 35.047518
work1 40.053909
work2 40.054381
long task finished 40.054475
→ Ссылка