Telegram бот на aiogram (python) ошибка TelegramNetworkError: HTTP Client says - ClientOSError: [Errno 104] Connection reset by peer
Кратко суть проблемы:
Написал telegram бота (пишу далеко не первый раз) и появилась очень странная проблема: через некоторое продолжительное время (2-5 часов) (я так понимаю при нахождении сообщения) бот зависает и появляется ошибка:
Ошибка:
ERROR:aiogram.dispatcher:Failed to fetch updates - TelegramNetworkError: HTTP Client says - ClientOSError: [Errno 104] Connection reset by peer WARNING:aiogram.dispatcher:Sleep for 1.000000 seconds and try again... (tryings = 0, bot id = 7847598237) INFO:aiogram.dispatcher:Connection established (tryings = 1, bot id = 7847598237)INFO:aiogram.event:Update id=391149334 is handled. Duration 82 ms by bot id=7847598237INFO:aiogram.event:Update id=391149335 is handled. Duration 88 ms by bot id=7847598237INFO:aiogram.event:Update id=391149337 is handled. Duration 66 ms by bot id=7847598237
Коротко о функционале бота:
Есть 2 группы в которых состоит бот: группа, где он читает сообщения и группа куда он отправляет сообщения. Пользователь может задать разные параметры (время задержки, время работы и тд). После нажатия на кнопку "включить" бот начинает чтение сообщений из одной группы и если время работы и другие параметры удовлетворяют условию, то он отправляет несколько сообщений с заданной задержкой в другую группу.
P.S.
В группе из которой читают сообщения у бота права администратора, 5 пользователей из которых только 1 отправляет сообщения только пару раз в день (спама нет).
Код бота:
import asyncio, logging, json, os
from aiogram import Bot, Dispatcher, F
from aiogram.filters import CommandStart
from aiogram.types import CallbackQuery, Message
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from datetime import datetime
from typing import cast
from config import API_TOKEN, GROUP_GET, GROUP_SEND # Импорт токена, группы получения и отправки сообщений
import modules.keyboard as kb # Импорт клавиатуры
JSON_FILENAME="data.json"
can_send_message=False
disable_for_next_message=False
is_message_handled=False
count=0
# Настройка логирования
logging.basicConfig(level=logging.INFO)
# Создание объекта блокировки задач
can_send_message_lock = asyncio.Lock()
json_lock = asyncio.Lock()
disable_for_next_message_lock = asyncio.Lock()
is_message_handled_lock = asyncio.Lock()
bot = Bot(token=API_TOKEN) # Присвоение токена
dp = Dispatcher(bot=bot)
# Создание класса состояний
class Setup(StatesGroup):
work_time_setup = State()
first_message_delay_setup = State()
remove_message_delay_setup = State()
send_message_delay_setup = State()
max_messages_count_setup = State()
# Комманда /start
@dp.message(CommandStart())
async def start_command(message: Message):
await message.answer("Добро пожаловать в бота для пробуждения ночью, в случае прихода важных сообщений.",
reply_markup=kb.start_reply_keyboard)
# Меню настроек
@dp.message(F.text == kb.BUTTON_SETTINGS)
async def settings_menu(message: Message):
await message.answer("Меню настроек",
reply_markup=kb.settings_inline_keyboard)
# Кнопка включения отправки сообщений
@dp.message(F.text == kb.BUTTON_ON)
async def enable_sending(message: Message):
global can_send_message
# Блокировка глобальной переменной на время работы
async with can_send_message_lock:
can_send_message = True
await message.answer("✅Отправка уведомлений включена.")
# Кнопка выключения отправки сообщений
@dp.message(F.text == kb.BUTTON_OFF)
async def disable_sending(message: Message):
global can_send_message
# Блокировка глобальной переменной на время работы
async with can_send_message_lock:
can_send_message = False
await message.answer("❌Отправка уведомлений выкючена.")
# Кнопка выключения уведомлений до следующего сообщения
@dp.message(F.text == kb.BUTTON_OFF_FOR_NEXT_MESSAGE)
async def disable_sending_for_next_message(message: Message):
global disable_for_next_message
# Блокировка глобальной переменной на время работы
async with disable_for_next_message_lock:
disable_for_next_message = True
await message.answer("⏸️Отправка уведомлений отключена до следующего сообщения")
# Закрытие меню настроек
@dp.callback_query(F.data == "settings_close")
async def settings_close(callback: CallbackQuery):
await callback.answer("Вы закрыли меню настроек")
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.delete()
# Настройка времени работы
@dp.callback_query(F.data == "work_time")
async def setup_work_time(callback: CallbackQuery, state: FSMContext):
await state.set_state(Setup.work_time_setup)
# Открытие файла для чтения
with open(JSON_FILENAME, "r") as file:
data = json.load(file)
# Получение данных для чтения в переменную
work_time = data.get("work_time", ["Не задано", "Не задано"])
await callback.answer("")
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.edit_text(f"Текущее время установлено с {work_time[0]} по {work_time[1]}.\nУстановите время работы в формате:\n18:00 - 9:00 (начало - конец)\nЧтобы установить круглосуточную работу, введите '00:00 - 00:00'",
reply_markup=kb.back_inline_keyboard)
# Обработчик настройки времени работы
@dp.message(Setup.work_time_setup)
async def setup_work_time_state(message: Message, state: FSMContext):
try:
if message.text:
start_time_str, end_time_str = message.text.split(" - ") # Парсим сообщение в переменные, удаляя разделитель
start_work_time = datetime.strptime(start_time_str.strip(), "%H:%M").time() # Записываем данные в переменные с типом данных время
end_work_time = datetime.strptime(end_time_str.strip(), "%H:%M").time()
await message.answer(f"Время работы установлено с {start_work_time.strftime('%H:%M')} по {end_work_time.strftime('%H:%M')}",
reply_markup=kb.start_reply_keyboard)
# Работа с json
# Создание нового списка данных, полученных из сообщения
new_data = {
"work_time": [start_time_str, end_time_str],
}
# Проверка на существование файла
if os.path.exists(JSON_FILENAME):
with open(JSON_FILENAME) as file:
try:
data = json.load(file) # Загружаем существующие данные
except json.JSONDecodeError:
data = {} # Если файл пуст или повреждён, создаём новый словарь
else:
data = {} # Если файла нет, создаём новый словарь
# Обновляем данные
data.update(new_data)
async with json_lock:
# Обноваляем данные в файле
with open(JSON_FILENAME, "w") as file:
json.dump(data, file, indent=4)
await message.answer(f"Файл {JSON_FILENAME} обновлён.")
await state.update_data(start_work_time=start_work_time, end_work_time=end_work_time)
except ValueError:
await message.answer("⚠ Ошибка! Введите время в формате HH:MM - HH:MM")
await state.clear()
# Настройка времени задержки первого сообщения
@dp.callback_query(F.data == "first_message_delay")
async def setup_first_message_delay(callback: CallbackQuery, state: FSMContext):
await state.set_state(Setup.first_message_delay_setup)
# Открытие файла для чтения
with open(JSON_FILENAME, "r") as file:
data = json.load(file)
# Получение данных для чтения в переменную
delay = data.get("first_message_delay", ["Не задано"])
await callback.answer("")
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.edit_text(f"Сейчас задержка установлена на {delay} сек.\nУстановите задержку первого сообщения в секундах в формате:\n n (сек)",
reply_markup=kb.back_inline_keyboard)
# Обработчик настройки времени задержки первого сообщения
@dp.message(Setup.first_message_delay_setup)
async def setup_first_message_delay_state(message: Message, state: FSMContext):
try:
if message.text:
first_message_delay = int(message.text) # Парсим сообщение в переменную
# Защита от неправельно введённого значения
if first_message_delay < 0:
await message.answer("Введённое значение должно быть больше 0.",
reply_markup=kb.start_reply_keyboard)
await state.clear()
return
await message.answer(f"Задержка первого сообщения установлена на {first_message_delay} сек.",
reply_markup=kb.start_reply_keyboard)
# Работа с json
# Создание нового списка данных, полученных из сообщения
new_data = {
"first_message_delay": first_message_delay,
}
# Проверка на существование файла
if os.path.exists(JSON_FILENAME):
with open(JSON_FILENAME) as file:
try:
data = json.load(file) # Загружаем существующие данные
except json.JSONDecodeError:
data = {} # Если файл пуст или повреждён, создаём новый словарь
else:
data = {} # Если файла нет, создаём новый словарь
# Обновляем данные
data.update(new_data)
async with json_lock:
# Обноваляем данные в файле
with open(JSON_FILENAME, "w") as file:
json.dump(data, file, indent=4)
await message.answer(f"Файл {JSON_FILENAME} обновлён.")
await state.update_data(first_message_delay=first_message_delay)
except ValueError:
await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")
await state.clear()
# Настройк задержки перед удалением сообщения
@dp.callback_query(F.data == "remove_message_delay")
async def setup_remove_message_delay(callback: CallbackQuery, state: FSMContext):
await state.set_state(Setup.remove_message_delay_setup)
# Открытие файла для чтения
with open(JSON_FILENAME, "r") as file:
data = json.load(file)
# Получение данных для чтения в переменную
delay = data.get("remove_message_delay", ["Не задано"])
await callback.answer("")
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.edit_text(f"Сейчас задержка установлена на {delay} сек.\nУстановите задержку перед удалением сообщения сообщения в секундах в формате:\n n (сек)",
reply_markup=kb.back_inline_keyboard)
# Обработчик настройк задержки перед удалением сообщения
@dp.message(Setup.remove_message_delay_setup)
async def setup_remove_message_delay_state(message: Message, state: FSMContext):
try:
if message.text:
remove_message_delay = int(message.text) # Парсим сообщение в переменную
# Защита от неправельно введённого значения
if remove_message_delay < 0:
await message.answer("Введённое значение должно быть больше 0.",
reply_markup=kb.start_reply_keyboard)
await state.clear()
return
await message.answer(f"Задержка перед удалением сообщения установлена на {remove_message_delay} сек.",
reply_markup=kb.start_reply_keyboard)
# Работа с json
# Создание нового списка данных, полученных из сообщения
new_data = {
"remove_message_delay": remove_message_delay,
}
# Проверка на существование файла
if os.path.exists(JSON_FILENAME):
with open(JSON_FILENAME) as file:
try:
data = json.load(file) # Загружаем существующие данные
except json.JSONDecodeError:
data = {} # Если файл пуст или повреждён, создаём новый словарь
else:
data = {} # Если файла нет, создаём новый словарь
# Обновляем данные
data.update(new_data)
async with json_lock:
# Обноваляем данные в файле
with open(JSON_FILENAME, "w") as file:
json.dump(data, file, indent=4)
await message.answer(f"Файл {JSON_FILENAME} обновлён.")
await state.update_data(remove_message_delay=remove_message_delay)
except ValueError:
await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")
await state.clear()
# Настройк задержки перед отправкой сообщения
@dp.callback_query(F.data == "send_message_delay")
async def setup_send_message_delay(callback: CallbackQuery, state: FSMContext):
await state.set_state(Setup.send_message_delay_setup)
# Открытие файла для чтения
with open(JSON_FILENAME, "r") as file:
data = json.load(file)
# Получение данных для чтения в переменную
delay = data.get("send_message_delay", ["Не задано"])
await callback.answer("")
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.edit_text(f"Сейчас задержка установлена на {delay} сек.\nУстановите задержку перед отправкой сообщения сообщения в секундах в формате:\n n (сек)",
reply_markup=kb.back_inline_keyboard)
# Обработчик настройк задержки перед отправкой сообщения
@dp.message(Setup.send_message_delay_setup)
async def setup_send_message_delay_state(message: Message, state: FSMContext):
try:
if message.text:
send_message_delay = int(message.text) # Парсим сообщение в переменную
# Защита от неправельно введённого значения
if send_message_delay < 0:
await message.answer("Введённое значение должно быть больше 0.",
reply_markup=kb.start_reply_keyboard)
await state.clear()
return
await message.answer(f"Задержка перед удалением сообщения установлена на {send_message_delay} сек.",
reply_markup=kb.start_reply_keyboard)
# Работа с json
# Создание нового списка данных, полученных из сообщения
new_data = {
"send_message_delay": send_message_delay,
}
# Проверка на существование файла
if os.path.exists(JSON_FILENAME):
with open(JSON_FILENAME) as file:
try:
data = json.load(file) # Загружаем существующие данные
except json.JSONDecodeError:
data = {} # Если файл пуст или повреждён, создаём новый словарь
else:
data = {} # Если файла нет, создаём новый словарь
# Обновляем данные
data.update(new_data)
async with json_lock:
# Обноваляем данные в файле
with open(JSON_FILENAME, "w") as file:
json.dump(data, file, indent=4)
await message.answer(f"Файл {JSON_FILENAME} обновлён.")
await state.update_data(send_message_delay=send_message_delay)
except ValueError:
await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")
await state.clear()
# Обработчик кнопки назад
@dp.callback_query(F.data == "action_back")
async def back_to_main_menu(callback: CallbackQuery, state: FSMContext):
await state.clear() # Сбрасываем состояние
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.edit_text("Меню настройки",
reply_markup=kb.settings_inline_keyboard)
# Настройк максимального количества сообщений
@dp.callback_query(F.data == "max_messages_count")
async def setup_max_message_count(callback: CallbackQuery, state: FSMContext):
await state.set_state(Setup.max_messages_count_setup)
# Открытие файла для чтения
with open(JSON_FILENAME, "r") as file:
data = json.load(file)
# Получение данных для чтения в переменную
max_messages_count = data.get("max_messages_count", ["Не задано"])
await callback.answer("")
if hasattr(callback, 'message') and callback.message:
message = cast(Message, callback.message) # Явно указываем тип
await message.edit_text(f"Сейчас стоит лимит на {max_messages_count} сообщений.\nУстановить максимальное количество сообщений в формате:\n n (сообщений)",
reply_markup=kb.back_inline_keyboard)
# Обработчик настройки максимального количества сообщений
@dp.message(Setup.max_messages_count_setup)
async def setup_max_message_count_state(message: Message, state: FSMContext):
try:
if message.text:
max_messages_count = int(message.text) # Парсим сообщение в переменную
# Защита от неправельно введённого значения
if max_messages_count < 0:
await message.answer("Введённое значение должно быть больше 0.",
reply_markup=kb.start_reply_keyboard)
await state.clear()
return
await message.answer(f"Лимит установлен на {max_messages_count} сообщений.",
reply_markup=kb.start_reply_keyboard)
# Работа с json
# Создание нового списка данных, полученных из сообщения
new_data = {
"max_messages_count": max_messages_count,
}
# Проверка на существование файла
if os.path.exists(JSON_FILENAME):
with open(JSON_FILENAME) as file:
try:
data = json.load(file) # Загружаем существующие данные
except json.JSONDecodeError:
data = {} # Если файл пуст или повреждён, создаём новый словарь
else:
data = {} # Если файла нет, создаём новый словарь
# Обновляем данные
data.update(new_data)
async with json_lock:
# Обноваляем данные в файле
with open(JSON_FILENAME, "w") as file:
json.dump(data, file, indent=4)
await message.answer(f"Файл {JSON_FILENAME} обновлён.")
await state.update_data(max_messages_count=max_messages_count)
except ValueError:
await message.answer("Ошибка ввода значения. Укажите значение, в формате, который был выше.")
await state.clear()
# Обработчик сообщений из группы
@dp.message()
async def get_message_from_group(message: Message):
global is_message_handled, can_send_message, disable_for_next_message, count
print("? Получено сообщение из группы!") # Отображаем факт получения
print(f"? can_send_message: {can_send_message}")
print(f"? disable_for_next_message: {disable_for_next_message}")
print(f"? is_message_handled: {is_message_handled}")
count = 0 # Сбрасываем счётчик при новом сообщении
# Открытие файла для чтения
with open(JSON_FILENAME, "r") as file:
data = json.load(file)
send_message_delay = data.get("send_message_delay")
remove_message_delay = data.get("remove_message_delay")
first_message_delay = data.get("first_message_delay")
max_messages_count = data.get("max_messages_count")
work_time = data.get("work_time")
print(f"? Работаем в диапазоне {work_time[0]} - {work_time[1]}")
print(f"⏳ Задержка перед первым сообщением: {first_message_delay}")
if not is_message_handled:
is_message_handled = True
print("✅ Сообщение обработано")
# Задержка перед первым сообщением
await asyncio.sleep(first_message_delay)
# Цикл отправки сообщений
while message.chat.id == GROUP_GET and can_send_message and is_time_in_range(work_time[0], work_time[1]):
print(f"? Итерация {count + 1}/{max_messages_count}")
if disable_for_next_message:
print("⛔ disable_for_next_message = True, прерываем цикл")
disable_for_next_message = False
is_message_handled = False
break
if count >= max_messages_count:
print(f"? Достигнут лимит сообщений ({max_messages_count}), выходим")
is_message_handled = False
break
await asyncio.sleep(send_message_delay)
try:
print("? Отправляем сообщение...")
sent_message = await bot.send_message(GROUP_SEND, "Найдено сообщение в группе по работе")
print(f"✅ Сообщение отправлено: {sent_message.message_id}")
await asyncio.sleep(remove_message_delay)
await bot.delete_message(GROUP_SEND, sent_message.message_id)
print(f"? Сообщение {sent_message.message_id} удалено")
except Exception as e:
print(f"❌ Ошибка отправки: {e}")
if "Too Many Requests" in str(e):
await message.answer("⚠ Флуд-контроль! Ждем 30 секунд...")
await asyncio.sleep(30)
count += 1
# Сброс обработанных сообщений
if not can_send_message and is_message_handled:
print("? Сбрасываем обработку сообщений")
is_message_handled = False
# Функция проверки времени
def is_time_in_range(start: str, end: str) -> bool:
"""Проверяет, входит ли текущее время в диапазон."""
now = datetime.now().time() # Получаем текущее время
start_time = datetime.strptime(start, "%H:%M").time()
end_time = datetime.strptime(end, "%H:%M").time()
# Если время установлено как "00:00 - 00:00", всегда возвращаем True
if start == "00:00" and end == "00:00":
return True
if start_time <= now <= end_time:
return True
else:
return False
# Запуск бота
async def main():
await dp.start_polling(bot, allowed_updates=["message", "callback_query", "chosen_inline_result"], timeout=60, relax=1)
# Запуск асинхронной функции
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
try:
asyncio.run(main())
except KeyboardInterrupt:
print("Exit")
os._exit(0)
P.P.S.
Надеюсь на помощь т.к. я уже перепробовал все методы, а этот проект очень важен для меня. Заранее спасибо.
Я пробовал добавлять перезапуск бота при ошибки соединения (не подходит т.к. теряется сообщение, которое очень важное), убирать глобальные переменные, менять группы и токен бота, менять сервер хостинга (пробовал на VDS, своём пк, PythonAnyware), переписывал код бота с нуля, убирал комманды взаимодействия с пользователем, изменял alowed_updates и менял задержки доступа к api telegram. Вроде перечислил всё.