Как грамотно и правильно взаимодействовать с БД?

Я пишу на python и задался вопросом, как правильно взаимодействовать с БД? Написать класс или просто функции отдельные? Именно архитектура. Или просто по мере необходимости запросы в коде вставлять? Использую postgresql. Какой способ наиболее часто используемый и рекомендуемый?

print("Заранее спасибо, буду рад любому совету!")

Ответы (1 шт):

Автор решения: Runneso

Архитектура

Для взаимодействуя с БД есть два пути, между которыми нет чётко правильного: сырые SQL запросы или использование ORM (ORM позволяет взаимодействовать с БД, как с Python объектом, избегая написания запросов на языке SQL).

Архитектура (на примере использования ORM SQLAlchemy и FastAPI):

  1. У нас есть некий файл, в котором мы подключаемся к базе (создаём движок и сессии) и проводим начальные миграции.
from settings import Config, load_config

from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine, AsyncEngine
from sqlalchemy.orm import DeclarativeBase, declarative_base

config: Config = load_config()


def get_postgres_async_engine() -> AsyncEngine:
    postgres_url: URL = URL.create(
        drivername=config.postgres.POSTGRES_DRIVER,
        username=config.postgres.POSTGRES_USER,
        password=config.postgres.POSTGRES_PASSWORD,
        host=config.postgres.POSTGRES_HOST,
        port=config.postgres.POSTGRES_PORT,
        database=config.postgres.POSTGRES_DB
    )

    async_engine: AsyncEngine = create_async_engine(url=postgres_url)
    return async_engine


engine = get_postgres_async_engine()
Base: DeclarativeBase = declarative_base()


def get_postgres_async_session_maker() -> async_sessionmaker[AsyncSession]:
    async_session_maker: async_sessionmaker[AsyncSession] = async_sessionmaker(bind=engine, expire_on_commit=False)
    return async_session_maker


async def get_async_session() -> AsyncSession:
    async with get_postgres_async_session_maker()() as session:
        yield session
  1. При использование ORM мы работает с Python объектами, поэтому где-то мы должны описать наши таблицы с использованием Python классов, которые называются моделями.
from database.db import Base, engine

from datetime import datetime
from typing import List

from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy import Text, BigInteger, Float, ForeignKey


class Users(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    username: Mapped[str] = mapped_column(Text, unique=True)
    email: Mapped[str] = mapped_column(Text, unique=True)
    password: Mapped[str] = mapped_column(Text)
    createdAt: Mapped[float] = mapped_column(Float, default=datetime.now().timestamp())
    token: Mapped["UsersTokens"] = relationship(back_populates="user", uselist=False, lazy=False)
    friends: Mapped[List["UsersFriends"]] = relationship(back_populates="user", lazy=False, collection_class=set)


class UsersTokens(Base):
    __tablename__ = "tokens"
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    timestamp: Mapped[float] = mapped_column(Float)
    user: Mapped["Users"] = relationship(back_populates="token", uselist=False, lazy=False)
    user_fk: Mapped[int] = mapped_column(ForeignKey("users.id"))


class UsersFriends(Base):
    __tablename__ = "friends"
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    friend: Mapped[int] = mapped_column(BigInteger)
    user: Mapped["Users"] = relationship(back_populates="friends", uselist=False, lazy=False)
    user_fk: Mapped[int] = mapped_column(ForeignKey("users.id"))

  1. Мы должны описать CRUD, с помощью которого наше приложение будет взаимодействовать с базой. Чаще всего CRUD является отдельным классом, которыЙ содержит в себе classmethod`ы.
from fastapi.security import OAuth2PasswordRequestForm

from schemas import (CreateUser,
                     UpdateUser,
                     UpdatePassword)
from settings import Config, load_config
from .models import (Users,
                     UsersTokens,
                     UsersFriends)

from datetime import datetime, timedelta

import jwt
from fastapi.exceptions import HTTPException
from fastapi import status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from bcrypt import hashpw
from loguru import logger


class CRUD:
    config: Config = load_config()

    @classmethod
    def get_token(cls, user_id: int):
        token = jwt.encode(
            payload={
                "id": user_id,
                "createdAt": datetime.now().timestamp(),
                "destroyedAt": (datetime.now() + timedelta(days=cls.config.jwt.JWT_TIME_LIVE)).timestamp()
            },
            key=cls.config.jwt.JWT_SECRET_KEY,
            algorithm=cls.config.jwt.JWT_ALGORITHM
        )

        return token

    @classmethod
    def get_hash(cls, password: str):
        return hashpw(
            password=password.encode("utf-8"),
            salt=cls.config.bcrypt.HASH_SALT.encode("utf-8")
        ).decode("utf-8")

    @classmethod
    async def auth(cls, token: str, session: AsyncSession):
        try:
            payload = jwt.decode(
                jwt=token,
                key=cls.config.jwt.JWT_SECRET_KEY,
                algorithms=[cls.config.jwt.JWT_ALGORITHM]
            )
        except Exception as error:
            logger.error(error)
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid JWT token!")

        if datetime.now().timestamp() > payload["destroyedAt"]:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid JWT token!")

        user = await cls.get_user_by_id(payload["id"], session)

        if user is None:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid JWT token!")

        timestamp = user.token.timestamp

        if timestamp > payload["createdAt"]:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid JWT token!")
        return user

    @classmethod
    async def get_user_by_id(cls, user_id: int, session: AsyncSession):
        sql_query = select(Users).filter(Users.id == user_id)

        result = await session.execute(sql_query)
        return result.scalars().first()

    @classmethod
    async def get_users_orm(cls, session: AsyncSession):
        sql_query = select(Users).order_by(Users.id)

        result = await session.execute(sql_query)
        return result.unique().scalars().all()

    @classmethod
    async def create_user_orm(cls, user_data: CreateUser, session: AsyncSession):
        try:
            user_data.password = cls.get_hash(user_data.password)
            new_user = Users(**user_data.model_dump())  # type: ignore[call-arg]
            token = UsersTokens(timestamp=datetime.now().timestamp())  # type: ignore[call-arg]
            new_user.token = token
            session.add(new_user)
            await session.commit()
        except Exception as error:
            logger.error(error)
            await session.close()
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Conflict user data!")

        await session.close()

        return cls.get_token(new_user.id)

    @classmethod
    async def sign_in_orm(cls, user_data: OAuth2PasswordRequestForm, session: AsyncSession):
        sql_query = select(Users).filter(Users.username.like(user_data.username))

        result = await session.execute(sql_query)
        user = result.scalars().first()
        await session.close()

        if user is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="User doesnt exist!")

        if user.password != cls.get_hash(user_data.password):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid password!")

        return cls.get_token(user.id)

    @classmethod
    async def my_profile_orm(cls, token: str, session: AsyncSession):
        return await cls.auth(token, session)

    @classmethod
    async def update_profile_orm(cls, token: str, user_data: UpdateUser, session: AsyncSession):
        user = await cls.auth(token, session)

        if user_data.new_email is not None:
            user.email = user_data.new_email

        if user_data.new_username is not None:
            user.username = user_data.new_username

        try:
            await session.commit()
        except Exception as error:
            logger.error(error)
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Conflict user data!")

        await session.close()
        return user

    @classmethod
    async def update_password_orm(cls, token: str, password_data: UpdatePassword, session: AsyncSession):
        user = await cls.auth(token, session)

        if user.password != cls.get_hash(password_data.old_password):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid password!")

        user.password = cls.get_hash(password_data.new_password)
        user.token.timestamp = datetime.now().timestamp()

        await session.commit()
        await session.close()

    @classmethod
    async def my_friends_orm(cls, token: str, session: AsyncSession):
        user = await cls.auth(token, session)

        return [friend.friend for friend in user.friends]

    @classmethod
    async def add_friend_orm(cls, token: str, friend_id: int, session: AsyncSession):
        friend = await cls.get_user_by_id(friend_id, session)

        if friend is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Friend not found!")

        user = await cls.auth(token, session)

        if user.id == friend_id:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="User cannot add himself in friends!")

        friend = UsersFriends(friend=friend_id)  # type: ignore[call-arg]
        friends = {friend.friend for friend in user.friends}

        if friend.friend in friends:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Friend already in user`s friends!")

        user.friends.add(friend)

        await session.commit()
        await session.close()

        return [friend.friend for friend in user.friends]

    @classmethod
    async def remove_friend_orm(cls, token: str, friend_id: int, session: AsyncSession):
        friend = await cls.get_user_by_id(friend_id, session)

        if friend is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Friend not found!")

        user = await cls.auth(token, session)
        friend = UsersFriends(friend=friend_id)  # type: ignore[call-arg]
        friends = {friend.friend for friend in user.friends}

        if friend.friend not in friends:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="No same user in user`s friends!")

        for friend in user.friends:
            if friend.friend == friend_id:
                await session.delete(friend)
                break

        await session.commit()
        await session.close()

Полный код проекта можно посмотреть здесь - https://github.com/Runneso/Registration_API.

Насчёт других ORM, помимо SQLAlchemy

Чаще всего нам приходится взаимодействовать с БД в процессе написания API. Почти все web фреймворки имеют свою встроенную ORM (речь о Flask и Django), в других случаях выбор падает на SQLAlchemy, ведь это проект, который заслужил своё уважение за долгое время. На данный момент сложно выделить более мощное асинхронное решение для взаимодействия с БД.

P.S. если важно скорость, то сырые запросы ваш выбор.

→ Ссылка