N+1 запрос в SerializerMethodField

view

    def get_queryset(self) -> QuerySet[Good]:
        ....
        qs = (
            Good.objects.values('brand_id', 'brand__name')
            .annotate(
                ....
            )
            .prefetch_related(Prefetch('history', StocksHistory.objects.filter(Q(**subquery_filter_args))))
            .order_by('-total_sales')
        )
        return qs

serializer

class ExtendedBrandSerializer(serializers.ModelSerializer):
    ...
    history = serializers.SerializerMethodField()

    class Meta:
        model = Good
        fields = (
            ...
            'history',
        )

    def get_history(self, good: dict) -> dict:
      ....

      return StocksHistorySerializer(
        StocksHistory.objects.extra(select={'day': 'date( snap_at )'})
        .values('day')
        .filter(history_filter_query)
        .annotate(
            ....
        ),
        many=True,
      ).data

Relation: StocksHistory (*) -> (1) Good.

В логах вижу N+1 запрос в SerializerMethodField. Как можно пофиксить?

Возможно, есть вариант вынести annotate с сериалайзера во вьюху?

Суть в том, что в ответе мне нужен ключ history, в котором будет список объектов "дочерних" элементов?

Возможно, есть у кого идеи, как это на RAW SQL переписать?

UPDATE:

На данный момент я сделал это с двумя запросами и вручную объединил 2 кверисета в нужную мне результат, но мне такое решение как-то совсем не нравится.

view

    def get_queryset(self) -> QuerySet[Good]:
        subquery_filter_args = {}
        history_filter_args = {}
        if all([key in self.request.query_params for key in ['history_from_date', 'history_to_date']]):
            from_date, to_date = get_history_date_range(request=self.request)
            subquery_filter_args.update({'snap_at__range': (from_date, to_date)})
            history_filter_args.update({'history__snap_at__range': (from_date, to_date)})
        history_filter_query = Q(**history_filter_args)

        qs = (
            Good.objects.values('brand_id', 'brand__name')
            .annotate(
                total_sales=Sum('history__sales', filter=history_filter_query),
                avg_sales_per_day=Avg('history__sales', filter=history_filter_query),
                total_revenue=Sum('history__revenue', filter=history_filter_query),
                avg_revenue_per_day=Avg('history__revenue', filter=history_filter_query),
                rating=Avg('history__rating', filter=history_filter_query),
                feedbacks=Sum('history__feedbacks', filter=history_filter_query),
                base_price=Avg('history__base_price', filter=history_filter_query),
                price=Avg('history__price', filter=history_filter_query),
                price_with_discount=Avg('history__price_with_discount', filter=history_filter_query),
                discount=Avg('history__discount', filter=history_filter_query),
                max_price=Max('history__price', filter=history_filter_query),
                min_price=Min('history__price', filter=history_filter_query),
                avg_price=Avg('history__price', filter=history_filter_query),
            )
            .prefetch_related(Prefetch('history', StocksHistory.objects.filter(Q(**subquery_filter_args))))
            .order_by('-total_sales')
        )
        return qs

    def list(self, request, *args, **kwargs):
        queryset = self.filter_queryset(self.get_queryset())
        page = self.paginate_queryset(queryset)
        if page is not None:
            serializer = self.get_serializer(page, many=True)
            data = serializer.data
            add_histories_to_top_response(request=self.request, data=data)
            response = self.get_paginated_response(data)
            return response

        serializer = self.get_serializer(queryset, many=True)
        data = serializer.data
        add_histories_to_top_response(request=self.request, data=data)
        return Response(data)

helper

def add_histories_to_top_response(request: Request, data: OrderedDict):
    history_filter_args = {'good__brand__in': tuple(item['brand_id'] for item in data)}
    if all([key in request.query_params for key in ['history_from_date', 'history_to_date']]):
        from_date, to_date = get_history_date_range()
        history_filter_args.update({'history__snap_at__range': (from_date, to_date)})
    history_filter_query = Q(**history_filter_args)

    histories = StocksHistorySerializer(
        StocksHistory.objects.extra(select={'day': 'date( snap_at )'})
        .values('day')
        .filter(history_filter_query)
        .annotate(
            sales=Sum('sales', filter=history_filter_query),
            feedbacks=Sum('feedbacks', filter=history_filter_query),
            rating=Avg('rating', filter=history_filter_query),
            base_price=Avg('base_price', filter=history_filter_query),
            price=Avg('price', filter=history_filter_query),
            price_with_discount=Avg('price_with_discount', filter=history_filter_query),
            revenue=Avg('revenue', filter=history_filter_query),
            stock_balances=Avg('stock_balances', filter=history_filter_query),
            snap_at=TruncDay('snap_at', filter=history_filter_query),
        )
        .values(
            'snap_at',
            'feedbacks',
            'rating',
            'price',
            'base_price',
            'price_with_discount',
            'sales',
            'revenue',
            'stock_balances',
            'good__brand_id',
        ),
        many=True,
    ).data

    set_histories(data, histories)


def set_histories(data: OrderedDict, histories: OrderedDict):
    for item in data:
        item['history'] = []
        for key, group in itertools.groupby(histories, lambda x: x['brand_id']):
            if item['brand_id'] == key:
                item['history'] = list(group)


def get_history_date_range(request: Request) -> tuple[datetime, datetime]:
    _date_format = '%Y-%m-%d'

    from_date = datetime.strptime(request.query_params['history_from_date'], _date_format).replace(tzinfo=utc)
    to_date = datetime.strptime(request.query_params['history_to_date'], _date_format).replace(tzinfo=utc)
    return from_date, to_date


Ответы (1 шт):

Автор решения: Za Ars

Второй вариант, который должен сработать, если не получается сделать одним запросом.

Указываете класс, определенный ниже, в filter_backends вашей вьюхи, он должен стоять в самом конце.

from rest_framework.filters import BaseFilterBackend


class HistoryFilterBackend(BaseFilterBackend):
    
    def filter_queryset(self, request, queryset, view):
        """
        Return a filtered queryset.
        """
        secondary_queryset = queryset.model.history.all().filter(id__in=queryset.values_list('id', flat=True))
        secondary_data_dict = {x.good_id: x for x in secondary_queryset}  # Под вопросом, требуются правки, информация
        
        new_queryset = []
        for x in queryset:
            # <param_name> - название поля, в котором будет храниться доп инфа
            if x.id in secondary_data_dict:
                setattr(x, '<param_name>', secondary_data_dict[x.id])  
            else:
                setattr(x, '<param_name>', None)
            new_queryset.append(x)
        
        return new_queryset


Правда вероятно, сломается пагинация, но думаю это легко решить, создав фейковый queryset. Пока ответить про qs не могу, надо подумать. Дополню при появлении доп информации

Тогда сериализатор должен будет выглядеть следущим образом


class ....(...):

    history = serializer.SerializerMethodField()
    def get_history(self, obj):
        """
        obj - это что поместили в <param_name>. Если поместили словарь - будет словарь.
        """
        # что то делаем
        return result  # любое словарь/значение/данные сериализатора и тд

→ Ссылка