Как разработать архитектуру плагинов на aiogram3?
Основная проблема. Есть FastApi приложение которое при запуске инициирует экземпляры ботов. Принимает вебхуки и направляет апдейты в нужный инстанс. Вопрос, как сделать возможность разработки и подключения плагинов для ботов. Т.е банально логика меню при команде /start будет плагином, вплоть до middleware, и соответственно как настроить взаимодействие между плагинами
Не могу подобрать правильное архитектурное решение(
Нашел старинный репозиторий, но вероятно он уже не актуален: https://github.com/Forevka/AiogramPluginSystem
Пример экземпляра BaseBot:
from typing import Dict, Any, Optional, List, Type
from aiogram import Bot, Dispatcher
from aiogram.filters import CommandStart, Command
from aiogram.fsm.context import FSMContext
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.fsm.storage.redis import RedisStorage
from aiogram.types import Update, InlineKeyboardMarkup, InlineKeyboardButton, WebAppInfo
from loguru import logger
from sqlalchemy import select
from app.core.config import settings
from app.db.session import AsyncSessionLocal
from app.models import Bot as BotModel, BotPlugin, Plugin
from app.services.redis_metric_service import RedisMetricStorage
from app.telegram_bot.middlewares.auth_middleware import UserMiddleware
from app.telegram_bot.middlewares.metrics_middleware import TelegramEventMetricMiddleware
from app.telegram_bot.plugins.base import BasePlugin
class BaseBot:
def __init__(self, bot_model: BotModel):
self.bot_model = bot_model
self.bot = Bot(token=self.bot_model.token)
self.plugins: List[BasePlugin] = []
self.plugin_list = []
self.redis_metric_storage = RedisMetricStorage()
try:
self.storage = RedisStorage.from_url(settings.REDIS_URL)
except Exception as e:
logger.error(f"Failed to connect to Redis: {e}")
self.storage = MemoryStorage()
self.dp = Dispatcher(storage=self.storage)
self.dp.message.middleware(UserMiddleware(self.bot_model))
self.dp.message.middleware(TelegramEventMetricMiddleware(self.redis_metric_storage, self.bot_model))
self.dp.callback_query.middleware(TelegramEventMetricMiddleware(self.redis_metric_storage, self.bot_model))
self.dp.callback_query.middleware(UserMiddleware(self.bot_model))
self._setup_handlers()
def _setup_handlers(self):
"""Настройка базовых обработчиков."""
self.dp.message.register(self.start_handler, CommandStart())
self.dp.message.register(self.admin_handler, Command("admin"))
def register_plugin(self, plugin_class: Type[BasePlugin]):
"""Регистрация плагина."""
plugin = plugin_class()
plugin.register_handlers(self.dp)
self.plugins.append(plugin)
logger.info(f"Plugin {plugin.name} v{plugin.version} registered")
def disable_plugin(self, plugin_name: str):
"""Отключение плагина."""
plugin = next((p for p in self.plugins if p.name == plugin_name), None)
if plugin:
plugin.unregister_handlers(self.dp)
self.plugins.remove(plugin)
logger.info(f"Plugin {plugin.name} v{plugin.version} disabled")
else:
logger.warning(f"Plugin {plugin_name} not found")
def initialize_plugins(self):
"""Инициализация всех плагинов."""
for plugin in self.plugins:
plugin.register_handlers(self.dp)
logger.info(f"Plugin {plugin.name} v{plugin.version} initialized")
async def start_handler(self, message, state: FSMContext, **data):
"""Обработчик команды /start."""
user = data.get("user")
await message.answer(f"Привет, {user.tg_id}!")
async def admin_handler(self, message, state: FSMContext, **data):
"""Обработчик команды /admin."""
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="? Панель администратора", web_app=WebAppInfo(url=settings.ADMIN_PANEL_URL))]
])
await message.answer("Панель администратора", reply_markup=keyboard)
async def handle_update(self, update: Dict[str, Any]) -> Optional[bool]:
"""Обработка входящего обновления."""
try:
telegram_update = Update(**update)
await self.dp.feed_update(self.bot, telegram_update)
return True
except Exception as e:
logger.error(f"Error handling update for bot {self.bot_model.token[:8]}: {str(e)}")
return False
async def set_webhook(self) -> bool:
"""Устанавливает вебхук для бота."""
logger.info(f"Setting webhook for bot {self.bot_model.name}...")
webhook_path = f"{settings.WEBHOOK_BASE_URL}{settings.API_V1_STR}/webhook/{self.bot_model.id}"
try:
await self.bot.set_webhook(
url=webhook_path,
allowed_updates=["message", "callback_query", "inline_query"],
drop_pending_updates=True
)
logger.info(f"Webhook set successfully for bot {self.bot_model.token[:8]}... URL: {webhook_path}")
return True
except Exception as e:
logger.error(f"Error setting webhook for bot {self.bot_model.token[:8]}: {str(e)}")
return False
async def delete_webhook(self) -> bool:
"""Удаляет вебхук бота."""
try:
await self.bot.delete_webhook()
logger.info(f"Webhook deleted successfully for bot {self.bot_model.token[:8]}...")
return True
except Exception as e:
logger.error(f"Error deleting webhook for bot {self.bot_model.token[:8]}: {str(e)}")
return False
async def load_plugins(self) -> None:
try:
logger.info(f"Loading plugins for bot {self.bot_model.name}...")
async with AsyncSessionLocal() as session:
async with session.begin():
plugins_query = await session.execute(
select(BotPlugin).where(BotPlugin.bot_id == self.bot_model.id))
plugins = plugins_query.scalars().all()
plugins_list = []
for plugin_obj in plugins:
plugin_query = await session.execute(select(Plugin).where(Plugin.id == plugin_obj.plugin_id))
plugin_data = plugin_query.scalars().first()
plugins_list.append({
"bot_plugin_id": plugin_obj.id,
"plugin_id": plugin_obj.plugin_id,
"name": plugin_data.name,
"version": plugin_data.version,
"config": plugin_obj.config,
"is_active": plugin_obj.is_active,
})
self.plugin_list = plugins_list
logger.info(f"Plugins loaded for bot {self.bot_model.name} | {len(plugins_list)} plugins")
except Exception as e:
logger.error(f"Error getting plugins: {e}")
async def start(self) -> bool:
"""Запускает бота."""
try:
try:
await self.bot.send_message(chat_id=123, text="Bot started")
except Exception as e:
logger.error(f"Error sending message: {e}")
await self.load_plugins()
return await self.set_webhook()
except Exception as e:
logger.error(f"Error starting bot: {e}")
return False
async def stop(self) -> bool:
"""Останавливает бота."""
try:
return await self.delete_webhook()
except Exception as e:
logger.error(f"Error stopping bot: {e}")
return False
Ответы (1 шт):
Автор решения: Артём Мотовилов
→ Ссылка
Возможно это решение: https://github.com/NKTKLN/Universal-bot
Свежий проект от конца ноября 2024