Aiogram 2.24 – Дублируется сообщение о добавлении медиа-группы и публикуется лишний файл

Использую aiogram 2.24 для отправки медиа-группы в Telegram-канал. При добавлении группы из 3 файлов бот отвечает "Медиа-группа добавлена в очередь!" трижды, хотя должен один раз. Кроме того, в канал отправляется 4 файла вместо 3.

import logging
import json
import os
import uuid

from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
from aiogram.dispatcher.filters import Command, BoundFilter
from aiogram.types import Message, ContentType, ParseMode, InputMediaPhoto, InputMediaVideo
import config

# Настройка логирования
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Конфигурация бота
BOT_TOKEN = config.BOT_TOKEN
ADMIN_ID = config.ADMIN_ID
CHANNEL_ID = config.CHANNEL_ID

# Инициализация бота и диспетчера
bot = Bot(token=BOT_TOKEN)
storage = MemoryStorage()
dp = Dispatcher(bot, storage=storage)

# Файл для хранения очереди медиафайлов
MEDIA_QUEUE_FILE = 'media_queue.json'
# Файл для хранения media_group_ids
MEDIA_GROUP_IDS_FILE = 'media_group_ids.json'


# Функция для загрузки очереди из файла
def load_media_queue():
    """Загружает очередь медиафайлов из файла."""
    if os.path.exists(MEDIA_QUEUE_FILE):
        with open(MEDIA_QUEUE_FILE, 'r') as f:
            try:
                return json.load(f)
            except json.JSONDecodeError:
                logger.warning("Failed to decode media queue from JSON file. Starting with an empty queue.")
                return []
    else:
        return []

# Функция для сохранения очереди в файл
def save_media_queue(queue):
    """Сохраняет очередь медиафайлов в файл."""
    with open(MEDIA_QUEUE_FILE, 'w') as f:
        json.dump(queue, f)

# Функция для загрузки media_group_ids из файла
def load_media_group_ids():
    """Загружает media_group_ids из файла."""
    if os.path.exists(MEDIA_GROUP_IDS_FILE):
        with open(MEDIA_GROUP_IDS_FILE, 'r') as f:
            try:
                return set(json.load(f))  # Преобразуем список в set
            except json.JSONDecodeError:
                logger.warning("Failed to decode media_group_ids from JSON file. Starting with an empty set.")
                return set()
    else:
        return set()

# Функция для сохранения media_group_ids в файл
def save_media_group_ids(media_group_ids):
    """Сохраняет media_group_ids в файл."""
    with open(MEDIA_GROUP_IDS_FILE, 'w') as f:
        json.dump(list(media_group_ids), f)  # Преобразуем set в list для сохранения

# Очередь медиафайлов (загружается из файла)
media_queue = load_media_queue()
# media_group_ids (загружается из файла)
media_group_ids = load_media_group_ids()


# Фильтр администратора
class AdminFilter(BoundFilter):
    async def check(self, message: Message) -> bool:
        return message.from_user.id == ADMIN_ID

# Регистрация фильтра
dp.filters_factory.bind(AdminFilter)

# Функция для экранирования HTML
def escape_html(text: str) -> str:
    """Экранирует специальные символы в HTML."""
    if text is None:
        return ""
    escape_chars = r"<>\"&"
    return "".join([f"&{char};" if char in escape_chars else char for char in text])

# Обработчик команды /start
@dp.message_handler(Command(['start']))
async def command_start_handler(message: Message) -> None:
    """Обрабатывает команду /start."""
    user_name = message.from_user.full_name
    welcome_text = f"Привет, <b>{escape_html(user_name)}</b>! Я бот для пересылки контента в канал.\nИспользуй меня для отправки фото и видео!"
    try:
        await message.answer(welcome_text, parse_mode=ParseMode.HTML)
        logger.info(f"User {message.from_user.id} started the bot")
    except Exception as e:
        logger.exception(f"Error sending start message: {e}")

# Обработчик одиночного фото
@dp.message_handler(AdminFilter(), content_types=[ContentType.PHOTO], is_media_group=False)
async def handle_photo(message: types.Message):
    """Обрабатывает одиночное фото."""
    try:
        file_id = message.photo[-1].file_id
        caption = message.caption if message.caption else ""
        media_group_id = await add_media_to_queue(file_id=file_id, file_type='photo', caption=caption)
        await message.reply(escape_html("Фотография добавлена в очередь! media_group_id = "+ str(media_group_id)), parse_mode=ParseMode.HTML)
        logger.info(f"Photo added to queue with media_group_id: {media_group_id}")
    except Exception as e:
        logger.exception(f"Error handling photo: {e}")
        await message.reply(escape_html(f"Произошла ошибка при обработке фото: {e}"), parse_mode=ParseMode.HTML)

# Обработчик одиночного видео
@dp.message_handler(AdminFilter(), content_types=[ContentType.VIDEO], is_media_group=False)
async def handle_video(message: types.Message):
    """Обрабатывает одиночное видео."""
    try:
        file_id = message.video.file_id
        caption = message.caption if message.caption else ""
        media_group_id = await add_media_to_queue(file_id=file_id, file_type='video', caption=caption)
        await message.reply(escape_html("Видео добавлено в очередь! media_group_id = " + str(media_group_id)), parse_mode=ParseMode.HTML)
        logger.info(f"Video added to queue with media_group_id: {media_group_id}")
    except Exception as e:
        logger.exception(f"Error handling video: {e}")
        await message.reply(escape_html(f"Произошла ошибка при обработке видео: {e}"), parse_mode=ParseMode.HTML)

# Обработчик медиа-групп (альбомов)
media_group_ids = set()  # Множество для хранения ID медиагрупп

@dp.message_handler(AdminFilter(), content_types=[ContentType.PHOTO, ContentType.VIDEO], is_media_group=True)
async def handle_media_group(message: types.Message):
    """Обрабатывает медиа-группы (альбомы)."""
    try:
        media_group_id = message.media_group_id
        logger.info(f"Handling media group with id: {media_group_id}")  # Лог

        if not media_group_id:
            logger.warning("Media group ID is missing")
            return

        # Проверяем, была ли уже обработана эта медиагруппа
        if media_group_id in media_group_ids:
            logger.info(f"Media group {media_group_id} already processed, skipping")
            return

        content_type = 'photo' if message.photo else 'video'  # Определяем тип контента (photo или video)
        media = message.photo if content_type == 'photo' else message.video  # Получаем список фото или видео
        caption = message.caption if message.caption else ""

        if not media:
            logger.warning("Media is empty")
            return

        for i, item in enumerate(media):
            file_id = item.file_id  # Получаем file_id из элемента медиагруппы
            item_caption = caption if i == 0 else None  # Caption только для первого элемента

            # Проверяем, что такой file_id еще не добавлен в очередь
            if any(d['file_id'] == file_id for d in media_queue):
                logger.warning(f"File ID {file_id} already in queue, skipping")
                continue

            media_queue.append({'file_id': file_id, 'file_type': content_type, 'caption': item_caption, 'media_group_id': media_group_id})

        save_media_queue(media_queue)  # Сохраняем очередь в файл
        await message.reply(escape_html("Медиа-группа добавлена в очередь!"), parse_mode=ParseMode.HTML)
        logger.info(f"Media group added to queue with id: {media_group_id}")

        media_group_ids.add(media_group_id)  # Добавляем ID медиагруппы в множество
        logger.info(f"Media group ID {media_group_id} added to media_group_ids")  # Лог

    except Exception as e:
        logger.exception(f"Error handling media group: {e}")
        await message.reply(escape_html(f"Произошла ошибка при обработке медиа-группы: {e}"), parse_mode=ParseMode.HTML)


# Задержка между публикациями в секундах
PUBLICATION_DELAY = 10
# Задержка при пустой очереди в секундах
EMPTY_QUEUE_DELAY = 60


async def publish_from_queue():
    """Автоматически публикует медиафайлы из очереди с заданной задержкой."""
    global media_queue  # Указываем, что media_queue - глобальная переменная
    global media_group_ids # Добавляем, чтобы правильно очищать множество media_group_ids
    while True:
        try:
            if media_queue:
                # Получаем первый элемент из очереди
                item = media_queue[0]
                media_group_id = item.get('media_group_id')

                # Группируем элементы, если есть media_group_id (медиагруппа)
                if media_group_id:
                    logger.debug(f"Publishing media group with ID: {media_group_id}, queue size before processing: {len(media_queue)}")  # Добавлено
                    # Находим все элементы этой медиагруппы
                    media_group_items = [item for item in media_queue if str(item.get('media_group_id')) == str(media_group_id)]
                    logger.debug(f"Media group items found: {len(media_group_items)}")  # Добавлено

                    # Если медиагруппа пуста, пропускаем
                    if not media_group_items:
                        logger.warning(f"Media group {media_group_id} is empty, skipping")
                        media_queue.pop(0)
                        save_media_queue(media_queue)
                        continue

                    # Ограничиваем количество элементов в медиагруппе до 10 (лимит Telegram)
                    media_group_items = media_group_items[:10]

                    # Формируем список InputMedia для отправки
                    media_group = []
                    for i, item in enumerate(media_group_items):
                        file_id = item['file_id']
                        file_type = item['file_type']
                        caption = item['caption']

                        if file_type == 'photo':
                            media_group.append(InputMediaPhoto(media=file_id, caption=escape_html(caption) if i == 0 else None, parse_mode=ParseMode.HTML))
                        elif file_type == 'video':
                            media_group.append(InputMediaVideo(media=file_id, caption=escape_html(caption) if i == 0 else None, parse_mode=ParseMode.HTML))

                    try:
                        # Отправляем медиагруппу
                        await bot.send_media_group(chat_id=CHANNEL_ID, media=media_group)
                        logger.info(f"Media group {media_group_id} sent from queue")

                        # Удаляем отправленные элементы из очереди  - **ИЗМЕНЕНИЕ ЗДЕСЬ**
                        for item_to_remove in media_group_items: # Итерируемся по media_group_items, чтобы удалять правильно
                            if item_to_remove in media_queue:
                                media_queue.remove(item_to_remove)

                        save_media_queue(media_queue)  # Сохраняем очередь после удаления элементов
                        logger.debug(f"Queue size after processing media group: {len(media_queue)}")  # Добавлено

                        # Удаляем media_group_id из media_group_ids после успешной отправки
                        if media_group_id in media_group_ids:
                            media_group_ids.remove(media_group_id)
                            save_media_group_ids(media_group_ids) # Сохраняем media_group_ids
                            logger.info(f"Media group ID {media_group_id} removed from media_group_ids")

                    except Exception as e:
                        logger.error(f"Error sending media group: {e}")
                        # Если произошла ошибка, не удаляем элементы из очереди, чтобы повторить попытку позже
                        await asyncio.sleep(PUBLICATION_DELAY)  # Ждем перед следующей попыткой

                else:  # Отправляем одиночный файл
                    logger.debug(f"Publishing single media file, queue size before processing: {len(media_queue)}") # Добавлено
                    file_id = item['file_id']
                    file_type = item['file_type']
                    caption = item['caption']
                    try:
                        if file_type == 'photo':
                            await bot.send_photo(chat_id=CHANNEL_ID, photo=file_id, caption=escape_html(caption), parse_mode=ParseMode.HTML)
                        elif file_type == 'video':
                            await bot.send_video(chat_id=CHANNEL_ID, video=file_id, caption=escape_html(caption), parse_mode=ParseMode.HTML)
                        logger.info("Media file sent from queue")

                        # Удаляем отправленный элемент из очереди
                        media_queue.pop(0) # Удаляем первый элемент, так как он является одиночным файлом
                        save_media_queue(media_queue) # Сохраняем очередь
                        logger.debug(f"Queue size after processing single media: {len(media_queue)}") # Добавлено
                    except Exception as e:
                        logger.error(f"Error sending single media file: {e}")
                        # Если произошла ошибка, не удаляем элемент из очереди, чтобы повторить попытку позже
                        await asyncio.sleep(PUBLICATION_DELAY)  # Ждем перед следующей попыткой

            else:
                logger.info("Media queue is empty")
                await asyncio.sleep(EMPTY_QUEUE_DELAY)
                continue

            # Пауза перед следующей публикацией (если очередь не пуста)
            await asyncio.sleep(PUBLICATION_DELAY)

        except Exception as e:
            logger.exception(f"Error publishing from queue: {e}")
            await asyncio.sleep(EMPTY_QUEUE_DELAY)

async def add_media_to_queue(file_id: str, file_type: str, caption: str = None, media_group_id: str = None):
    """Добавляет медиафайл в очередь на публикацию."""
    global media_queue

    # Для медиагрупп media_group_id уже установлен в обработчике, а для одиночных генерируем уникальный ID.  Это важно!
    if media_group_id is None:
        media_group_id = str(uuid.uuid4()) # Генерируем новый ID для одиночного файла
        logger.debug(f"Generated media_group_id {media_group_id} for single media")

    item = {
        'file_id': file_id,
        'file_type': file_type,
        'caption': caption,
        'media_group_id': media_group_id  # Сохраняем media_group_id
    }

    media_queue.append(item)
    save_media_queue(media_queue)
    logger.info(f"Media file added to queue with media_group_id: {media_group_id}")

    return media_group_id

# Обработчик для других типов сообщений
@dp.message_handler()
async def echo_message(message: types.Message):
    """Обрабатывает все остальные сообщения."""
    try:
        await message.answer(escape_html("Я принимаю только фото и видео от администратора."), parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.exception(f"Error sending echo message: {e}")

# Функция для запуска бота
async def main():
    """Запускает бота."""
    try:
        # Загружаем очередь из файла
        global media_queue
        media_queue = load_media_queue()
        global media_group_ids # Загружаем media_group_ids из файла.
        media_group_ids = load_media_group_ids()

        # Запускаем функцию автоматической публикации в асинхронном режиме
        asyncio.create_task(publish_from_queue())

        await dp.start_polling()
    finally:
        # Сохраняем очередь и media_group_ids перед завершением работы
        save_media_queue(media_queue)
        save_media_group_ids(media_group_ids)
        await dp.storage.close()
        await dp.storage.wait_closed()
        await bot.session.close()

if __name__ == "__main__":
    asyncio.run(main())```

Ответы (0 шт):