Как организовать одновременную работу Telegram бота (Aiogram) и Flask?
Всем доброго времени суток! Нужен совет.
У меня есть бот (написанный с помощью telebot) и API (Flask), которые запущены в двух потоках и спокойно работают одновременно. Бот запущен через polling.
API занимается обработкой входящих POST запросов с нескольких адресов.
Сейчас переписываю бота с использованием aiogram. Знаю, что нельзя смешивать асинхронность и потоки, но возникает трудности с одновременной работой этих двух модулей. Работает либо бот, либо API.
Какие есть варианты подружить Flask и Aiogram, или же стоит отказаться от использования Flask и обрабатывать запросы другим способом?
Также читал про вариант с webhook`ами, вроде там можно отлавливать POST-запросы, и тогда Flask не нужен.
Пробовал вариант с запуском в разных потоках:
import threading
import time
from time import sleep
from flask import Flask,request
from flask_restful import Api, Resource
import config
import asyncio,hashlib,requests,threading,time
import logs
from aiogram import Bot, Dispatcher, executor, types
from asyncio import new_event_loop, set_event_loop
app = Flask(__name__)
api = Api(app)
dp = Dispatcher(Bot(token=config.bot_token))
@dp.message_handler(commands="start")
async def message_start(message: types.Message):
await message.answer("HI")
@app.route('/test', methods=['POST'])
def get_test():
data = request.get_json()
print (data)
return ("ok")
def app_run():
while True:
try:
app.run()
except Exception as ex:
print(ex)
def bot_polling():
try:
executor.start_polling(dp, skip_updates=True)
set_event_loop(new_event_loop())
except Exception as ex:
print(ex)
bot_thread = threading.Thread(target=bot_polling)
bot_thread.daemon = True
bot_thread.start()
app_thread = threading.Thread(target=app_run)
app_thread.daemon = True
app_thread.start()
if __name__ == "__main__":
while True:
try:
sleep(1)
except KeyboardInterrupt:
break
Пробовал через асинхронность с помощью встроенной в aiogram on_startup:
import threading
import time
from time import sleep
from flask import Flask, jsonify, request
from flask_restful import Api, Resource
import config
import asyncio,hashlib,requests,threading,time
import logs
from aiogram import Bot, Dispatcher, executor, types
from asyncio import new_event_loop, set_event_loop
app = Flask(__name__)
api = Api(app)
dp = Dispatcher(Bot(token=config.bot_token))
@dp.message_handler(commands="start")
async def message_start(message: types.Message):
print("hi")
await message.answer("HI")
@app.route('/test', methods=['POST'])
def get_test():
data = request.get_json()
print (data)
return ("ok")
async def is_enabled():
while True:
await app.run()
async def on_startup(x):
asyncio.create_task(is_enabled())
if __name__ == '__main__':
executor.start_polling(dispatcher=dp, skip_updates=True, on_startup=on_startup)
Ответы (1 шт):
Вариант с потоками почти правильный. Нужно только loop создать до вызова start_polling:
set_event_loop(new_event_loop())
executor.start_polling(dp, skip_updates=True)
С асинхронным тоже можно. Вот так будет работать:
...
async def on_startup(x):
asyncio.create_task(asyncio.to_thread(app.run))
if __name__ == '__main__':
executor.start_polling(dispatcher=dp, skip_updates=True, on_startup=on_startup)
Есть только проблема с остановкой flask, есть способы и ее побороть.