Как установить и получить цикл событий в потоке

Передаю в отдельный поток цикл событий, устанавливаю его в функции, получаю, указано что он запущен, но при попытке получить его с помощью asyncio.get_running_loop() падает ошибка RuntimeError: no running event loop

Пример кода



    def test_func(loop):
        asyncio.set_event_loop(loop)
        l = asyncio.get_event_loop()
        l.is_running() # True
        result = asyncio.get_running_loop() # RuntimeError: no running event loop
    
    
    async def main():
        loop = asyncio.get_running_loop()
        t = threading.Thread(target=test_func, args=(loop,))
        t.start()
    
    
    asyncio.run(main())

Не могу понять почему не работает asyncio.get_running_loop()

Python 3.9.13


Ответы (1 шт):

Автор решения: quswadress

Согласно документации asyncio.get_running_loop():

Эта функция может быть вызвана только из корутины или из callback.

Когда функция test_func таковым не является.

Как вариант, вы можете сделать функцию test_func асинхронной и использовать run_in_executor для его вызова вместо threading

async def test_func(loop):
    ...  # Код


async def main():
    loop = asyncio.get_running_loop()
    task = await loop.run_in_executor(None, test_func, loop)
    await task  # thread.join()
→ Ссылка