sqlalchemy + aiogram

Всем привет!

from sqlalchemy import ForeignKey, String, BigInteger
from sqlalchemy.orm import Mapped, mapped_column, DeclarativeBase, relationship
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncResult

engine = create_async_engine(url='sqlite+aiosqlite:///db.sqlite3',
                             echo=True)
    
async_session = async_sessionmaker(engine)


class Base(AsyncAttrs, DeclarativeBase):
    pass


class User(Base):
    __tablename__ = 'users'
    
    id: Mapped[int] = mapped_column(primary_key=True)
    tg_id = mapped_column(BigInteger)
    tg_username = mapped_column(String)

from aiogram import Router, F
from aiogram.types import Message
from aiogram.filters import Filter, CommandObject, Command


admin = Router()

ADMINS = [5041307451, 67890]

class AdminProtect(Filter):
    async def __call__(self, message: Message):
        return message.from_user.id

@admin.message(AdminProtect(), Command('kick'))
async def apanel(message: Message, command: CommandObject):
        if not command.args:
            await message.answer('Аргументы не переданы')
            return
        else:
            value1, value2 = command.args.split(' ', maxsplit=1)
            print(value1, value2)
            if not message.from_user.id in ADMINS:
                await message.answer('Не админ')
                return
            else:
                 async with async_session() as session:
                    people = await session. (User). (User.tg_username == '@' + value1).all()
                    await message.answer(f'@{message.from_user.username} кикнул {people.tg_id}. Причина: {value2}')
                    await message.bot.ban_chat_member(chat_id=message.chat.id, user_id={people.tg_id})
                        # await message.answer(f'@{message.from_user.username} кикнул @{message.reply_to_message.from_user.username}')
                        # await message.bot.ban_chat_member(chat_id=message.chat.id, user_id=message.reply_to_message.from_user.id)



async def async_main():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

Вот код, мне нужно из бд вытащить tg_id, зная tg_username, подскажите как и через что это сделать? Понял то что неасинхроное можно сделать через .filter. Но тут столкнулся с проблемой, буду рад готовому коду и объяснениям!

Заранее спасибо каждому!


Ответы (1 шт):

Автор решения: edoxa1
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async def get_tg_id(session: AsyncSession, username: str) -> int | None:
    select_statement = (
        select(User.tg_id)
        .where(User.tg_username == username)
    )

    result = session.execute(select_statement)
    await session.commit()
    # возвращаем tg_id если существует, иначе None
    return result.scalar_one_or_none()

Внутри блока async with вызываешь вышеописанную функцию:

@admin.message(AdminProtect(), Command('kick'))
async def apanel(message: Message, command: CommandObject):
    # ... много кода позади...
    if ...:
         ...
    else:
        if ...:
        else:
            async with async_session() as session:
                tg_id = await get_tg_id(session, message.from_user.username)
                if not tg_id:
                    print('не удалось забанить')
                # баним tg_id
→ Ссылка