Как записать данные из Kafka в Clickhouse, если используется структура вложенного JSON?

У меня есть сообщение следующего формата, которое хранится в одном из топиков Kafka:

  {"key": {
    "agentId": "5584ca89-3dc6-4c62-9141-67d04cbfbe97",
    "requestId": 32320,
    "sessionId": 876665
  },
"record": {
    "trafficRequest": "DELETE FROM orders WHERE customer_name = 'Иванов Иван'",
     "TimestampRequest": "1713455655850",
     "requestGuid":"bf9c3d54-083b-4328-85c5-d4251a5ecf16",
    "processInfo": "postgres: 15/main: user postgres 127.0.0.1(37398) idle",
    "peer": "127.0.0.1:36322",
    "sequenceNumberRequest": 0, 
    "requestType": "DML",
    "statementDtos": [
      {
        "command": "DELETE",
        "commandType": "Delete",
        "tableDtos": [
          {
            "tableName": "orders",
            "tableAlias": null
          }
        ],
        "attributeDtos": [
          {
            "attributeName": "customer_name",
            "attributeAlias": null
          }
        ]
     }
    ],
    "appliedPoliciesRequestId": [
    "d0793d3b-0662-4ad7-81b2-33dd0043e747"
    ],
    "responseGuid":"qf9c3d54-083b-4328-85c5-d4251a5ecf16",
    "TimestampResponse": 1715931386761,
    "sequenceNumberResponse": 0,
    "trafficResponse": {
        "columns": [
            {
                "name": "column1",
                "type": "Text"
            },
            {
                "name": "column2",
                "type": "Text"
            },
            {
                "name": "column3",
                "type": "Text"
            }
        ],
   "completion": "SELECT 4",
        "fields": [
            {
                "fieldCount": 3,
                "field_Data": [
                    "11",
                    "22",
                    "33"
                ]
            },
            {
                "fieldCount": 3,
                "fieldData": [
                    "44",
                    "55",
                    "66"
                ]
            },
            {
                "fieldCount": 3,
                "fieldData": [
                    "0",
                    "0",
                    "0"
                ]
            },
{
                "fieldCount": 3,
                "fieldData": ["77", "88", "99"]
}
        ],
     "ready": true,
     "Error": null
      },
    "peer": "127.0.0.1:37398 ",
    "processInfo": "postgres: 15/main: user postgres 127.0.0.1(37398) idle",
     "appliedPoliciesResponseId": [
    "d0793d3b-0662-4ad7-81b2-33dd0043e747"]
  }
}

Мне нужно вычитывать сообщения из этого топика и записывать данные в Clickhouse. Я создал следующий скрипт создания таблицы и материализованного представления для Clickhouse:

!/bin/bash
set -e

clickhouse client -n <<-EOSQL
    CREATE TABLE IF NOT EXISTS ch_traffic (
        agentId String,
        requestId UInt32,
        sessionId UInt32
    )
    ENGINE = MergeTree()
    ORDER BY requestId;

    CREATE TABLE IF NOT EXISTS traffic_kafka_readings (
        key String
    ) ENGINE = Kafka
    SETTINGS kafka_broker_list = 'dev-damdbf-broker:9094',
             kafka_topic_list = 'C_PROCESSED_POLICIES_TRAFFIC',
             kafka_group_name = 'CLICKHOUSE',
             kafka_format = 'JSONAsString';

    CREATE MATERIALIZED VIEW IF NOT EXISTS kafka_readings_mv TO ch_traffic AS
    SELECT
        JSONExtractString(key, 'agentId') AS agentId,
        JSONExtractUInt(key, 'requestId') AS requestId,
        JSONExtractUInt(key, 'sessionId') AS sessionId
    FROM traffic_kafka_readings;
EOSQL

Проблема в том, что данный скрипт не работает. В Clickhouse создается строка, но данные там пустые:

введите сюда описание изображения

Подскажите, как мне добиться нужного поведения?


Ответы (0 шт):