Как в Kafka Streams объединить сообщения с разным типом данных из разных топиков?

У меня есть такая задача: в один топик Kafka приходит запрос вида:

{
    "agentId": "d04c115a-5b0e-442f-8753-03d67bd02335",
    "trafficRequest": {
        "request": [
            {
                "parse": "select 1,2,3",
                "statement": ""
            },
            {
                "bind": {
                    "portal": "",
                    "statement": "",
                    "values": [],
                    "result_format": []
                }
            },
            {
                "parse": " select 4,5,6",
                "statement": ""
            },
            {
                "bind": {
                    "portal": "",
                    "statement": "",
                    "values": [],
                    "result_format": []
                }
            },
            {
                "parse": " select 7",
                "statement": ""
            },
            {
                "bind": {
                    "portal": "",
                    "statement": "",
                    "values": [],
                    "result_format": []
                }
            }
        ],
        "ready": true
    },
    "trafficTimestamp": 1723631897771,
    "requestId": "0",
    "sessionId": "228",
    "sequenceNumber": "20",
    "peer": "172.17.0.1:39174 ",
    "processInfo": "postgres: postgres postgres 172.17.0.1(39174) idle"
}

для работы с сообщениями этого топика создал RawTrafficRequestDto:

    private UUID agentId;
    private TrafficRequestDataDto trafficRequest;
    private Date trafficTimestamp;
    private Integer requestId;
    private Integer sequenceNumber;
    private Integer sessionId;
    private String processInfo;
    private String peer;
    private String query;

В другой топик поступает ответ на этот запрос вида:

{
    "agentId": "d04c115a-5b0e-442f-8753-03d67bd02335",
    "trafficTimestamp": 1723631897791,
    "requestId": "0",
    "sessionId": "228",
    "sequenceNumber": "20",
    "trafficResponse": {
        "response": [
            {
                "column": [
                    {
                        "name": "?column?",
                        "type": "Text"
                    },
                    {
                        "name": "?column?",
                        "type": "Text"
                    },
                    {
                        "name": "?column?",
                        "type": "Text"
                    }
                ]
            },
            {
                "field": {
                    "field_count": 3,
                    "field_data": [
                        "1",
                        "2",
                        "3"
                    ]
                }
            },
            {
                "completion": "SELECT 1"
            },
            {
                "column": [
                    {
                        "name": "?column?",
                        "type": "Text"
                    },
                    {
                        "name": "?column?",
                        "type": "Text"
                    },
                    {
                        "name": "?column?",
                        "type": "Text"
                    }
                ]
            },
            {
                "field": {
                    "field_count": 3,
                    "field_data": [
                        "4",
                        "5",
                        "6"
                    ]
                }
            },
            {
                "completion": "SELECT 1"
            },
            {
                "column": [
                    {
                        "name": "?column?",
                        "type": "Text"
                    }
                ]
            },
            {
                "field": {
                    "field_count": 1,
                    "field_data": [
                        "7"
                    ]
                }
            },
            {
                "completion": "SELECT 1"
            }
        ],
        "ready": true
    },
    "peer": "",
    "processInfo": "postgres: postgres postgres 172.17.0.1(39174) idle"
}

для работы с сообщениями этого топика создал RawTrafficResponseDto:

    private UUID agentId;
    private Date trafficTimestamp;
    private Integer requestId;
    private Integer sessionId;
    private Integer sequenceNumber;
    private String processInfo;
    private String peer;
    private TrafficResponseModelDto trafficResponseModelDto;

Мне нужно объединить запрос и ответ и отправить единое сообщение в другой топик. Ответ и запрос могут быть связаны по полям requestId и sessionId. Должно получиться сообщение вида:

{
    "key": {
        "agentId": "3f8d8a31-cf9f-464c-8cef-131181cfb314",
        "requestId": 5,
        "sessionId": "1943"
    },
    "record": {
        "trafficRequest": "SELECT * FROM employees",
        "timestampRequest": 1723711306347,
        "requestGuid": "a1bdad2f-5a1e-449c-8176-b515f1b34ec4",
        "processInfo": "postgres: damdbf damdbf_db 192.168.16.179(52834) idle",
        "peer": "192.168.16.179:52834 ",
        "sequenceNumberRequest": 25,
        "statementDtos": [
            {
                "command": "SELECT",
                "commandType": null,
                "tableDtos": [
                    {
                        "tableName": "employees",
                        "tableAlias": null
                    }
                ],
                "attributeDtos": [
                    {
                        "attributeName": "*",
                        "attributeAlias": null
                    }
                ]
            }
        ],
        "requestType": "DML",
        "appliedPoliciesRequestId": null,
        "responseGuid": "63d0140f-ed01-41b3-8ef6-48861280caa3",
        "timestampResponse": 1723711306357,
        "sequenceNumberResponse": 22,
        "trafficResponse": {
            "completion": "SELECT 5",
            "ready": true,
            "column": [
                {
                    "name": "id",
                    "type": "Text"
                },
                {
                    "name": "first_name",
                    "type": "Text"
                },
                {
                    "name": "last_name",
                    "type": "Text"
                },
                {
                    "name": "position",
                    "type": "Text"
                },
                {
                    "name": "salary",
                    "type": "Text"
                },
                {
                    "name": "hire_date",
                    "type": "Text"
                }
            ],
            "field": {
                "fieldCount": "6",
                "fieldData": [
                    "5",
                    "Sarah",
                    "Davis",
                    "QA Engineer",
                    "58000.00",
                    "2022-08-05"
                ]
            }
        },
        "appliedPoliciesResponseId": null
    }
}

Я создал 2 pipeline (в одном обрабатываются запросы, в другом ответы). Выглядит код так:

private final List<RecordDto> recordData = new ArrayList<>();

    @Autowired
    void buildRequestPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, RawTrafficRequestDto> messageStream = streamsBuilder
                .stream(KafkaConstants.TRAFFIC_REQUEST, Consumed.with(Serdes.String(), RAW_TRAFFIC_REQUEST_DTO_SERDE));

        messageStream
                .processValues(TrafficRequestDtoAddHeaderProcessor::new)
                .filter((key, value) -> messageSendFilter(value))
                .flatMapValues(value -> {
                    recordData.clear();
                    List<TrafficRequestDto> trafficRequestDtos = requestHandler(value);
                    List<RecordDto> combinedData = new ArrayList<>();

                    for (TrafficRequestDto trafficRequestDto : trafficRequestDtos) {
                        RecordDto recordDto = new RecordDto(
                                trafficRequestDto.getTrafficRequest(),
                                trafficRequestDto.getTimestamp(),
                                UUID.randomUUID(),
                                trafficRequestDto.getProcessInfo(),
                                trafficRequestDto.getPeer(),
                                trafficRequestDto.getSequenceNumber(),
                                trafficRequestDto.getQueryType(),
                                trafficRequestDto.getStatementDtos(),
                                trafficRequestDto.getAppliedPolicyIds()
                        );
                        recordData.add(recordDto);
                        combinedData.add(recordDto);
                    }
                    return combinedData;
                });
    }

    @Autowired
    void buildResponsePipeline(StreamsBuilder streamsBuilder) {
        KStream<String, RawTrafficResponseDto> messageStream = streamsBuilder
                .stream(KafkaConstants.TRAFFIC_RESPONSE, Consumed.with(Serdes.String(), RAW_TRAFFIC_RESPONSE_SERDE));

        messageStream
                .processValues(TrafficResponseDtoAddHeaderProcessor::new)
                .flatMapValues(this::processResponse)
                .filter(((key, value) -> value != null))
                .to(KafkaConstants.PROCESSED_POLICIES_TRAFFIC, Produced.valueSerde(PROCESSED_TRAFFIC_DTO_SERDE));
    }

Этот код работает и выполняет нужную задачу, но не оптимально. Для хранения промежуточных результатов (списка с RecordDto) я использую List, но читал, что в Kafka Streams для этого лучше будет использовать KTable или применить объединение KStreams. Реализовал вот так:

@Autowired
    void buildJoinedPipeline(StreamsBuilder streamsBuilder) {
        KStream<String, RawTrafficRequestDto> requestStream = streamsBuilder
                .stream(KafkaConstants.TRAFFIC_REQUEST, Consumed.with(Serdes.String(), RAW_TRAFFIC_REQUEST_DTO_SERDE));

        KStream<String, RecordDto> stringRecordDtoKStream = requestStream
                .processValues(TrafficRequestDtoAddHeaderProcessor::new)
                .filter((key, value) -> messageSendFilter(value))
                .flatMapValues(value -> {
                    List<TrafficRequestDto> trafficRequestDtos = requestHandler(value);
                    List<RecordDto> combinedData = new ArrayList<>();

                    for (TrafficRequestDto trafficRequestDto : trafficRequestDtos) {
                        RecordDto recordDto = new RecordDto(
                                trafficRequestDto.getTrafficRequest(),
                                trafficRequestDto.getTimestamp(),
                                UUID.randomUUID(),
                                trafficRequestDto.getProcessInfo(),
                                trafficRequestDto.getPeer(),
                                trafficRequestDto.getSequenceNumber(),
                                trafficRequestDto.getQueryType(),
                                trafficRequestDto.getStatementDtos(),
                                trafficRequestDto.getAppliedPolicyIds()
                        );
                        combinedData.add(recordDto);
                    }
                    return combinedData;
                });

        KStream<String, RawTrafficResponseDto> responseStream = streamsBuilder
                .stream(KafkaConstants.TRAFFIC_RESPONSE, Consumed.with(Serdes.String(), RAW_TRAFFIC_RESPONSE_DTO_SERDE));

        KStream<String, TrafficResponseDto> stringTrafficResponseDtoKStream = responseStream
                .processValues(TrafficResponseDtoAddHeaderProcessor::new)
                .mapValues((readOnlyKey, value) -> trafficResponseMapper.toTrafficResponse(value));

        KTable<String, RecordDto> requestsTable  = stringRecordDtoKStream
                .filter(((key, value) -> key != null && value != null))
                .groupByKey()
                .reduce((oldValue, newValue) -> newValue);

        KStream<String, ProcessedTrafficDto> processedTrafficStream = stringTrafficResponseDtoKStream
                .join(requestsTable,
                        (response, request) -> {
                            ProcessedTrafficDto processedTrafficDto = new ProcessedTrafficDto();
                            KeyDto keyDto = new KeyDto(response.getAgentId(), response.getRequestId(), response.getSessionId());
                            processedTrafficDto.setKey(keyDto);

                            RecordDto recordDto = new RecordDto(
                            request.getTrafficRequest(),
                            request.getTimestampRequest(),
                            request.getRequestGuid(),
                            request.getProcessInfo(),
                            request.getPeer(),
                            request.getSequenceNumberRequest(),
                            request.getQueryType(),
                            request.getStatementDtos(),
                            request.getAppliedPoliciesRequestId(),
                            UUID.randomUUID(),
                            response.getTrafficTimestamp(),
                            response.getSequenceNumber(),
                            response.getResponseItemDto(),
                            response.getAppliedPolicyIds()
                            );

                            processedTrafficDto.setRecord(recordDto);

                            return processedTrafficDto;
                        }
                );

        processedTrafficStream.to(KafkaConstants.PROCESSED_POLICIES_TRAFFIC);
    }

но теперь в новый топик вообще никакие сообщения не уходят, а я получаю ошибку:

2024-08-22 13:17:58.261  WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.kstream.internals.KStreamReduce  : Skipping record due to null key or value. topic=[I_TRAFFIC_REQUEST] partition=[0] offset=[39]
2024-08-22 13:17:58.261  WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.kstream.internals.KStreamReduce  : Skipping record due to null key or value. topic=[I_TRAFFIC_REQUEST] partition=[0] offset=[39]
2024-08-22 13:17:58.261  WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.kstream.internals.KStreamReduce  : Skipping record due to null key or value. topic=[I_TRAFFIC_REQUEST] partition=[0] offset=[39]
2024-08-22 13:18:17.118  WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.k.internals.KStreamKTableJoin    : Skipping record due to null join key or value. topic=[I_TRAFFIC_RESPONSE] partition=[0] offset=[20]

Что я делаю не так? С Kafka Streams знаком только первую неделю, поэтому пока сложно понять, что делаю не так.


Ответы (0 шт):