Как в Kafka Streams объединить сообщения с разным типом данных из разных топиков?
У меня есть такая задача: в один топик Kafka приходит запрос вида:
{
"agentId": "d04c115a-5b0e-442f-8753-03d67bd02335",
"trafficRequest": {
"request": [
{
"parse": "select 1,2,3",
"statement": ""
},
{
"bind": {
"portal": "",
"statement": "",
"values": [],
"result_format": []
}
},
{
"parse": " select 4,5,6",
"statement": ""
},
{
"bind": {
"portal": "",
"statement": "",
"values": [],
"result_format": []
}
},
{
"parse": " select 7",
"statement": ""
},
{
"bind": {
"portal": "",
"statement": "",
"values": [],
"result_format": []
}
}
],
"ready": true
},
"trafficTimestamp": 1723631897771,
"requestId": "0",
"sessionId": "228",
"sequenceNumber": "20",
"peer": "172.17.0.1:39174 ",
"processInfo": "postgres: postgres postgres 172.17.0.1(39174) idle"
}
для работы с сообщениями этого топика создал RawTrafficRequestDto:
private UUID agentId;
private TrafficRequestDataDto trafficRequest;
private Date trafficTimestamp;
private Integer requestId;
private Integer sequenceNumber;
private Integer sessionId;
private String processInfo;
private String peer;
private String query;
В другой топик поступает ответ на этот запрос вида:
{
"agentId": "d04c115a-5b0e-442f-8753-03d67bd02335",
"trafficTimestamp": 1723631897791,
"requestId": "0",
"sessionId": "228",
"sequenceNumber": "20",
"trafficResponse": {
"response": [
{
"column": [
{
"name": "?column?",
"type": "Text"
},
{
"name": "?column?",
"type": "Text"
},
{
"name": "?column?",
"type": "Text"
}
]
},
{
"field": {
"field_count": 3,
"field_data": [
"1",
"2",
"3"
]
}
},
{
"completion": "SELECT 1"
},
{
"column": [
{
"name": "?column?",
"type": "Text"
},
{
"name": "?column?",
"type": "Text"
},
{
"name": "?column?",
"type": "Text"
}
]
},
{
"field": {
"field_count": 3,
"field_data": [
"4",
"5",
"6"
]
}
},
{
"completion": "SELECT 1"
},
{
"column": [
{
"name": "?column?",
"type": "Text"
}
]
},
{
"field": {
"field_count": 1,
"field_data": [
"7"
]
}
},
{
"completion": "SELECT 1"
}
],
"ready": true
},
"peer": "",
"processInfo": "postgres: postgres postgres 172.17.0.1(39174) idle"
}
для работы с сообщениями этого топика создал RawTrafficResponseDto:
private UUID agentId;
private Date trafficTimestamp;
private Integer requestId;
private Integer sessionId;
private Integer sequenceNumber;
private String processInfo;
private String peer;
private TrafficResponseModelDto trafficResponseModelDto;
Мне нужно объединить запрос и ответ и отправить единое сообщение в другой топик. Ответ и запрос могут быть связаны по полям requestId
и sessionId
. Должно получиться сообщение вида:
{
"key": {
"agentId": "3f8d8a31-cf9f-464c-8cef-131181cfb314",
"requestId": 5,
"sessionId": "1943"
},
"record": {
"trafficRequest": "SELECT * FROM employees",
"timestampRequest": 1723711306347,
"requestGuid": "a1bdad2f-5a1e-449c-8176-b515f1b34ec4",
"processInfo": "postgres: damdbf damdbf_db 192.168.16.179(52834) idle",
"peer": "192.168.16.179:52834 ",
"sequenceNumberRequest": 25,
"statementDtos": [
{
"command": "SELECT",
"commandType": null,
"tableDtos": [
{
"tableName": "employees",
"tableAlias": null
}
],
"attributeDtos": [
{
"attributeName": "*",
"attributeAlias": null
}
]
}
],
"requestType": "DML",
"appliedPoliciesRequestId": null,
"responseGuid": "63d0140f-ed01-41b3-8ef6-48861280caa3",
"timestampResponse": 1723711306357,
"sequenceNumberResponse": 22,
"trafficResponse": {
"completion": "SELECT 5",
"ready": true,
"column": [
{
"name": "id",
"type": "Text"
},
{
"name": "first_name",
"type": "Text"
},
{
"name": "last_name",
"type": "Text"
},
{
"name": "position",
"type": "Text"
},
{
"name": "salary",
"type": "Text"
},
{
"name": "hire_date",
"type": "Text"
}
],
"field": {
"fieldCount": "6",
"fieldData": [
"5",
"Sarah",
"Davis",
"QA Engineer",
"58000.00",
"2022-08-05"
]
}
},
"appliedPoliciesResponseId": null
}
}
Я создал 2 pipeline (в одном обрабатываются запросы, в другом ответы). Выглядит код так:
private final List<RecordDto> recordData = new ArrayList<>();
@Autowired
void buildRequestPipeline(StreamsBuilder streamsBuilder) {
KStream<String, RawTrafficRequestDto> messageStream = streamsBuilder
.stream(KafkaConstants.TRAFFIC_REQUEST, Consumed.with(Serdes.String(), RAW_TRAFFIC_REQUEST_DTO_SERDE));
messageStream
.processValues(TrafficRequestDtoAddHeaderProcessor::new)
.filter((key, value) -> messageSendFilter(value))
.flatMapValues(value -> {
recordData.clear();
List<TrafficRequestDto> trafficRequestDtos = requestHandler(value);
List<RecordDto> combinedData = new ArrayList<>();
for (TrafficRequestDto trafficRequestDto : trafficRequestDtos) {
RecordDto recordDto = new RecordDto(
trafficRequestDto.getTrafficRequest(),
trafficRequestDto.getTimestamp(),
UUID.randomUUID(),
trafficRequestDto.getProcessInfo(),
trafficRequestDto.getPeer(),
trafficRequestDto.getSequenceNumber(),
trafficRequestDto.getQueryType(),
trafficRequestDto.getStatementDtos(),
trafficRequestDto.getAppliedPolicyIds()
);
recordData.add(recordDto);
combinedData.add(recordDto);
}
return combinedData;
});
}
@Autowired
void buildResponsePipeline(StreamsBuilder streamsBuilder) {
KStream<String, RawTrafficResponseDto> messageStream = streamsBuilder
.stream(KafkaConstants.TRAFFIC_RESPONSE, Consumed.with(Serdes.String(), RAW_TRAFFIC_RESPONSE_SERDE));
messageStream
.processValues(TrafficResponseDtoAddHeaderProcessor::new)
.flatMapValues(this::processResponse)
.filter(((key, value) -> value != null))
.to(KafkaConstants.PROCESSED_POLICIES_TRAFFIC, Produced.valueSerde(PROCESSED_TRAFFIC_DTO_SERDE));
}
Этот код работает и выполняет нужную задачу, но не оптимально. Для хранения промежуточных результатов (списка с RecordDto) я использую List, но читал, что в Kafka Streams для этого лучше будет использовать KTable или применить объединение KStreams. Реализовал вот так:
@Autowired
void buildJoinedPipeline(StreamsBuilder streamsBuilder) {
KStream<String, RawTrafficRequestDto> requestStream = streamsBuilder
.stream(KafkaConstants.TRAFFIC_REQUEST, Consumed.with(Serdes.String(), RAW_TRAFFIC_REQUEST_DTO_SERDE));
KStream<String, RecordDto> stringRecordDtoKStream = requestStream
.processValues(TrafficRequestDtoAddHeaderProcessor::new)
.filter((key, value) -> messageSendFilter(value))
.flatMapValues(value -> {
List<TrafficRequestDto> trafficRequestDtos = requestHandler(value);
List<RecordDto> combinedData = new ArrayList<>();
for (TrafficRequestDto trafficRequestDto : trafficRequestDtos) {
RecordDto recordDto = new RecordDto(
trafficRequestDto.getTrafficRequest(),
trafficRequestDto.getTimestamp(),
UUID.randomUUID(),
trafficRequestDto.getProcessInfo(),
trafficRequestDto.getPeer(),
trafficRequestDto.getSequenceNumber(),
trafficRequestDto.getQueryType(),
trafficRequestDto.getStatementDtos(),
trafficRequestDto.getAppliedPolicyIds()
);
combinedData.add(recordDto);
}
return combinedData;
});
KStream<String, RawTrafficResponseDto> responseStream = streamsBuilder
.stream(KafkaConstants.TRAFFIC_RESPONSE, Consumed.with(Serdes.String(), RAW_TRAFFIC_RESPONSE_DTO_SERDE));
KStream<String, TrafficResponseDto> stringTrafficResponseDtoKStream = responseStream
.processValues(TrafficResponseDtoAddHeaderProcessor::new)
.mapValues((readOnlyKey, value) -> trafficResponseMapper.toTrafficResponse(value));
KTable<String, RecordDto> requestsTable = stringRecordDtoKStream
.filter(((key, value) -> key != null && value != null))
.groupByKey()
.reduce((oldValue, newValue) -> newValue);
KStream<String, ProcessedTrafficDto> processedTrafficStream = stringTrafficResponseDtoKStream
.join(requestsTable,
(response, request) -> {
ProcessedTrafficDto processedTrafficDto = new ProcessedTrafficDto();
KeyDto keyDto = new KeyDto(response.getAgentId(), response.getRequestId(), response.getSessionId());
processedTrafficDto.setKey(keyDto);
RecordDto recordDto = new RecordDto(
request.getTrafficRequest(),
request.getTimestampRequest(),
request.getRequestGuid(),
request.getProcessInfo(),
request.getPeer(),
request.getSequenceNumberRequest(),
request.getQueryType(),
request.getStatementDtos(),
request.getAppliedPoliciesRequestId(),
UUID.randomUUID(),
response.getTrafficTimestamp(),
response.getSequenceNumber(),
response.getResponseItemDto(),
response.getAppliedPolicyIds()
);
processedTrafficDto.setRecord(recordDto);
return processedTrafficDto;
}
);
processedTrafficStream.to(KafkaConstants.PROCESSED_POLICIES_TRAFFIC);
}
но теперь в новый топик вообще никакие сообщения не уходят, а я получаю ошибку:
2024-08-22 13:17:58.261 WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.kstream.internals.KStreamReduce : Skipping record due to null key or value. topic=[I_TRAFFIC_REQUEST] partition=[0] offset=[39]
2024-08-22 13:17:58.261 WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.kstream.internals.KStreamReduce : Skipping record due to null key or value. topic=[I_TRAFFIC_REQUEST] partition=[0] offset=[39]
2024-08-22 13:17:58.261 WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.kstream.internals.KStreamReduce : Skipping record due to null key or value. topic=[I_TRAFFIC_REQUEST] partition=[0] offset=[39]
2024-08-22 13:18:17.118 WARN [collector] 7076 --- [collector] [-StreamThread-1] o.a.k.s.k.internals.KStreamKTableJoin : Skipping record due to null join key or value. topic=[I_TRAFFIC_RESPONSE] partition=[0] offset=[20]
Что я делаю не так? С Kafka Streams знаком только первую неделю, поэтому пока сложно понять, что делаю не так.