Запуск tg bot + fast api
Возникла такая необходимость при создании проекта. У меня есть fastapi магазин, где пользователь добавляет товары в корзину и отправляет. Так же есть tg_bot, данное fastapi будет работать как tg WebApp, мне необходимо отправлять заявки в отдельный канал. Как мне реализовать запуск и fastapi и tg_bot совместно, ChatGPT предложил вариант, запускается и работает все хорошо, но никак не получается отключить в последующем, в терминале ctr + c не срабатывает и все зависает
tg_bot.py
import asyncio
import signal
from aiogram import Bot, Dispatcher, html
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from aiogram.filters import CommandStart
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton, WebAppInfo
TOKEN =''
CHANNEL_ID = ''
dp = Dispatcher()
@dp.message(CommandStart())
async def command_start_handler(message: Message) -> None:
markup = InlineKeyboardMarkup(
inline_keyboard=[
[
InlineKeyboardButton(
text="Оформить заказ",
web_app=WebAppInfo(url=f"https://..io"),
)
]
]
)
await message.answer("По кнопке ниже вы можете оформить заказ!", reply_markup=markup)
# Отправка сообщения в канал
await send_message_to_channel(dp.bot)
async def send_message_to_channel(bot: Bot) -> None:
await bot.send_message(CHANNEL_ID, "Пользователь ввёл команду /start")
async def start_bot() -> None:
bot = Bot(token=TOKEN, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp.bot = bot
await dp.start_polling(bot)
# Обработчик завершения для Telegram бота
shutdown_event = asyncio.Event()
# Обработчик сигналов
def signal_handler(sig, frame):
print(f"Received signal: {sig}. Shutting down...")
shutdown_event.set()
for sig in (signal.SIGINT, signal.SIGTERM):
signal.signal(sig, signal_handler)
async def shutdown():
await shutdown_event.wait()
dp.bot.session.close()
main.py
import asyncio
import signal
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
from app.api.router import router as router_api
from app.api.tg_bot import start_bot, shutdown_event # Импортируйте ваш Telegram-бот
@asynccontextmanager
async def lifespan(app: FastAPI):
# Выполняем действия перед запуском приложения
loop = asyncio.get_event_loop()
bot_task = loop.create_task(start_bot())
try:
yield
finally:
# Выполняем действия при завершении приложения
shutdown_event.set() # Сигнализируем о завершении
bot_task.cancel()
try:
await bot_task
except asyncio.CancelledError:
pass # Ожидаем завершения задачи бота
app = FastAPI(lifespan=lifespan)
app.mount('/static', StaticFiles(directory='app/static'), 'static')
app.include_router(router_api)