Как установить и получить цикл событий в потоке
Передаю в отдельный поток цикл событий, устанавливаю его в функции, получаю, указано что он запущен, но при попытке получить его с помощью asyncio.get_running_loop() падает ошибка RuntimeError: no running event loop
Пример кода
def test_func(loop):
asyncio.set_event_loop(loop)
l = asyncio.get_event_loop()
l.is_running() # True
result = asyncio.get_running_loop() # RuntimeError: no running event loop
async def main():
loop = asyncio.get_running_loop()
t = threading.Thread(target=test_func, args=(loop,))
t.start()
asyncio.run(main())
Не могу понять почему не работает asyncio.get_running_loop()
Python 3.9.13
Ответы (1 шт):
Автор решения: quswadress
→ Ссылка
Согласно документации asyncio.get_running_loop():
Эта функция может быть вызвана только из корутины или из callback.
Когда функция test_func таковым не является.
Как вариант, вы можете сделать функцию test_func асинхронной и использовать run_in_executor для его вызова вместо threading
async def test_func(loop):
... # Код
async def main():
loop = asyncio.get_running_loop()
task = await loop.run_in_executor(None, test_func, loop)
await task # thread.join()