SQLAlchemy: Проблема с сохранением всех изменений из сессии в БД

Всем привет.

Есть такой метод:

async with uow:
    logger.info(f"Deleting level {level_id} for project {project_id} started")

    # Check if current_user is the project owner
    logger.info(f"Check if current_user is the project owner started")
    project = await projects_service.get_project_details_by_project_id(
        uow,
        email,
        project_id,
    )
    logger.info(f"Check if current_user is the project owner finished")

    logger.info(f"uow.users_repository.find_one started")
    user = await uow.users_repository.find_one(email=email)
    if not user:
        raise HTTPException(status_code=404, detail="Access denied")
    logger.info(f"uow.users_repository.find_one finished")

    logger.info(f"project.owner_id != user.id check started")
    if project.owner_id != user.id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only project owner can delete levels from the project",
        )
    logger.info(f"project.owner_id != user.id check finished")

    # Check if the level exists in the project
    logger.info(f"Check if the level exists in the project started")
    level = await uow.projectlevel_repository.find_one(id=level_id, project_id=project_id)
    if not level:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Level not found in the project",
        )
    logger.info(f"Check if the level exists in the project finished")

    # First set to NULL for all users with this level
    logger.info(f"Setting NULL for all users with level {level_id} in project {project_id}")
    updated_count = await uow.projectusers_repository.update_many_set_level_null(project_id, level_id)
    logger.info(f"Updated {updated_count} users")

    # Check the level before deletion
    logger.info(f"Checking level {level_id} before deletion")
    level_before = await uow.projectlevel_repository.find_one(id=level_id)
    logger.info(f"Level before deletion: {level_before}")

    # Delete the level
    logger.info(f"Deleting the level {level_id}")
    deleted_level = await uow.projectlevel_repository.delete_one(level_id)
    logger.info(f"Deletion result: {deleted_level}")

    # Immediately check if the level still exists
    logger.info(f"Checking if level {level_id} still exists immediately after deletion")
    check_level_immediate = await uow.projectlevel_repository.find_one(id=level_id)
    logger.info(f"Level immediately after deletion: {check_level_immediate}")

    # Flush the session to ensure changes are visible
    logger.info("Flushing the session")
    await uow.flush()

    # Clear session cache
    await uow.expunge_all()

    # Check again after flush
    logger.info(f"Checking if level {level_id} still exists after flush")
    check_level_after_flush = await uow.projectlevel_repository.find_one(id=level_id)
    logger.info(f"Level after flush: {check_level_after_flush}")

    if check_level_after_flush:
        logger.error(f"Level {level_id} still exists in the database after deletion attempt and flush")
        raise Exception(f"Failed to delete level {level_id}")
    else:
        logger.info(f"Level {level_id} successfully deleted and not found in the database after flush")

    # We get all project users to recalculate levels
    logger.info(f"Getting all project users to recalculate levels")
    project_users, _ = await uow.projectusers_repository.find_all(project_id=project_id, get_all=True)
    logger.info(f"Found {len(project_users)} users for level recalculation")

    # Recalculate levels for all users in the project
    logger.info(f"Recalculating levels for all users in the project")
    for project_user in project_users:
        new_level_id = await calculate_new_level(uow, project_user)
        if new_level_id != project_user.level_id:
            await uow.projectusers_repository.update_one(
                project_user.id,
                {"level_id": new_level_id},
            )
    logger.info(f"Levels recalculated for all users")

    logger.info("Committing changes")
    await uow.commit()
    logger.info("Changes committed successfully")

    # Clear session cache again
    await uow.expunge_all()

    # Final check after commit
    logger.info(f"Final check: Checking if level {level_id} still exists after commit")
    final_check_level = await uow.projectlevel_repository.find_one(id=level_id)
    logger.info(f"Level after commit: {final_check_level}")

    if final_check_level:
        logger.error(f"Level {level_id} still exists in the database after commit")
        raise Exception(f"Failed to delete level {level_id} even after commit")
    else:
        logger.info(f"Level {level_id} successfully deleted and not found in the database after commit")

    return {"message": "Project level successfully deleted and user levels recalculated"}

Проблема в том, что почему-то не хочет удаляться уровень:

Вот такой лог:

backend-1  | INFO | July 9, 2024 > 13:52:18 | Level after flush: None
backend-1  | INFO | July 9, 2024 > 13:52:18 | Level 12 successfully deleted and not found in the database after flush

backend-1  | INFO | July 9, 2024 > 13:52:18 | Getting all project users to recalculate levels

backend-1  | INFO | July 9, 2024 > 13:52:18 | Found 2 users for level recalculation

backend-1  | INFO | July 9, 2024 > 13:52:18 | Recalculating levels for all users in the project
backend-1  | INFO | July 9, 2024 > 13:52:18 | Starting level calculation for ProjectUser id=18

backend-1  | INFO | July 9, 2024 > 13:52:18 | Total score for ProjectUser id=18 is 0.00



backend-1  | 2024-07-09 13:52:18,086 INFO sqlalchemy.engine.Engine COMMIT
backend-1  | INFO | July 9, 2024 > 13:52:18 | Found 2 levels for project id=18 and role id=None

backend-1  | INFO | July 9, 2024 > 13:52:18 | ProjectUser id=18 assigned to minimum level_id=11 with level score=50.0

backend-1  | INFO | July 9, 2024 > 13:52:18 | Starting level calculation for ProjectUser id=26


backend-1  | INFO | July 9, 2024 > 13:52:18 | Total score for ProjectUser id=26 is 0.00



backend-1  | INFO | July 9, 2024 > 13:52:18 | Found 2 levels for project id=18 and role id=26

backend-1  | INFO | July 9, 2024 > 13:52:18 | ProjectUser id=26 assigned to minimum level_id=11 with level score=50.0
backend-1  | INFO | July 9, 2024 > 13:52:18 | Levels recalculated for all users

backend-1  | INFO | July 9, 2024 > 13:52:18 | Committing changes

backend-1  | INFO | July 9, 2024 > 13:52:18 | Changes committed successfully

backend-1  | INFO | July 9, 2024 > 13:52:18 | Final check: Checking if level 12 still exists after commit



backend-1  | INFO | July 9, 2024 > 13:52:18 | Level after commit: id=12 created_at=datetime.datetime(2024, 7, 8, 14, 59, 16, 505576) updated_at=datetime.datetime(2024, 7, 8, 14, 59, 16, 505578) project_id=18 role_id=26 name='L6 (Middle+)' score=60.0 salary=3000.0
backend-1  | ERROR | July 9, 2024 > 13:52:18 | Level 12 still exists in the database after commit
backend-1  | 2024-07-09 13:52:18,100 INFO sqlalchemy.engine.Engine ROLLBACK
backend-1  | ERROR | July 9, 2024 > 13:52:18 | Unexpected error: Failed to delete level 12 even after commit
backend-1  | INFO:     192.168.65.1:33388 - "DELETE /api/v1/account/projectlevels/18/12 HTTP/1.1" 500 Internal Server Error

То есть посредине кода, где вызывается удаление, оно вроде помечается в сессии как удаление, но когда происходит коммит, то ничего не происходит.

Вот такой метод, который, собственно, и делает удаление:

async def delete_one(self, entity_id: int) -> bool:
    stmt = delete(self.model).where(self.model.id == entity_id)
    result = await self.session.execute(stmt)
    deleted_count = cast(int, result.rowcount)
    logger.info(f"Deleted {deleted_count} records for level_id {entity_id}")
    return deleted_count > 0

Вот UnitOfWork

from abc import ABC, abstractmethod
from types import TracebackType

from backend.db.db import async_session_maker
from backend.repositories.project_user_skill_evaluation import SkillEvaluationRepository
from backend.repositories.project_user_skills_history import (
    ProjectUserSkillsHistoryRepository,
)
from backend.repositories.project_users import ProjectUsersRepository
from backend.repositories.projectlevels import ProjectLevelRepository
from backend.repositories.projectroles import ProjectRoleRepository
from backend.repositories.projects import ProjectsRepository
from backend.repositories.projectskills import ProjectSkillsRepository
from backend.repositories.projectskillsgroup import ProjectSkillsGroupRepository
from backend.repositories.tasks import TasksRepository
from backend.repositories.users import UsersRepository
from backend.utils.span_decorator import trace_span


class IUnitOfWork(ABC):
    users_repository: UsersRepository
    tasks_repository: TasksRepository
    projects_repository: ProjectsRepository
    projectskillsgroup_repository: ProjectSkillsGroupRepository
    projectskills_repository: ProjectSkillsRepository
    projectrole_repository: ProjectRoleRepository
    projectlevel_repository: ProjectLevelRepository
    projectusers_repository: ProjectUsersRepository
    skillevaluation_repository: SkillEvaluationRepository
    projectuserskillhistory_repository: ProjectUserSkillsHistoryRepository

    @abstractmethod
    def __init__(self) -> None:
        pass

    @abstractmethod
    async def __aenter__(self) -> "IUnitOfWork":
        pass

    @abstractmethod
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        pass

    @abstractmethod
    async def commit(self) -> None:
        pass

    @abstractmethod
    async def flush(self) -> None:
        pass

    @abstractmethod
    async def rollback(self) -> None:
        pass

    @abstractmethod
    async def expunge_all(self) -> None:
        pass


class UnitOfWork(IUnitOfWork):
    def __init__(self) -> None:
        self.session_factory = async_session_maker

    @trace_span("UnitOfWork: Start")
    async def __aenter__(self) -> "UnitOfWork":
        self.session = self.session_factory()
        self.users_repository = UsersRepository(self.session)
        self.tasks_repository = TasksRepository(self.session)
        self.projects_repository = ProjectsRepository(self.session)
        self.projectskillsgroup_repository = ProjectSkillsGroupRepository(self.session)
        self.projectskills_repository = ProjectSkillsRepository(self.session)
        self.projectrole_repository = ProjectRoleRepository(self.session)
        self.projectlevel_repository = ProjectLevelRepository(self.session)
        self.projectusers_repository = ProjectUsersRepository(self.session)
        self.skillevaluation_repository = SkillEvaluationRepository(self.session)
        self.projectuserskillhistory_repository = ProjectUserSkillsHistoryRepository(
            self.session,
        )
        return self

    @trace_span("UnitOfWork: Exit")
    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        if exc_type:
            await self.rollback()
        else:
            await self.commit()
        await self.session.close()

    @trace_span("UnitOfWork: Commit")
    async def commit(self) -> None:
        await self.session.commit()

    @trace_span("UnitOfWork: Flush")
    async def flush(self) -> None:
        await self.session.flush()

    @trace_span("UnitOfWork: Rollback")
    async def rollback(self) -> None:
        await self.session.rollback()

    @trace_span("UnitOfWork: Expunge All")
    async def expunge_all(self) -> None:
        self.session.expunge_all()

На всякий случай еще модель уровней:

from typing import Any

from sqlalchemy import Float, ForeignKey, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from backend.db.db import Base
from backend.models.common import IDMixin, TimestampMixin
from backend.schemas.app.project_levels import ProjectLevelsReadSchema


class ProjectLevels(Base, IDMixin, TimestampMixin):
    __tablename__ = "project_levels"
    project_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("projects.id"),
        nullable=False,
    )
    role_id: Mapped[int] = mapped_column(
        Integer,
        ForeignKey("project_roles.id"),
        nullable=False,
    )
    name: Mapped[str] = mapped_column(String, nullable=False)
    score: Mapped[float | None] = mapped_column(Float, nullable=True)
    salary: Mapped[float | None] = mapped_column(Float, nullable=True)

    project = relationship("Project", back_populates="levels")
    role = relationship("ProjectRole", back_populates="levels")
    project_users = relationship("ProjectUser", back_populates="level")

    def to_read_model(
        self,
    ) -> ProjectLevelsReadSchema:
        return ProjectLevelsReadSchema(
            id=self.id,
            project_id=self.project_id,
            role_id=self.role_id,
            name=self.name,
            score=self.score,
            salary=self.salary,
            created_at=self.created_at,
            updated_at=self.updated_at,
        )

    def to_dict(self) -> dict[str, Any]:
        return {
            "id": self.id,
            "project_id": self.project_id,
            "role_id": self.role_id,
            "name": self.name,
            "score": self.score,
            "salary": self.salary,
            "created_at": self.created_at,
            "updated_at": self.updated_at,
        }

Ответы (1 шт):

Автор решения: Viktor Andriichuk

Решил тем, что вынес пересчет уровней за пределы сессии текущего контекстного менеджера, когда с одной стороны данные в сессии изменились с точки зрения проставления уровня пользователя в NULL, но в функции перерасчете уровня они еще старые.

→ Ссылка