Как получать несколько фото одним хендлером aiogram
Собственно вопрос, есть некий хендлер который ловит в себя фото(сжатые) и к примеру пересылает куда-то там.
@dp.message_handler(content_types=types.ContentTypes.PHOTO)
async def send_to_admin(message: types.Message):
await bot.send_photo(chat_id=ADMIN_ID, photo=message.photo[-1].file_id)
Так вот если этот юзер(который присылает боту фото) пришлет не одно а два и более, и они буду группированы вот так.
То возникает странная ситуация, хендлер реагирует на каждое отдельное фото, собственно отдельно. По факту он срабатывает дважды (по 1-му на фото), то есть наша message.photo имеет только вариации одной из фоток (размеры) но не имеет другого фото в себе.
Потому вернусь к вопросу, есть ли способ ловить эту "группу" одним хендлером? Ведь есть же способ отправлять группу фото, должен же быть способ и получать их "группой"?
Ответы (5 шт):
Вроде получилось сделать что-то что работает. Правда он какой-то костыльный.
from aiogram import Bot, Dispatcher, executor, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher import FSMContext
from asyncio import sleep
from time import time
from data import config
bot = Bot(token=config.BOT_TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
@dp.message_handler(content_types=["photo"])
async def get_foto(message: types.Message, state: FSMContext):
async with state.proxy() as data:
if type(data.get('photo')) is list:
data['photo'].append(message.photo[-1].file_id)
data["time"] = time()
return
else:
data['photo'] = [message.photo[-1].file_id]
data["time"] = time()
while time() - data["time"] < 3:
await sleep(3)
# в этот момент в data["photo"] лежат все фото которые мы получили,
# если что их можно куда-то переотправить
await bot.send_message(message.chat.id, "Text")
async with state.proxy() as data:
del data["time"]
del data["photo"]
if __name__ == '__main__':
executor.start_polling(dp, skip_updates=True)
Вот ещё вариант.
Так мы сможем сохранять полученные фото в БД. Способ загрузки не будет иметь значение, хоть альбомом, хоть по одному. Ну и есть возможность работать с несколькими пользователями одновременно, грех не использовать асинхронность.
from aiogram import types, Dispatcher
from create_bot import dp,bot
from aiogram.dispatcher import FSMContext
from aiogram.dispatcher.filters.state import State, StatesGroup
from Keyboards import client_kb
from Data_base import sqlite_db
from aiogram.types import ReplyKeyboardMarkup, KeyboardButton
List_photo = {}
class FSMReport(StatesGroup):
city = State()
address = State()
photo = State()
async def report_photo(message: types.Message, state=FSMContext):
global List_photo
key = str(message.from_user.id)
List_photo.setdefault(key, [])
if message.content_type == 'photo':
List_photo[key].append(message.photo[0].file_id)
elif message.content_type == 'text':
async with state.proxy() as data:
data['photo'] = ','.join(List_photo[key])
List_photo.pop(key)
await sqlite_db.sql_add_photo_report(state)
await bot.send_message(message.from_user.id, 'Готово', reply_markup=client_kb.kb_client)
await state.finish()
def register_handlers_client(dp: Dispatcher):
dp.register_message_handler(report_photo, content_types=['photo', 'text'], state=FSMReport.photo)
Видим в списке по ключу data['photo'], количество id, равных количеству фото. И загружались они не по одной.
TOKEN = "1234646854fdghdfhfghfghssfhhgh"
def send_group_img(chat_id, text):
temp_files_list = list()
media = list()
files = dict()
for filename in os.listdir("temp"):
temp_files_list.append(f'{os.getcwd()}\\temp\\{filename}')
for f in enumerate(temp_files_list):
files[f"random-name-{f[0]}"] = open(f[1], "rb")
if f[0] == 0:
media.append({"type": "photo",
"media": f"attach://random-name-{f[0]}",
"caption": text}
)
else:
media.append({"type": "photo",
"media": f"attach://random-name-{f[0]}"})
params = {
"chat_id": chat_id, "media": str(media).replace("'", '"')}
request_url = "https://api.telegram.org/bot" + TOKEN + "/sendMediaGroup"
result = requests.post(request_url, params=params, files=files)
if result.status_code == 200:
return True
else:
return False
Посмотрите это, должно помочь
import asyncio
from typing import List, Union
from aiogram import Bot, Dispatcher, executor, types
from aiogram.dispatcher.handler import CancelHandler
from aiogram.dispatcher.middlewares import BaseMiddleware
bot = Bot(token="TOKEN_HERE") # Place your token here
dp = Dispatcher(bot)
class AlbumMiddleware(BaseMiddleware):
"""This middleware is for capturing media groups."""
album_data: dict = {}
def __init__(self, latency: Union[int, float] = 0.01):
"""
You can provide custom latency to make sure
albums are handled properly in highload.
"""
self.latency = latency
super().__init__()
async def on_process_message(self, message: types.Message, data: dict):
if not message.media_group_id:
return
try:
self.album_data[message.media_group_id].append(message)
raise CancelHandler() # Tell aiogram to cancel handler for this group element
except KeyError:
self.album_data[message.media_group_id] = [message]
await asyncio.sleep(self.latency)
message.conf["is_last"] = True
data["album"] = self.album_data[message.media_group_id]
async def on_post_process_message(self, message: types.Message, result: dict, data: dict):
"""Clean up after handling our album."""
if message.media_group_id and message.conf.get("is_last"):
del self.album_data[message.media_group_id]
@dp.message_handler(content_types=types.ContentType.ANY)
async def handle_albums(message: types.Message, album: List[types.Message]):
"""This handler will receive a complete album of any type."""
media_group = types.MediaGroup()
for obj in album:
if obj.photo:
file_id = obj.photo[-1].file_id
else:
file_id = obj[obj.content_type].file_id
try:
# We can also add a caption to each file by specifying `"caption": "text"`
media_group.attach({"media": file_id, "type": obj.content_type})
except ValueError:
return await message.answer("This type of album is not supported by aiogram.")
await message.answer_media_group(media_group)
if __name__ == "__main__":
dp.middleware.setup(AlbumMiddleware())
executor.start_polling(dp, skip_updates=True)
Aiogram 3 https://docs.aiogram.dev/en/latest/dispatcher/filters/magic_filters.html
@router.message(F.text.in_({'text1', 'text2', 'text3'}))
async def post_message_handler(msg: Message):
print('Yes!')


