Как правильно работать с psql базой в асинхронном режиме | telegram bot | aiogram

Начал заниматься разработкой tg бота с базой psql и появился вопрос:

Как правильно работать с psql базой в асинхронном режиме, чтобы не делать много дорогих операций и рационально использовать ресурсы.

Всем привет

Недавно начал писать телеграм бота на aiogram 3. Есть потребность использовать psql базу. Сначала решил разобраться как работать в синхронном режиме и тут вроде все просто - создаем постоянное подключение в точке старта и для каждого запроса создаем курсор, который закрываем в конце запроса.

После того, как написал тестовый хендлер, решил попробовать положить бота через этот хендлер, что успешно получилось сделать используя 7-8 вызовов хендлера с запросом к базе за короткий промежуток времени. Ну, на синхронное решение в асинхронном боте я не рассчитывал. Перешел к изучению вопроса работы в асинхронном режиме и тут появился вопрос:

Как правильно работать с psql базой в асинхронном режиме, чтобы не делать много дорогих операций и рационально использовать ресурсы.

Почитал некоторые ресурсы, которые попадались в интернете, доку, спросил у гпт. Везде в примерах для каждого запроса создается отдельное подключение. Кажется, что это не очень рационально.

Мой типичный кейс в рамках одного хендлера - получить id пользователя из сообщения, с этим id сделать запрос к базе и вытащить из таблицы записи с этим id Таких хендлеров около 10-12 штук

Буду рад примерам кода


Ответы (1 шт):

Автор решения: AnnaBazueva

Для этой задачи правильнее использовать asyncpg:

Чтоб для каждого запроса не создавать отдельного подключения,
можно получать подключение из пула.
Создайте класс Database и через метод fetch_user_data производите асинхронные запросы.

Базовый пример:

import asyncio
from aiogram import Bot, Dispatcher, types
from aiogram.contrib.fsm_storage.memory import MemoryStorage
import asyncpg

API_TOKEN = 'YOUR_API_TOKEN'
DSN = 'postgresql://user:password@localhost/dbname'

class Database:
    def __init__(self, dsn):
        self.dsn = dsn
        self.pool = None

    async def init(self):
        self.pool = await asyncpg.create_pool(dsn=self.dsn)

    async def fetch_user_data(self, user_id):
        async with self.pool.acquire() as connection:
            result = await connection.fetch('SELECT * FROM users WHERE id = $1', user_id)
            return result

    async def close(self):
        await self.pool.close()


async def on_startup(dp):
    # Инициализация базы данных
    await db.init()


async def on_shutdown(dp):
    # Закрытие базы данных
    await db.close()


async def handle_start(message: types.Message):
    user_id = message.from_user.id
    user_data = await db.fetch_user_data(user_id)
    await message.answer(f"Данные пользователя: {user_data}")


async def main():
    # Создание экземпляра бота и диспетчера
    bot = Bot(token=API_TOKEN)
    storage = MemoryStorage()
    dp = Dispatcher(bot, storage=storage)

    # Регистрация обработчиков
    dp.register_message_handler(handle_start, commands=["start"])

    # Запуск бота
    await dp.start_polling(on_startup=on_startup, on_shutdown=on_shutdown)

if __name__ == '__main__':
    db = Database(DSN)
    asyncio.run(main())
→ Ссылка