Python, aiogram: прошу помочь с передачей объекта класса в aiogram
Пытаюсь создать асинхронное приложение, которое будет управляться через aiogram. Для этого я создал класс AsyncioManager.
class AsyncioManager:
def __init__(self):
self.tasks = []
def add_task(self, coroutine):
self.tasks.append(coroutine)
async def run_all_tasks(self):
await asyncio.gather(*self.tasks)
async def console_menu(self):
while True:
code = await asyncio.to_thread(input, 'Code? ')
elif code == 'tasks':
print(self.tasks)
elif code == 'exit':
break
async def main(self):
self.add_task(self.console_menu())
await self.run_all_tasks()
Затем я запускаю этот класс в главном файле:
manager = AsyncioManager()
manager.add_task(start())
manager.add_task(aiogram_bot.main(manager))
asyncio.run(manager.main())
в функции start() у меня реализовано получение некоторой информации с помощью сокетов, это не так важно. Сам бот aiogram выглядит так:
import asyncio
from aiogram import Bot, Dispatcher
from aiogram.enums.parse_mode import ParseMode
from aiogram.fsm.storage.memory import MemoryStorage
from aiorgramcode.handlers import bot_messages, user_commands
from aiogram.dispatcher import FSMContext
bot = Bot(token=TELEGRAM_API_TOKEN, parse_mode=ParseMode.HTML)
dp = Dispatcher(storage=MemoryStorage())
async def main(manager: asyncio_manager.AsyncioManager):
dp.include_router(bot_messages.router)
dp.startup.register(on_startup)
dp.shutdown.register(on_shutdown)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types())
как видите, я пробую передать manager и получить в main. Но как дальше работать и передать в роутеры я не понимаю. Роуты импортирую, хранятся они в отдельных файлах
from aiorgramcode.handlers import bot_messages, user_commands
Я хочу использовать в роутах функционал по добавлению новых задач, т.е. использовать manager.add_task(some_coro()) к примеру.
async def example():
print(1)
asyncio.sleep(1)
@router.message(CommandStart())
async def start_handler(msg: Message):
await msg.answer(f"Привет! Выбери действие", reply_markup=reply.main_kb)
# await manager.add_task(example())
Как это можно реализовать, может быть нужно по другому реализовать всю структуру. Спасибо.