Java не читается сообщение из топика Kafka

Здравствуйте я только начал изучение Kafka, возникла проблема при реализации тестового примера, записывать сообщение в топик получается, а вот прочитать нет, строка вывода сообщения выглядит следующим образом, но в логе терминала информация не выводится:

if (receivedText != null) {
                        logger.info(
                                "Message received ==> topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), receivedText);
                    }

Полный исходник примера привожу ниже. Содержимое Producer.class:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDate;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer {
    final KafkaProducer<String, String> producerInformation;
    final Logger logger = LoggerFactory.getLogger(Producer.class);

    public Producer(String bootstrapServer) {
        Properties props = producerProps(bootstrapServer);
        producerInformation = new KafkaProducer<>(props);

        logger.info("Producer was initialized");
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String server = "127.0.0.1:9092";
        String topic = "test_topic";
        String message = "{\n" +
                "\t\"FIO\":\"IVANOV IVAN IVANOVICH\",\n" +
                "\t\"position1\":\"позиция1\"\n" +
                "\t\"result1\":\"результат1\",\n" +
                "\t\"codePosition\":\"1\",\n" +
                "\t\"dateRegistration\":\"2022-08-12 09:14:46\"\n" +
                "}";

        Producer producer = new Producer(server);
        producer.put(topic, "record1", message);
        producer.put(topic, "record2", message);
        producer.close();
    }

    private Properties producerProps(String bootstrapServer) {
        String serializer = StringSerializer.class.getName();
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer);
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
        return props;
    }

    private void put(String topic, String key, String value) throws ExecutionException, InterruptedException {
        logger.info("put value: " + value + ", for key: " + key + " date: " + LocalDate.now());

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producerInformation.send(record, (recordMetadata, e) -> {
            if (e != null) {
                logger.error("Error while producing", e);
                return;
            }

            logger.info("Received new meta information" +
                    " Topic: " + recordMetadata.topic() +
                    " Offset: " + recordMetadata.offset() +
                    " Timestamp: " + recordMetadata.timestamp());
        }).get();
    }

    private void close() {
        logger.info("Closing producer's connection");
        producerInformation.close();
    }
}

Класс Consumer.class:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    final KafkaConsumer<String, String> consumerInformation;
    final Logger logger = LoggerFactory.getLogger(Consumer.class);

    public Consumer(String bootstrapServer) {
        Properties props = producerProps(bootstrapServer);
        consumerInformation = new KafkaConsumer<>(props);

        logger.info("Producer was initialized");
    }

    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String topic = "test_topic";

        Consumer consumer = new Consumer(server);
        consumer.get(consumer, topic);
    }

    private Properties producerProps(String bootstrapServer) {
        String serializer = StringDeserializer.class.getName();
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, serializer);
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serializer);
        return props;
    }

    private void get(Consumer consumer, String topic) {
        consumerInformation.subscribe(Collections.singletonList(topic));
        try {
            String receivedText = null;
            while (!"exit".equalsIgnoreCase(receivedText)) {
                ConsumerRecords<String, String> records = consumerInformation.poll(Duration.ofMillis(100));

                for (ConsumerRecord<String, String> record : records) {
                    receivedText = record.value();
                    if (receivedText != null) {
                        logger.info(
                                "Message received ==> topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), receivedText);
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }


    private void close() {
        logger.info("Closing consumer's connection");
        consumerInformation.close();
    }
}

Содержимое docker-compose.yml

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_RETENTION_MS=600000
      - KAFKA_LOG_RETENTION_MS=600000
      - KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=30000
    depends_on:
      - zookeeper

Прошу помощи в решении моей проблемы и совета, что делаю не правильно.


Ответы (0 шт):