Flask, sqlalchemy, как сделать фильтрацию
как написать фильтрацию для данных api во flask? К примеру по id пользователя (может быть несколько пользователей).
Данные для фильтрации будут приходить с фронтенда,в проекте задействован Flask и Sqlalchemy. Моделей во flaske нет, данные приходят с базы данных на mysql, подключаюсь через sqlalchemy.
models.py
engine = create_engine(f"mysql+pymysql://{user}:{password}@{host}:{port}/{name_base}")
Base = declarative_base(engine)
metadata = Base.metadata
Session = sessionmaker(bind=engine)
_session = Session()
class Users(Base):
__tablename__ = 'users'
__table_args__ = {'autoload':True}
class Projects(Base):
__tablename__ = 'projects'
__table_args__ = {'autoload':True}
class ProjectUser(Base):
__tablename__ = 'project_user'
__table_args__ = {'autoload':True}
class Tasks(Base):
__tablename__ = 'tasks'
__table_args__ = {'autoload':True}
class TaskUser(Base):
__tablename__ = 'task_user'
__table_args__ = {'autoload':True}
class Personal_access_tokens(Base):
__tablename__ = 'personal_access_tokens'
__table_args__ = {'autoload': True}
class Works(Base):
__tablename__ = 'works'
__table_args__ = {'autoload':True}
main.py
@app.route('/reports/', methods=['GET'])
def get_users():
users = _session.query(Users).all()
_res = dict()
_counter = 0
for myuser in [x.id for x in users]:
user = _session.query(Users).get(myuser)
times = sum([x.time for x in _session.query(Works).filter(Works.user_id == myuser)])
ocenka = _session.query(Tasks).join(TaskUser).filter(TaskUser.user_id == myuser, Tasks.id == TaskUser.task_id).all()
temp_vr = [x.estimate_worker for x in ocenka]
ocenka_vremeni = []
for x in temp_vr:
if x != None:
ocenka_vremeni.append(x)
ocenka_vremeni = sum(ocenka_vremeni)
# Оценка сметы
temp = [x.estimate_cost for x in ocenka]
# Сумма по смете
ocenka_smeti = []
for x in temp:
if x != None:
ocenka_smeti.append(x)
ocenka_smeti = sum(ocenka_smeti)
# время на баги
bug_time = sum([x.time for x in _session.query(Works).join(Tasks).filter(Tasks.task_type_id == 1, Works.user_id == myuser)])
try:
effectivity_ocenka = ocenka_vremeni/times * 100
effectivity_smeta = ocenka_smeti/times * 100
except ZeroDivisionError:
effectivity_ocenka = 'no data'
effectivity_smeta = 'no data'
if _res:
_res.update({_counter: {'имя': str(user.name), 'фамилия': str(user.surname),
'затрачено часов': dict(hour=(str(times//60)), minute=str(times%60)),
'часов по оценке': dict(hour=(str(ocenka_vremeni//60)), minute=str(ocenka_vremeni%60)),
'часов по смете': str(dict(hour=(str(ocenka_smeti//60)), minute=str(ocenka_smeti%60))),
'часов по багам': str(dict(hour=(str(bug_time//60)), minute=str(bug_time%60))),
'эффективность по оценке': str(effectivity_ocenka),
'эффективность по смете': str(effectivity_smeta)}})
else:
_res.update({_counter: {'имя': str(user.name), 'фамилия': str(user.surname),
'затрачено часов': dict(hour=(str(times // 60)), minute=str(times % 60)),
'часов по оценке': dict(hour=(str(ocenka_vremeni // 60)), minute=str(ocenka_vremeni % 60)),
'часов по смете': str(dict(hour=(str(ocenka_smeti // 60)), minute=str(ocenka_smeti % 60))),
'часов по багам': str(dict(hour=(str(bug_time // 60)), minute=str(bug_time % 60))),
'эффективность по оценке': str(effectivity_ocenka),
'эффективность по смете': str(effectivity_smeta)}})
_counter += 1
return jsonify(_res)