Нужно сделать полностью асинхронного телеграмм бота
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
import asyncio
import sys
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Hi.")
async def run_bot(token: str) -> None:
app = ApplicationBuilder().token(token).build()
app.add_handler(CommandHandler("start", start))
print("Бот запущен...")
await app.run_polling()
if __name__ == "__main__":
token = 'mytoken'
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(run_bot(token))
except KeyboardInterrupt:
print("Бот остановлен.")
sys.exit(0)
except Exception as e:
print(f"Произошла ошибка: {e}")
finally:
loop.close()
Суть в том, что данный код работает когда уберу async run_bot
, но почему нельзя что бы все было в async
?
почему так нельзя делать или я что то не сделал?
Ответы (1 шт):
Автор решения: Алексей Сундеев
→ Ссылка
from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes
import sys
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text("Hi.")
async def run_bot(token: str) -> None:
app = ApplicationBuilder().token(token).build()
app.add_handler(CommandHandler("start", start))
print("Бот запущен...")
await app.run_polling()
if __name__ == "__main__":
token = 'mytoken'
try:
# Просто запускаем асинхронную функцию
import asyncio
asyncio.run(run_bot(token)) # run_bot будет вызван асинхронно
except KeyboardInterrupt:
print("Бот остановлен.")
sys.exit(0)
except Exception as e:
print(f"Произошла ошибка: {e}")
Использование asyncio.run() вместо loop.run_until_complete(). asyncio.run() управляет жизненным циклом событий, включая создание и закрытие цикла, что делает код проще. app.run_polling() уже является асинхронным методом, и он сам управляет запуском цикла событий, поэтому нет необходимости вручную контролировать цикл.