Как реализовать функцию антифлуда в Aiogram
Имеется бот в телеграмме, бот отсылает запросы на сервер. Нужно как-то реализовать систему игнорирования пользователя в течении какого-то времени. Бот на аиограмме, слипы не подойдут.
@dp.message_handler(commands="parse")
async def parse(message: types.Message):
data = message.text[len('/parse'):]
count = data .split('\n')
count = len(count)
antik = count * 15 //тут я считаю сколько бот должен игнорировать юзера в секундах (в среднем на 1 запрос уходит порядка 15 секунд), например, если юзер отправит 3 заголовка для парсинга, то бот будет парсить эти заголовки где-то 45с, следовательно бот должен игнорировать его повторные попытки парсинга в течение 45ти секунд.
Как реализовать эту самую проверку?
Ответы (2 шт):
Автор решения: GTapok
→ Ссылка
Нашёл один вариант, код не мой, но он достаточно простой.
from aiogram import Bot, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import Dispatcher
from aiogram.utils import executor
bot = Bot(token='123:AbC')
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
async def anti_flood(*args, **kwargs):
m = args[0]
#тут будет то, что нужно при флуде.
await m.answer("Не флуди :)")
@dp.message_handler(content_types=['text'])
@dp.throttled(anti_flood, rate=1) #rate это количество секунд, при котором входящие сообщения считаются флудом.
async def main(m: types.Message):
await m.answer(f"Вы отправили мне: {m.text} , я согласен с этим !")
if __name__ == '__main__':
executor.start_polling(dp, fast=True, skip_updates=True)
Помог чем смог, приятель.
Автор решения: pshpth_ sht
→ Ссылка
from aiogram import types, Dispatcher
from aiogram.dispatcher.handler import CancelHandler, current_handler
from aiogram.dispatcher.middlewares import BaseMiddleware
from aiogram.types import Message
from aiogram.utils.exceptions import Throttled
from tgbot.data.config import get_admins
class ThrottlingMiddleware(BaseMiddleware):
def __init__(self, limit=0.5, key_prefix='antiflood_'):
self.rate_limit = limit
self.prefix = key_prefix
super(ThrottlingMiddleware, self).__init__()
async def on_process_message(self, message: Message, data: dict):
handler = current_handler.get()
dispatcher = Dispatcher.get_current()
if handler:
limit = getattr(handler, "throttling_rate_limit", self.rate_limit)
key = getattr(handler, "throttling_key", f"{self.prefix}_{handler.__name__}")
else:
limit = self.rate_limit
key = f"{self.prefix}_message"
if message.from_user.id not in get_admins():
try:
await dispatcher.throttle(key, rate=limit)
except Throttled as t:
await self.message_throttled(message, t)
raise CancelHandler()
@staticmethod
async def message_throttled(message: types.Message, throttled: Throttled):
if throttled.exceeded_count <= 2:
await message.reply("спамить плохо")
def rate_limit(limit: int, key=None):
def decorator(func):
setattr(func, "throttling_rate_limit", limit)
if key:
setattr(func, "throttling_key", key)
return func
return decorator
Лучшее решение - мидлварь, он будет отлавливать любые сообщения пользователей и в случае спама отменять хендлер.
# подключаем их
def setup_middlewares(dp: Dispatcher):
dp.middleware.setup(ThrottlingMiddleware())
....
Ну и теперь вызываем наш setup_middlewares при запуске.