Автоматическая отправка сообщений пользователю
Я написал бота, который ведёт БД с пользователями и связанными с ними дедлайнами.
Теперь я хочу реализовать следующую функцию: допустим, в 9:00 каждого утра бот пишет пользователю, какие у него установлены сегодня дедлайны.
Для реализации задуманного использую aiogram и aioschedule.
Так выглядит мой код:
async def schedule_deadline():
global USER_NAME
aioschedule.every().day.at('11:25').do(check_deadlines_daily)
while True:
print(USER_NAME)
await aioschedule.run_pending()
await asyncio.sleep(10)
async def main():
dp.include_router(router)
loop = asyncio.get_running_loop()
loop.create_task(schedule_deadline())
await dp.start_polling(bot)
if __name__ == '__main__':
asyncio.run(main())
В check_deadlines_daily
я вытаскиваю из таблицы те дедлайны, дата которых совпадает с сегодняшней, а затем отправляю пользователю.
Но проблема в том, что программа даже не входит в эту функцию ровно в то время, в которое указано в aioschedule.every().day.at('11:25').do(check_deadlines_daily)
.
Вместо этого программа перестаёт работать с ошибкой:
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<schedule_deadline() done, defined at C:\Users\asus\PycharmProjects\bot_v1\main.py:453> exception=TypeError('Passing coroutines is forbidden, use tasks explicitly.')>
Traceback (most recent call last):
File "C:\Users\asus\PycharmProjects\bot_v1\main.py", line 458, in schedule_deadline
await aioschedule.run_pending()
File "C:\Users\asus\PycharmProjects\bot_v1\.venv\Lib\site-packages\aioschedule\__init__.py", line 544, in run_pending
await default_scheduler.run_pending()
File "C:\Users\asus\PycharmProjects\bot_v1\.venv\Lib\site-packages\aioschedule\__init__.py", line 111, in run_pending
return await asyncio.wait(jobs, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\asus\AppData\Local\Programs\Python\Python312\Lib\asyncio\tasks.py", line 461, in wait
raise TypeError("Passing coroutines is forbidden, use tasks explicitly.")
TypeError: Passing coroutines is forbidden, use tasks explicitly.
В чём может быть проблема?
Ответы (1 шт):
Используй apschedule
from apschedule import AsyncIOScheduler
async def deadline():
scheduler = AsyncIOScheduler()
scheduler.add_job(check_deadlines_daily, hour=11, minute=25)
scheduler.start()
async def on_startup(_):
loop = asyncio.get_event_loop()
loop.create_task(deadline())
Думаю так будет легче