- ВКонтакте
- РћРТвЂВВВВВВВВнокласснРСвЂВВВВВВВВРєРСвЂВВВВВВВВ
- РњРѕР№ Р В Р’В Р РЋРЎв„ўР В Р’В Р РЋРІР‚ВВВВВВВВРЎР‚
- Viber
- Skype
- Telegram
Aiogram 2.24 – Дублируется сообщение о добавлении медиа-группы и публикуется лишний файл
Использую aiogram 2.24 для отправки медиа-группы в Telegram-канал. При добавлении группы из 3 файлов бот отвечает "Медиа-группа добавлена в очередь!" трижды, хотя должен один раз. Кроме того, в канал отправляется 4 файла вместо 3.
import logging
import json
import os
import uuid
from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher.filters import Command, BoundFilter
from aiogram.types import Message, ContentType, ParseMode, InputMediaPhoto, InputMediaVideo
import config
# Настройка логирования
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Конфигурация бота
BOT_TOKEN = config.BOT_TOKEN
ADMIN_ID = config.ADMIN_ID
CHANNEL_ID = config.CHANNEL_ID
# Инициализация бота и диспетчера
bot = Bot(token=BOT_TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)
# Файл для хранения очереди медиафайлов
MEDIA_QUEUE_FILE = 'media_queue.json'
# Файл для хранения media_group_ids
MEDIA_GROUP_IDS_FILE = 'media_group_ids.json'
# Функция для загрузки очереди из файла
def load_media_queue():
"""Загружает очередь медиафайлов из файла."""
if os.path.exists(MEDIA_QUEUE_FILE):
with open(MEDIA_QUEUE_FILE, 'r') as f:
try:
return json.load(f)
except json.JSONDecodeError:
logger.warning("Failed to decode media queue from JSON file. Starting with an empty queue.")
return []
else:
return []
# Функция для сохранения очереди в файл
def save_media_queue(queue):
"""Сохраняет очередь медиафайлов в файл."""
with open(MEDIA_QUEUE_FILE, 'w') as f:
json.dump(queue, f)
# Функция для загрузки media_group_ids из файла
def load_media_group_ids():
"""Загружает media_group_ids из файла."""
if os.path.exists(MEDIA_GROUP_IDS_FILE):
with open(MEDIA_GROUP_IDS_FILE, 'r') as f:
try:
return set(json.load(f)) # Преобразуем список в set
except json.JSONDecodeError:
logger.warning("Failed to decode media_group_ids from JSON file. Starting with an empty set.")
return set()
else:
return set()
# Функция для сохранения media_group_ids в файл
def save_media_group_ids(media_group_ids):
"""Сохраняет media_group_ids в файл."""
with open(MEDIA_GROUP_IDS_FILE, 'w') as f:
json.dump(list(media_group_ids), f) # Преобразуем set в list для сохранения
# Очередь медиафайлов (загружается из файла)
media_queue = load_media_queue()
# media_group_ids (загружается из файла)
media_group_ids = load_media_group_ids()
# Фильтр администратора
class AdminFilter(BoundFilter):
async def check(self, message: Message) -> bool:
return message.from_user.id == ADMIN_ID
# Регистрация фильтра
dp.filters_factory.bind(AdminFilter)
# Функция для экранирования HTML
def escape_html(text: str) -> str:
"""Экранирует специальные символы в HTML."""
if text is None:
return ""
escape_chars = r"<>\"&"
return "".join([f"&{char};" if char in escape_chars else char for char in text])
# Обработчик команды /start
@dp.message_handler(Command(['start']))
async def command_start_handler(message: Message) -> None:
"""Обрабатывает команду /start."""
user_name = message.from_user.full_name
welcome_text = f"Привет, <b>{escape_html(user_name)}</b>! Я бот для пересылки контента в канал.\nИспользуй меня для отправки фото и видео!"
try:
await message.answer(welcome_text, parse_mode=ParseMode.HTML)
logger.info(f"User {message.from_user.id} started the bot")
except Exception as e:
logger.exception(f"Error sending start message: {e}")
# Обработчик одиночного фото
@dp.message_handler(AdminFilter(), content_types=[ContentType.PHOTO], is_media_group=False)
async def handle_photo(message: types.Message):
"""Обрабатывает одиночное фото."""
try:
file_id = message.photo[-1].file_id
caption = message.caption if message.caption else ""
media_group_id = await add_media_to_queue(file_id=file_id, file_type='photo', caption=caption)
await message.reply(escape_html("Фотография добавлена в очередь! media_group_id = "+ str(media_group_id)), parse_mode=ParseMode.HTML)
logger.info(f"Photo added to queue with media_group_id: {media_group_id}")
except Exception as e:
logger.exception(f"Error handling photo: {e}")
await message.reply(escape_html(f"Произошла ошибка при обработке фото: {e}"), parse_mode=ParseMode.HTML)
# Обработчик одиночного видео
@dp.message_handler(AdminFilter(), content_types=[ContentType.VIDEO], is_media_group=False)
async def handle_video(message: types.Message):
"""Обрабатывает одиночное видео."""
try:
file_id = message.video.file_id
caption = message.caption if message.caption else ""
media_group_id = await add_media_to_queue(file_id=file_id, file_type='video', caption=caption)
await message.reply(escape_html("Видео добавлено в очередь! media_group_id = " + str(media_group_id)), parse_mode=ParseMode.HTML)
logger.info(f"Video added to queue with media_group_id: {media_group_id}")
except Exception as e:
logger.exception(f"Error handling video: {e}")
await message.reply(escape_html(f"Произошла ошибка при обработке видео: {e}"), parse_mode=ParseMode.HTML)
# Обработчик медиа-групп (альбомов)
media_group_ids = set() # Множество для хранения ID медиагрупп
@dp.message_handler(AdminFilter(), content_types=[ContentType.PHOTO, ContentType.VIDEO], is_media_group=True)
async def handle_media_group(message: types.Message):
"""Обрабатывает медиа-группы (альбомы)."""
try:
media_group_id = message.media_group_id
logger.info(f"Handling media group with id: {media_group_id}") # Лог
if not media_group_id:
logger.warning("Media group ID is missing")
return
# Проверяем, была ли уже обработана эта медиагруппа
if media_group_id in media_group_ids:
logger.info(f"Media group {media_group_id} already processed, skipping")
return
content_type = 'photo' if message.photo else 'video' # Определяем тип контента (photo или video)
media = message.photo if content_type == 'photo' else message.video # Получаем список фото или видео
caption = message.caption if message.caption else ""
if not media:
logger.warning("Media is empty")
return
for i, item in enumerate(media):
file_id = item.file_id # Получаем file_id из элемента медиагруппы
item_caption = caption if i == 0 else None # Caption только для первого элемента
# Проверяем, что такой file_id еще не добавлен в очередь
if any(d['file_id'] == file_id for d in media_queue):
logger.warning(f"File ID {file_id} already in queue, skipping")
continue
media_queue.append({'file_id': file_id, 'file_type': content_type, 'caption': item_caption, 'media_group_id': media_group_id})
save_media_queue(media_queue) # Сохраняем очередь в файл
await message.reply(escape_html("Медиа-группа добавлена в очередь!"), parse_mode=ParseMode.HTML)
logger.info(f"Media group added to queue with id: {media_group_id}")
media_group_ids.add(media_group_id) # Добавляем ID медиагруппы в множество
logger.info(f"Media group ID {media_group_id} added to media_group_ids") # Лог
except Exception as e:
logger.exception(f"Error handling media group: {e}")
await message.reply(escape_html(f"Произошла ошибка при обработке медиа-группы: {e}"), parse_mode=ParseMode.HTML)
# Задержка между публикациями в секундах
PUBLICATION_DELAY = 10
# Задержка при пустой очереди в секундах
EMPTY_QUEUE_DELAY = 60
async def publish_from_queue():
"""Автоматически публикует медиафайлы из очереди с заданной задержкой."""
global media_queue # Указываем, что media_queue - глобальная переменная
global media_group_ids # Добавляем, чтобы правильно очищать множество media_group_ids
while True:
try:
if media_queue:
# Получаем первый элемент из очереди
item = media_queue[0]
media_group_id = item.get('media_group_id')
# Группируем элементы, если есть media_group_id (медиагруппа)
if media_group_id:
logger.debug(f"Publishing media group with ID: {media_group_id}, queue size before processing: {len(media_queue)}") # Добавлено
# Находим все элементы этой медиагруппы
media_group_items = [item for item in media_queue if str(item.get('media_group_id')) == str(media_group_id)]
logger.debug(f"Media group items found: {len(media_group_items)}") # Добавлено
# Если медиагруппа пуста, пропускаем
if not media_group_items:
logger.warning(f"Media group {media_group_id} is empty, skipping")
media_queue.pop(0)
save_media_queue(media_queue)
continue
# Ограничиваем количество элементов в медиагруппе до 10 (лимит Telegram)
media_group_items = media_group_items[:10]
# Формируем список InputMedia для отправки
media_group = []
for i, item in enumerate(media_group_items):
file_id = item['file_id']
file_type = item['file_type']
caption = item['caption']
if file_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id, caption=escape_html(caption) if i == 0 else None, parse_mode=ParseMode.HTML))
elif file_type == 'video':
media_group.append(InputMediaVideo(media=file_id, caption=escape_html(caption) if i == 0 else None, parse_mode=ParseMode.HTML))
try:
# Отправляем медиагруппу
await bot.send_media_group(chat_id=CHANNEL_ID, media=media_group)
logger.info(f"Media group {media_group_id} sent from queue")
# Удаляем отправленные элементы из очереди - **ИЗМЕНЕНИЕ ЗДЕСЬ**
for item_to_remove in media_group_items: # Итерируемся по media_group_items, чтобы удалять правильно
if item_to_remove in media_queue:
media_queue.remove(item_to_remove)
save_media_queue(media_queue) # Сохраняем очередь после удаления элементов
logger.debug(f"Queue size after processing media group: {len(media_queue)}") # Добавлено
# Удаляем media_group_id из media_group_ids после успешной отправки
if media_group_id in media_group_ids:
media_group_ids.remove(media_group_id)
save_media_group_ids(media_group_ids) # Сохраняем media_group_ids
logger.info(f"Media group ID {media_group_id} removed from media_group_ids")
except Exception as e:
logger.error(f"Error sending media group: {e}")
# Если произошла ошибка, не удаляем элементы из очереди, чтобы повторить попытку позже
await asyncio.sleep(PUBLICATION_DELAY) # Ждем перед следующей попыткой
else: # Отправляем одиночный файл
logger.debug(f"Publishing single media file, queue size before processing: {len(media_queue)}") # Добавлено
file_id = item['file_id']
file_type = item['file_type']
caption = item['caption']
try:
if file_type == 'photo':
await bot.send_photo(chat_id=CHANNEL_ID, photo=file_id, caption=escape_html(caption), parse_mode=ParseMode.HTML)
elif file_type == 'video':
await bot.send_video(chat_id=CHANNEL_ID, video=file_id, caption=escape_html(caption), parse_mode=ParseMode.HTML)
logger.info("Media file sent from queue")
# Удаляем отправленный элемент из очереди
media_queue.pop(0) # Удаляем первый элемент, так как он является одиночным файлом
save_media_queue(media_queue) # Сохраняем очередь
logger.debug(f"Queue size after processing single media: {len(media_queue)}") # Добавлено
except Exception as e:
logger.error(f"Error sending single media file: {e}")
# Если произошла ошибка, не удаляем элемент из очереди, чтобы повторить попытку позже
await asyncio.sleep(PUBLICATION_DELAY) # Ждем перед следующей попыткой
else:
logger.info("Media queue is empty")
await asyncio.sleep(EMPTY_QUEUE_DELAY)
continue
# Пауза перед следующей публикацией (если очередь не пуста)
await asyncio.sleep(PUBLICATION_DELAY)
except Exception as e:
logger.exception(f"Error publishing from queue: {e}")
await asyncio.sleep(EMPTY_QUEUE_DELAY)
async def add_media_to_queue(file_id: str, file_type: str, caption: str = None, media_group_id: str = None):
"""Добавляет медиафайл в очередь на публикацию."""
global media_queue
# Для медиагрупп media_group_id уже установлен в обработчике, а для одиночных генерируем уникальный ID. Это важно!
if media_group_id is None:
media_group_id = str(uuid.uuid4()) # Генерируем новый ID для одиночного файла
logger.debug(f"Generated media_group_id {media_group_id} for single media")
item = {
'file_id': file_id,
'file_type': file_type,
'caption': caption,
'media_group_id': media_group_id # Сохраняем media_group_id
}
media_queue.append(item)
save_media_queue(media_queue)
logger.info(f"Media file added to queue with media_group_id: {media_group_id}")
return media_group_id
# Обработчик для других типов сообщений
@dp.message_handler()
async def echo_message(message: types.Message):
"""Обрабатывает все остальные сообщения."""
try:
await message.answer(escape_html("Я принимаю только фото и видео от администратора."), parse_mode=ParseMode.HTML)
except Exception as e:
logger.exception(f"Error sending echo message: {e}")
# Функция для запуска бота
async def main():
"""Запускает бота."""
try:
# Загружаем очередь из файла
global media_queue
media_queue = load_media_queue()
global media_group_ids # Загружаем media_group_ids из файла.
media_group_ids = load_media_group_ids()
# Запускаем функцию автоматической публикации в асинхронном режиме
asyncio.create_task(publish_from_queue())
await dp.start_polling()
finally:
# Сохраняем очередь и media_group_ids перед завершением работы
save_media_queue(media_queue)
save_media_group_ids(media_group_ids)
await dp.storage.close()
await dp.storage.wait_closed()
await bot.session.close()
if __name__ == "__main__":
asyncio.run(main())```