Асинхронность python не работает таймер
from asyncio import create_task, get_event_loop, sleep
class Timer:
def __init__(self, color: bool):
self.seconds: int = 900
self.expires: bool = color
self.color = lambda x: "белых" if color else "чёрных"
get_event_loop().run_until_complete(self.start_timer())
async def start_timer(self):
"""Запускает цикл событий"""
await create_task(self.update_timer())
async def update_timer(self):
"""Ведёт время отсчёта"""
while self.seconds:
if self.expires:
self.seconds -= 1
await sleep(1)
assert self.seconds, f"Закончилось время у {self.color}. Противоположный игрок победил"
def flip_the_timer(self):
"""Меняет положение таймера (активный/деактивный)"""
self.expires = not self.expires
def get_timeset(self) -> str:
""":return время в виде таймера на электронных часах"""
return f"{round(self.seconds / 60)}:{self.seconds % 60}"
Проблемный код выше
У меня всегда были проблемы с асинхронностью, но всегда по тихоньку прорывался, но на этой неделе встретил какой-то убийственный случай
Таймер запускается, ошибок не ловлю, но даже в силу асинхронности код не идёт дальше, а тупо ждёт таймер, при этом конкурентно должен запуститься второй таймер, но он не появляется
Создал таску, запустил, как только не пробовал решить проблему.
Товарищи программисты дружащие с асинхронностью, помогите пожалуйста!
Ответы (1 шт):
Автор решения: gord1402
→ Ссылка
Вот пример использования asyncio.loop:
from asyncio import sleep, new_event_loop
async def task1():
for i in range(15):
print(f'task {i}')
await sleep(1)
async def main():
print('main1')
await sleep(2)
print('main2')
await sleep(2)
print('main3')
await sleep(2)
loop = new_event_loop()
task_main = loop.create_task(main())
task_1 = loop.create_task(task1())
loop.run_until_complete(task_1)
Результат:
main1
task 0
task 1
main2
task 2
task 3
main3
task 4
task 5
task 6
task 7
task 8
task 9
task 10
task 11
task 12
task 13
task 14
Process finished with exit code 0
Но в вашем случае я думаю лучше подходит threading:
import time
from threading import Thread
class Timer:
def __init__(self, color: bool):
self.seconds: int = 900
self.expires: bool = color
self.color = lambda x: "белых" if color else "чёрных"
self.thread = Thread(target=self.update_timer)
self.thread.start()
def update_timer(self):
"""Ведёт время отсчёта"""
while self.seconds:
if self.expires:
self.seconds -= 1
time.sleep(1)
assert self.seconds, f"Закончилось время у {self.color}. Противоположный игрок победил"
def flip_the_timer(self):
"""Меняет положение таймера (активный/деактивный)"""
self.expires = not self.expires
def get_timeset(self) -> str:
""":return время в виде таймера на электронных часах"""
return f"{round(self.seconds / 60)}:{self.seconds % 60}"
timer = Timer(True)
print('Do something....')