Цикл в потоке asyncio с возможностью остановки
Нужно запускать цикл по нажатию Shift + 1 и останавливать по Shift + 2. Всё работает, кроме как Shift + 2 после запуска. То есть цикл блокирует дальнейшее исполнение. Как это исправить?
import asyncio
from pynput import keyboard
status = False
async def my_loop(): # цикл
global status
while status:
print('Testing')
await asyncio.sleep(8)
def my_loop_start():
global status
status = True
asyncio.run(my_loop()) # пуск цикла
def my_loop_stop():
global status
status = False
with keyboard.GlobalHotKeys({'<shift>+1': my_loop_start, '<shift>+2': my_loop_stop}) as hotkeys:
hotkeys.join()
Ответы (1 шт):
Автор решения: Amgarak
→ Ссылка
Ваш my_loop_start
блокирует основной поток, так что не удивительно, что программа не отрабатывает на Shift + 2 ?♀️
Вот хорошая статья, на основе которой я подкорректировал ваш код:
import asyncio
from pynput import keyboard
import threading
status = False
loop = asyncio.new_event_loop()
future = None
async def my_loop():
count = 0
while status:
print('Testing')
count += 1
await asyncio.sleep(1)
return count # Тут мне захотелось вернуть результат, но это не обязательно.
def my_loop_start():
global status, future
if not status:
status = True
future = asyncio.run_coroutine_threadsafe(my_loop(), loop)
def my_loop_stop():
global status
status = False
print(future.result())
if __name__ == '__main__':
thread = threading.Thread(target=loop.run_forever)
thread.start()
with keyboard.GlobalHotKeys({'<shift>+1': my_loop_start, '<shift>+2': my_loop_stop}) as hotkeys:
hotkeys.join()
Вывод:
Testing
Testing
Testing
Testing
4