Как парсить тг канал в режиме онлайн?

У меня есть код, который сохраняет историю сообщений каналов телеграмм, но мне надо сделать, чтобы он мог сохранять новые сообщения в режиме реального времени

Код парсинга канала:

import logging  # стандартная библиотека для логирования
import parser_functions  # библиотека этого парсера
from telethon import TelegramClient, events, sync, connection  # pip3 install telethon
from telethon.tl.functions.channels import JoinChannelRequest
from config import api_id, api_hash  # получение айди и хэша нашего приложения из файла config.py
from loguru import logger
import asyncio
from telethon.errors.rpcerrorlist import FloodWaitError


# настройка логгера
logging.basicConfig(
    level=logging.INFO,
    filename='parser_log.log',
    filemode='w',
    format="%(asctime)s %(levelname)s %(message)s"
)


url = ["XXXX"]
flag = True



async def main():
    async with TelegramClient('new', api_id, api_hash) as client:
        for channel in url:
            try:
                logger.info(f"Аккаунт был подключен!")
                await client(JoinChannelRequest(channel))
                err = await parser_functions.parse(client, channel)  # обработка сообщений
                logger.info(f"Сообщения были спарсины!")
            except FloodWaitError as fwe:
                print(f'Waiting for {fwe}')
                await asyncio.sleep(delay=fwe.seconds)
        await client.run_until_disconnected()




if __name__ == "__main__":
    asyncio.run(main())

Функция забора ID канала и обработка сообщений и медиа файлов:

from telethon.tl.types import MessageEntityTextUrl
from glob import glob
from dateutil.relativedelta import relativedelta  # pip3 install python-dateutil
import datetime
import os
import asyncio


async def get_channel_id(client, link):  # получение ID канала
    m = await client.get_messages(link, limit=1)
    channel_id = m[0].peer_id.channel_id
    return str(channel_id)


def clearify_text(msg):  # очищение текста от символов гиперссылки
    text = msg.message
    text_splitted = text.split()
    text_listed = [word for word in text_splitted if word != ' ']
    return " ".join(text_listed)


async def get_message_content(client, msg, url, channel_name, directory_name):  # получение содержимого сообщения
    msg_date = str(msg.date)  # дата отправки сообщения
    msg_url = url + '/' + str(msg.id)  # каст ссылки на сообщение
    file = open(f"{channel_name}/{directory_name}/{directory_name}_meta.txt", 'a+')  # запись метаданных сообщения
    file.write(msg_url)
    file.write('\n' + msg_date)
    file.close()
    if msg.message:  # если сообщение содержит текст, запись этого текста в текстовый файл в папке сообщения
        text = clearify_text(msg=msg)
        file = open(f"{channel_name}/{directory_name}/{directory_name}.txt", "w")
        file.write(text)
        file.close()
    if msg.media:  # если сообщение содержит медиа (фото, видео, документы, файлы), загрузка медиа в папку сообщения
        await client.download_media(message=msg, file=f"{channel_name}/{directory_name}")
    if msg.entities:  # запись гиперссылок из текста сообщения в файл сообщения
        urls = [ent.url for ent in msg.entities if isinstance(ent, MessageEntityTextUrl)]
        file = open(f"{channel_name}/{directory_name}/{directory_name}.txt", mode='a+')
        for u in urls:
            file.write('\n' + u)
        file.close()


async def find_last_parsed_date(path):  # определение даты, с которой начинать парсинг
    paths = glob(f"{path}/*/*meta.txt", recursive=True)  # поиск существующих метаданных по уже собранным сообщениям
    oldest = datetime.datetime.strptime("1970-01-01 00:00:00+00:00", "%Y-%m-%d %H:%M:%S%z")
    temp = oldest
    for p in paths:  # поиск даты отправки последнего сообщения
        with open(p, 'r') as file:
            date = datetime.datetime.strptime(file.readlines()[-1], "%Y-%m-%d %H:%M:%S%z")
            if date > oldest:
                oldest = date
    if temp == oldest:
        oldest = datetime.datetime.now() - relativedelta(months=3)  # если сообщений нет, офсет устанавливается на
                                                                    # три месяца от текущей даты
    return oldest


async def parse(client, url):  # сбор сообщений из канала
    err = []  # переменная возможной ошибки
    channel_id = await get_channel_id(client, url)  # получение ID канала
    os.makedirs(channel_id, exist_ok=True)  # создание папки канала в текущей директории
    oldest = await find_last_parsed_date(channel_id)  # получение даты, с которой начинать парсинг
    async for message in client.iter_messages(url, reverse=True, offset_date=oldest):  # итератор по сообщениям (урл - ссылка
                                                                                 # на канал, реверс - итерация от старых
                                                                                 # к новым, офсет - дата с которой
                                                                                 # начинать парсинг
        try:
            directory_name = str(message.id)  # получение ID сообщения
            os.makedirs(f"{channel_id}/{directory_name}", exist_ok=True)  # создание папки сообщения
            await get_message_content(client, message, url, channel_id, directory_name)  # обработка сообщения

        except Exception as passing:  # обработка ошибок
            err.append(passing)
            continue
    return err  # возврат возможных ошибок

Чтобы записалась история сообщений в канале, надо запустить скрипт заново, а мне нужно чтобы это можно было сделать в онлайне (без перезапуск скрипта), долго искал информацию, но не нашёл ничего годного для моего кода, прошу, пожалуйста, помочь


Ответы (1 шт):

Автор решения: kristal

Отвечая на ваш вопрос, должно быть как-то так:

@client.on(events.NewMessage())
async def new_message_handler(event):
    """Обработчик новых сообщений"""
    channel_id = await parser_functions.get_channel_id(client, url)  # Тут сами определите что у вас в url
    try:
        directory_name = str(event.message.id)
        os.makedirs(f"{channel_id}/{directory_name}", exist_ok=True)
        await parser_functions.get_message_content(client, event.message, url, channel_id, directory_name)

    except Exception as passing:
        err.append(passing)
        continue

Код писался без редактора, поэтому может сразу не заработать. А вообще, даже на этом этапе видно что архитектура не самая удачная. Функции, как швейцарский нож, делают несколько вещей одновременно(Да здравствует single responsibility), внедрение нового функционала тянет за собой дублирование уже существующего кода, аргументы функций меняют названия хотя принимают один и тот же аргумент(Я про url котрый сначала channel). Да и под каждое новое сообщение файлы никто не создаёт(Почитайте про базы данных или хотя бы про json). Надеюсь, я смог донести Вам свою мысль. Совершенствуйтесь и удачи в дальнейшем изучении языка.

→ Ссылка