Telegram бот на aiogram (python) ошибка TelegramNetworkError: HTTP Client says - ClientOSError: [Errno 104] Connection reset by peer

Кратко суть проблемы:

Написал telegram бота (пишу далеко не первый раз) и появилась очень странная проблема: через некоторое продолжительное время (2-5 часов) (я так понимаю при нахождении сообщения) бот зависает и появляется ошибка:

Ошибка:

ERROR:aiogram.dispatcher:Failed to fetch updates - TelegramNetworkError: HTTP Client says - ClientOSError: [Errno 104] Connection reset by peer WARNING:aiogram.dispatcher:Sleep for 1.000000 seconds and try again... (tryings = 0, bot id = 7847598237) INFO:aiogram.dispatcher:Connection established (tryings = 1, bot id = 7847598237)INFO:aiogram.event:Update id=391149334 is handled. Duration 82 ms by bot id=7847598237INFO:aiogram.event:Update id=391149335 is handled. Duration 88 ms by bot id=7847598237INFO:aiogram.event:Update id=391149337 is handled. Duration 66 ms by bot id=7847598237

Коротко о функционале бота:

Есть 2 группы в которых состоит бот: группа, где он читает сообщения и группа куда он отправляет сообщения. Пользователь может задать разные параметры (время задержки, время работы и тд). После нажатия на кнопку "включить" бот начинает чтение сообщений из одной группы и если время работы и другие параметры удовлетворяют условию, то он отправляет несколько сообщений с заданной задержкой в другую группу.

P.S.

В группе из которой читают сообщения у бота права администратора, 5 пользователей из которых только 1 отправляет сообщения только пару раз в день (спама нет).

Код бота:

import asyncio, logging, json, os
from aiogram import Bot, Dispatcher, F
from aiogram.filters import CommandStart
from aiogram.types import CallbackQuery, Message
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from datetime import datetime
from typing import cast


from config import API_TOKEN, GROUP_GET, GROUP_SEND # Импорт токена, группы получения и отправки сообщений
import modules.keyboard as kb # Импорт клавиатуры

JSON_FILENAME="data.json"

can_send_message=False
disable_for_next_message=False
is_message_handled=False
count=0

# Настройка логирования 
logging.basicConfig(level=logging.INFO)

# Создание объекта блокировки задач
can_send_message_lock = asyncio.Lock()
json_lock = asyncio.Lock()
disable_for_next_message_lock = asyncio.Lock()
is_message_handled_lock = asyncio.Lock()

bot = Bot(token=API_TOKEN) # Присвоение токена
dp = Dispatcher(bot=bot)

# Создание класса состояний
class Setup(StatesGroup):
    work_time_setup = State()
    first_message_delay_setup = State()
    remove_message_delay_setup = State()
    send_message_delay_setup = State()
    max_messages_count_setup = State()

# Комманда /start
@dp.message(CommandStart())
async def start_command(message: Message):
    await message.answer("Добро пожаловать в бота для пробуждения ночью, в случае прихода важных сообщений.",
                         reply_markup=kb.start_reply_keyboard)

# Меню настроек
@dp.message(F.text == kb.BUTTON_SETTINGS)
async def settings_menu(message: Message):
    await message.answer("Меню настроек", 
                         reply_markup=kb.settings_inline_keyboard)


# Кнопка включения отправки сообщений
@dp.message(F.text == kb.BUTTON_ON)
async def enable_sending(message: Message):

    global can_send_message

    # Блокировка глобальной переменной на время работы
    async with can_send_message_lock:
        can_send_message = True

    await message.answer("✅Отправка уведомлений включена.")


# Кнопка выключения отправки сообщений
@dp.message(F.text == kb.BUTTON_OFF)
async def disable_sending(message: Message):

    global can_send_message

    # Блокировка глобальной переменной на время работы
    async with can_send_message_lock:
        can_send_message = False

    await message.answer("❌Отправка уведомлений выкючена.")

# Кнопка выключения уведомлений до следующего сообщения
@dp.message(F.text == kb.BUTTON_OFF_FOR_NEXT_MESSAGE)
async def disable_sending_for_next_message(message: Message):

    global disable_for_next_message

    # Блокировка глобальной переменной на время работы
    async with disable_for_next_message_lock:
        disable_for_next_message = True

    await message.answer("⏸️Отправка уведомлений отключена до следующего сообщения")


# Закрытие меню настроек
@dp.callback_query(F.data == "settings_close")
async def settings_close(callback: CallbackQuery):
    await callback.answer("Вы закрыли меню настроек")

    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.delete()


# Настройка времени работы
@dp.callback_query(F.data == "work_time")
async def setup_work_time(callback: CallbackQuery, state: FSMContext):
    await state.set_state(Setup.work_time_setup)

    # Открытие файла для чтения
    with open(JSON_FILENAME, "r") as file:
        data = json.load(file)

    # Получение данных для чтения в переменную
    work_time = data.get("work_time", ["Не задано", "Не задано"])

    await callback.answer("")
    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.edit_text(f"Текущее время установлено с {work_time[0]} по {work_time[1]}.\nУстановите время работы в формате:\n18:00 - 9:00 (начало - конец)\nЧтобы установить круглосуточную работу, введите '00:00 - 00:00'",
                                reply_markup=kb.back_inline_keyboard)


# Обработчик настройки времени работы
@dp.message(Setup.work_time_setup)
async def setup_work_time_state(message: Message, state: FSMContext):

    try:
        if message.text:
            start_time_str, end_time_str = message.text.split(" - ") # Парсим сообщение в переменные, удаляя разделитель
            start_work_time = datetime.strptime(start_time_str.strip(), "%H:%M").time() # Записываем данные в переменные с типом данных время
            end_work_time = datetime.strptime(end_time_str.strip(), "%H:%M").time()

            await message.answer(f"Время работы установлено с {start_work_time.strftime('%H:%M')} по {end_work_time.strftime('%H:%M')}",
                                reply_markup=kb.start_reply_keyboard)

            # Работа с json
            # Создание нового списка данных, полученных из сообщения
            new_data = {
                "work_time": [start_time_str, end_time_str],
            }

            # Проверка на существование файла
            if os.path.exists(JSON_FILENAME): 
                with open(JSON_FILENAME) as file:
                    try:
                        data = json.load(file)  # Загружаем существующие данные        

                    except json.JSONDecodeError:
                        data = {}  # Если файл пуст или повреждён, создаём новый словарь

            else:
                data = {}  # Если файла нет, создаём новый словарь

            # Обновляем данные
            data.update(new_data)

            async with json_lock:
                # Обноваляем данные в файле
                with open(JSON_FILENAME, "w") as file:
                    json.dump(data, file, indent=4)

            await message.answer(f"Файл {JSON_FILENAME} обновлён.")

            await state.update_data(start_work_time=start_work_time, end_work_time=end_work_time)

    except ValueError:
        await message.answer("⚠ Ошибка! Введите время в формате HH:MM - HH:MM")

    await state.clear()

# Настройка времени задержки первого сообщения
@dp.callback_query(F.data == "first_message_delay")
async def setup_first_message_delay(callback: CallbackQuery, state: FSMContext):

    await state.set_state(Setup.first_message_delay_setup)

    # Открытие файла для чтения
    with open(JSON_FILENAME, "r") as file:
        data = json.load(file)

    # Получение данных для чтения в переменную
    delay = data.get("first_message_delay", ["Не задано"])

    await callback.answer("")
    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.edit_text(f"Сейчас задержка установлена на {delay} сек.\nУстановите задержку первого сообщения в секундах в формате:\n n (сек)",
                                reply_markup=kb.back_inline_keyboard)


# Обработчик настройки времени задержки первого сообщения
@dp.message(Setup.first_message_delay_setup)
async def setup_first_message_delay_state(message: Message, state: FSMContext):

    try:
        if message.text:
            first_message_delay = int(message.text) # Парсим сообщение в переменную

            # Защита от неправельно введённого значения
            if first_message_delay < 0:
                await message.answer("Введённое значение должно быть больше 0.",
                                    reply_markup=kb.start_reply_keyboard)
                await state.clear()
                return

            await message.answer(f"Задержка первого сообщения установлена на {first_message_delay} сек.",
                                reply_markup=kb.start_reply_keyboard)

            # Работа с json
            # Создание нового списка данных, полученных из сообщения
            new_data = {
                "first_message_delay": first_message_delay,
            }

            # Проверка на существование файла
            if os.path.exists(JSON_FILENAME): 
                with open(JSON_FILENAME) as file:
                    try:
                        data = json.load(file)  # Загружаем существующие данные        

                    except json.JSONDecodeError:
                        data = {}  # Если файл пуст или повреждён, создаём новый словарь

            else:
                data = {}  # Если файла нет, создаём новый словарь

            # Обновляем данные
            data.update(new_data)

            async with json_lock:
                # Обноваляем данные в файле
                with open(JSON_FILENAME, "w") as file:
                    json.dump(data, file, indent=4)


            await message.answer(f"Файл {JSON_FILENAME} обновлён.")

            await state.update_data(first_message_delay=first_message_delay)

    except ValueError:
        await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")

    await state.clear()


# Настройк задержки перед удалением сообщения
@dp.callback_query(F.data == "remove_message_delay")
async def setup_remove_message_delay(callback: CallbackQuery, state: FSMContext):
    await state.set_state(Setup.remove_message_delay_setup)

    # Открытие файла для чтения
    with open(JSON_FILENAME, "r") as file:
        data = json.load(file)

    # Получение данных для чтения в переменную
    delay = data.get("remove_message_delay", ["Не задано"])


    await callback.answer("")
    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.edit_text(f"Сейчас задержка установлена на {delay} сек.\nУстановите задержку перед удалением сообщения сообщения в секундах в формате:\n n (сек)",
                                reply_markup=kb.back_inline_keyboard)


# Обработчик настройк задержки перед удалением сообщения
@dp.message(Setup.remove_message_delay_setup)
async def setup_remove_message_delay_state(message: Message, state: FSMContext):

    try:
        if message.text:
            remove_message_delay = int(message.text) # Парсим сообщение в переменную

            # Защита от неправельно введённого значения
            if remove_message_delay < 0:
                await message.answer("Введённое значение должно быть больше 0.",
                                    reply_markup=kb.start_reply_keyboard)
                await state.clear()
                return

            await message.answer(f"Задержка перед удалением сообщения установлена на {remove_message_delay} сек.",
                                reply_markup=kb.start_reply_keyboard)

            # Работа с json
            # Создание нового списка данных, полученных из сообщения
            new_data = {
                "remove_message_delay": remove_message_delay,
            }

            # Проверка на существование файла
            if os.path.exists(JSON_FILENAME): 
                with open(JSON_FILENAME) as file:
                    try:
                        data = json.load(file)  # Загружаем существующие данные        

                    except json.JSONDecodeError:
                        data = {}  # Если файл пуст или повреждён, создаём новый словарь

            else:
                data = {}  # Если файла нет, создаём новый словарь

            # Обновляем данные
            data.update(new_data)

            async with json_lock:
                # Обноваляем данные в файле
                with open(JSON_FILENAME, "w") as file:
                    json.dump(data, file, indent=4)

            await message.answer(f"Файл {JSON_FILENAME} обновлён.")

            await state.update_data(remove_message_delay=remove_message_delay)

    except ValueError:
        await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")

    await state.clear()

# Настройк задержки перед отправкой сообщения
@dp.callback_query(F.data == "send_message_delay")
async def setup_send_message_delay(callback: CallbackQuery, state: FSMContext):

    await state.set_state(Setup.send_message_delay_setup)

    # Открытие файла для чтения
    with open(JSON_FILENAME, "r") as file:
        data = json.load(file)

    # Получение данных для чтения в переменную
    delay = data.get("send_message_delay", ["Не задано"])

    await callback.answer("")
    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.edit_text(f"Сейчас задержка установлена на {delay} сек.\nУстановите задержку перед отправкой сообщения сообщения в секундах в формате:\n n (сек)",
                                reply_markup=kb.back_inline_keyboard)

# Обработчик настройк задержки перед отправкой сообщения
@dp.message(Setup.send_message_delay_setup)
async def setup_send_message_delay_state(message: Message, state: FSMContext):

    try:
        if message.text:
            send_message_delay = int(message.text) # Парсим сообщение в переменную

            # Защита от неправельно введённого значения
            if send_message_delay < 0:
                await message.answer("Введённое значение должно быть больше 0.",
                                    reply_markup=kb.start_reply_keyboard)
                await state.clear()
                return

            await message.answer(f"Задержка перед удалением сообщения установлена на {send_message_delay} сек.",
                                reply_markup=kb.start_reply_keyboard)

            # Работа с json
            # Создание нового списка данных, полученных из сообщения
            new_data = {
                "send_message_delay": send_message_delay,
            }
            # Проверка на существование файла
            if os.path.exists(JSON_FILENAME): 
                with open(JSON_FILENAME) as file:
                    try:
                        data = json.load(file)  # Загружаем существующие данные        
                    except json.JSONDecodeError:
                        data = {}  # Если файл пуст или повреждён, создаём новый словарь
            else:
                data = {}  # Если файла нет, создаём новый словарь
            # Обновляем данные
            data.update(new_data)
            async with json_lock:
                # Обноваляем данные в файле
                with open(JSON_FILENAME, "w") as file:
                    json.dump(data, file, indent=4)

            await message.answer(f"Файл {JSON_FILENAME} обновлён.")

            await state.update_data(send_message_delay=send_message_delay)

    except ValueError:
        await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")

    await state.clear()

# Обработчик кнопки назад
@dp.callback_query(F.data == "action_back")
async def back_to_main_menu(callback: CallbackQuery, state: FSMContext):

    await state.clear()  # Сбрасываем состояние

    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.edit_text("Меню настройки",
                                reply_markup=kb.settings_inline_keyboard)


# Настройк максимального количества сообщений
@dp.callback_query(F.data == "max_messages_count")
async def setup_max_message_count(callback: CallbackQuery, state: FSMContext):

    await state.set_state(Setup.max_messages_count_setup)

    # Открытие файла для чтения
    with open(JSON_FILENAME, "r") as file:
        data = json.load(file)

    # Получение данных для чтения в переменную
    max_messages_count = data.get("max_messages_count", ["Не задано"])

    await callback.answer("")
    if hasattr(callback, 'message') and callback.message:
        message = cast(Message, callback.message)  # Явно указываем тип
        await message.edit_text(f"Сейчас стоит лимит на {max_messages_count} сообщений.\nУстановить максимальное количество сообщений в формате:\n n (сообщений)",
                                reply_markup=kb.back_inline_keyboard)

# Обработчик настройки максимального количества сообщений
@dp.message(Setup.max_messages_count_setup)
async def setup_max_message_count_state(message: Message, state: FSMContext):

    try:
        if message.text:
            max_messages_count = int(message.text) # Парсим сообщение в переменную

            # Защита от неправельно введённого значения
            if max_messages_count < 0:
                await message.answer("Введённое значение должно быть больше 0.",
                                    reply_markup=kb.start_reply_keyboard)
                await state.clear()
                return

            await message.answer(f"Лимит установлен на {max_messages_count} сообщений.",
                                reply_markup=kb.start_reply_keyboard)

            # Работа с json
            # Создание нового списка данных, полученных из сообщения
            new_data = {
                "max_messages_count": max_messages_count,
            }
            # Проверка на существование файла
            if os.path.exists(JSON_FILENAME): 
                with open(JSON_FILENAME) as file:
                    try:
                        data = json.load(file)  # Загружаем существующие данные        
                    except json.JSONDecodeError:
                        data = {}  # Если файл пуст или повреждён, создаём новый словарь
            else:
                data = {}  # Если файла нет, создаём новый словарь

            # Обновляем данные
            data.update(new_data)
            async with json_lock:
                # Обноваляем данные в файле
                with open(JSON_FILENAME, "w") as file:
                    json.dump(data, file, indent=4)

            await message.answer(f"Файл {JSON_FILENAME} обновлён.")

            await state.update_data(max_messages_count=max_messages_count)

    except ValueError:
        await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")

    await state.clear()


# Обработчик сообщений из группы
@dp.message()
async def get_message_from_group(message: Message):

    global is_message_handled, can_send_message, disable_for_next_message, count

    print("? Получено сообщение из группы!")  # Отображаем факт получения
    print(f"? can_send_message: {can_send_message}")
    print(f"? disable_for_next_message: {disable_for_next_message}")
    print(f"? is_message_handled: {is_message_handled}")

    count = 0  # Сбрасываем счётчик при новом сообщении

    # Открытие файла для чтения
    with open(JSON_FILENAME, "r") as file:
        data = json.load(file)

    send_message_delay = data.get("send_message_delay")
    remove_message_delay = data.get("remove_message_delay")
    first_message_delay = data.get("first_message_delay")
    max_messages_count = data.get("max_messages_count")
    work_time = data.get("work_time")

    print(f"? Работаем в диапазоне {work_time[0]} - {work_time[1]}")
    print(f"⏳ Задержка перед первым сообщением: {first_message_delay}")

    if not is_message_handled:
        is_message_handled = True  
        print("✅ Сообщение обработано")

        # Задержка перед первым сообщением
        await asyncio.sleep(first_message_delay)

        # Цикл отправки сообщений
        while message.chat.id == GROUP_GET and can_send_message and is_time_in_range(work_time[0], work_time[1]):
            print(f"? Итерация {count + 1}/{max_messages_count}")
            
            if disable_for_next_message:
                print("⛔ disable_for_next_message = True, прерываем цикл")
                disable_for_next_message = False
                is_message_handled = False
                break

            if count >= max_messages_count:
                print(f"? Достигнут лимит сообщений ({max_messages_count}), выходим")
                is_message_handled = False
                break

            await asyncio.sleep(send_message_delay)

            try:
                print("? Отправляем сообщение...")
                sent_message = await bot.send_message(GROUP_SEND, "Найдено сообщение в группе по работе")
                print(f"✅ Сообщение отправлено: {sent_message.message_id}")

                await asyncio.sleep(remove_message_delay)
                await bot.delete_message(GROUP_SEND, sent_message.message_id)
                print(f"? Сообщение {sent_message.message_id} удалено")

            except Exception as e:
                print(f"❌ Ошибка отправки: {e}")
                if "Too Many Requests" in str(e):
                    await message.answer("⚠ Флуд-контроль! Ждем 30 секунд...")
                    await asyncio.sleep(30)

            count += 1

    # Сброс обработанных сообщений
    if not can_send_message and is_message_handled:
        print("? Сбрасываем обработку сообщений")
        is_message_handled = False


# Функция проверки времени
def is_time_in_range(start: str, end: str) -> bool:
    """Проверяет, входит ли текущее время в диапазон."""
    now = datetime.now().time()  # Получаем текущее время

    start_time = datetime.strptime(start, "%H:%M").time()
    end_time = datetime.strptime(end, "%H:%M").time()

    # Если время установлено как "00:00 - 00:00", всегда возвращаем True
    if start == "00:00" and end == "00:00":
        return True

    if start_time <= now <= end_time:
        return True

    else: 
        return False


# Запуск бота
async def main(): 

    await dp.start_polling(bot, allowed_updates=["message", "callback_query", "chosen_inline_result"], timeout=60, relax=1)


# Запуск асинхронной функции
if __name__ == "__main__": 
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Exit")
        os._exit(0)

P.P.S.

Надеюсь на помощь т.к. я уже перепробовал все методы, а этот проект очень важен для меня. Заранее спасибо.

Я пробовал добавлять перезапуск бота при ошибки соединения (не подходит т.к. теряется сообщение, которое очень важное), убирать глобальные переменные, менять группы и токен бота, менять сервер хостинга (пробовал на VDS, своём пк, PythonAnyware), переписывал код бота с нуля, убирал комманды взаимодействия с пользователем, изменял alowed_updates и менял задержки доступа к api telegram. Вроде перечислил всё.


Ответы (0 шт):