Как записать данные из Kafka в Clickhouse, если используется структура вложенного JSON?
У меня есть сообщение следующего формата, которое хранится в одном из топиков Kafka:
{"key": {
"agentId": "5584ca89-3dc6-4c62-9141-67d04cbfbe97",
"requestId": 32320,
"sessionId": 876665
},
"record": {
"trafficRequest": "DELETE FROM orders WHERE customer_name = 'Иванов Иван'",
"TimestampRequest": "1713455655850",
"requestGuid":"bf9c3d54-083b-4328-85c5-d4251a5ecf16",
"processInfo": "postgres: 15/main: user postgres 127.0.0.1(37398) idle",
"peer": "127.0.0.1:36322",
"sequenceNumberRequest": 0,
"requestType": "DML",
"statementDtos": [
{
"command": "DELETE",
"commandType": "Delete",
"tableDtos": [
{
"tableName": "orders",
"tableAlias": null
}
],
"attributeDtos": [
{
"attributeName": "customer_name",
"attributeAlias": null
}
]
}
],
"appliedPoliciesRequestId": [
"d0793d3b-0662-4ad7-81b2-33dd0043e747"
],
"responseGuid":"qf9c3d54-083b-4328-85c5-d4251a5ecf16",
"TimestampResponse": 1715931386761,
"sequenceNumberResponse": 0,
"trafficResponse": {
"columns": [
{
"name": "column1",
"type": "Text"
},
{
"name": "column2",
"type": "Text"
},
{
"name": "column3",
"type": "Text"
}
],
"completion": "SELECT 4",
"fields": [
{
"fieldCount": 3,
"field_Data": [
"11",
"22",
"33"
]
},
{
"fieldCount": 3,
"fieldData": [
"44",
"55",
"66"
]
},
{
"fieldCount": 3,
"fieldData": [
"0",
"0",
"0"
]
},
{
"fieldCount": 3,
"fieldData": ["77", "88", "99"]
}
],
"ready": true,
"Error": null
},
"peer": "127.0.0.1:37398 ",
"processInfo": "postgres: 15/main: user postgres 127.0.0.1(37398) idle",
"appliedPoliciesResponseId": [
"d0793d3b-0662-4ad7-81b2-33dd0043e747"]
}
}
Мне нужно вычитывать сообщения из этого топика и записывать данные в Clickhouse. Я создал следующий скрипт создания таблицы и материализованного представления для Clickhouse:
!/bin/bash
set -e
clickhouse client -n <<-EOSQL
CREATE TABLE IF NOT EXISTS ch_traffic (
agentId String,
requestId UInt32,
sessionId UInt32
)
ENGINE = MergeTree()
ORDER BY requestId;
CREATE TABLE IF NOT EXISTS traffic_kafka_readings (
key String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'dev-damdbf-broker:9094',
kafka_topic_list = 'C_PROCESSED_POLICIES_TRAFFIC',
kafka_group_name = 'CLICKHOUSE',
kafka_format = 'JSONAsString';
CREATE MATERIALIZED VIEW IF NOT EXISTS kafka_readings_mv TO ch_traffic AS
SELECT
JSONExtractString(key, 'agentId') AS agentId,
JSONExtractUInt(key, 'requestId') AS requestId,
JSONExtractUInt(key, 'sessionId') AS sessionId
FROM traffic_kafka_readings;
EOSQL
Проблема в том, что данный скрипт не работает. В Clickhouse создается строка, но данные там пустые:
Подскажите, как мне добиться нужного поведения?