Java: не читается сообщение из топика Kafka
Здравствуйте! Я только начал изучать Kafka, и при реализации тестового примера возникла проблема: записать сообщение в топик получается, а прочитать — нет. Строка вывода сообщения выглядит следующим образом, но в логе терминала ничего не выводится:
// Excerpt from the consumer's poll loop: logs the record's topic, partition,
// offset, key and value, but only when the record value is non-null.
if (receivedText != null) {
logger.info(
"Message received ==> topic = {}, partition = {}, offset = {}, key = {}, value = {}",
record.topic(), record.partition(), record.offset(), record.key(), receivedText);
}
Полный исходный код примера привожу ниже. Содержимое класса Producer (Producer.java):
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDate;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal Kafka producer example: connects to a broker, synchronously sends
 * two JSON messages with string keys to a topic, and closes the connection.
 */
public class Producer {

    final KafkaProducer<String, String> producerInformation;
    final Logger logger = LoggerFactory.getLogger(Producer.class);

    /**
     * Creates a producer connected to the given broker.
     *
     * @param bootstrapServer broker address in {@code host:port} form
     */
    public Producer(String bootstrapServer) {
        Properties props = producerProps(bootstrapServer);
        producerInformation = new KafkaProducer<>(props);
        logger.info("Producer was initialized");
    }

    /**
     * Sends two sample records to {@code test_topic} on the local broker.
     *
     * @throws ExecutionException   if a send fails on the broker side
     * @throws InterruptedException if the thread is interrupted while waiting for a send
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String server = "127.0.0.1:9092";
        String topic = "test_topic";
        // Fixed: the "position1" entry was missing its trailing comma,
        // which made the payload invalid JSON.
        String message = "{\n" +
                "\t\"FIO\":\"IVANOV IVAN IVANOVICH\",\n" +
                "\t\"position1\":\"позиция1\",\n" +
                "\t\"result1\":\"результат1\",\n" +
                "\t\"codePosition\":\"1\",\n" +
                "\t\"dateRegistration\":\"2022-08-12 09:14:46\"\n" +
                "}";
        Producer producer = new Producer(server);
        try {
            producer.put(topic, "record1", message);
            producer.put(topic, "record2", message);
        } finally {
            // Release the connection even when a send throws.
            producer.close();
        }
    }

    /**
     * Builds the producer configuration: broker address plus string
     * serializers for both key and value.
     */
    private Properties producerProps(String bootstrapServer) {
        String serializer = StringSerializer.class.getName();
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer);
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
        return props;
    }

    /**
     * Sends one record and blocks until the send has completed.
     *
     * @param topic target topic
     * @param key   record key
     * @param value record value
     * @throws ExecutionException   if the send fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void put(String topic, String key, String value) throws ExecutionException, InterruptedException {
        logger.info("put value: {}, for key: {} date: {}", value, key, LocalDate.now());
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producerInformation.send(record, (recordMetadata, e) -> {
            if (e != null) {
                logger.error("Error while producing", e);
                return;
            }
            logger.info("Received new meta information Topic: {} Offset: {} Timestamp: {}",
                    recordMetadata.topic(), recordMetadata.offset(), recordMetadata.timestamp());
        }).get(); // get() makes the send synchronous
    }

    /** Closes the underlying Kafka producer. */
    private void close() {
        logger.info("Closing producer's connection");
        producerInformation.close();
    }
}
Содержимое класса Consumer (Consumer.java):
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to a topic and logs every
 * received record until a record whose value is "exit" (case-insensitive)
 * arrives.
 */
public class Consumer {

    final KafkaConsumer<String, String> consumerInformation;
    final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /**
     * Creates a consumer connected to the given broker.
     *
     * @param bootstrapServer broker address in {@code host:port} form
     */
    public Consumer(String bootstrapServer) {
        Properties props = consumerProps(bootstrapServer);
        consumerInformation = new KafkaConsumer<>(props);
        // Fixed: the message previously said "Producer was initialized".
        logger.info("Consumer was initialized");
    }

    /** Starts consuming {@code test_topic} from the local broker. */
    public static void main(String[] args) {
        String server = "127.0.0.1:9092";
        String topic = "test_topic";
        Consumer consumer = new Consumer(server);
        consumer.get(consumer, topic);
    }

    /**
     * Builds the consumer configuration: broker address, consumer group,
     * string deserializers for key and value, and the offset reset policy.
     */
    private Properties consumerProps(String bootstrapServer) {
        String deserializer = StringDeserializer.class.getName();
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        // Without this the default policy "latest" applies: a group with no
        // committed offsets starts at the end of the log, so records produced
        // BEFORE the consumer started are never delivered.
        // NOTE: the policy is only consulted when the group has no committed
        // offsets; if "test_consumer_group" has already committed, use a new
        // group id or reset the group's offsets to re-read old records.
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    /**
     * Subscribes to {@code topic} and polls in a loop, logging each record
     * with a non-null value. The loop ends once a record with the value
     * "exit" has been seen; {@code consumer} is closed afterwards in any case.
     *
     * @param consumer the consumer to close when polling ends
     * @param topic    topic to subscribe to
     */
    private void get(Consumer consumer, String topic) {
        consumerInformation.subscribe(Collections.singletonList(topic));
        try {
            String receivedText = null;
            while (!"exit".equalsIgnoreCase(receivedText)) {
                ConsumerRecords<String, String> records = consumerInformation.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    receivedText = record.value();
                    if (receivedText != null) {
                        logger.info(
                                "Message received ==> topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                                record.topic(), record.partition(), record.offset(), record.key(), receivedText);
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    /** Closes the underlying Kafka consumer. */
    private void close() {
        logger.info("Closing consumer's connection");
        consumerInformation.close();
    }
}
Содержимое файла docker-compose.yml:
# Single-broker Kafka with ZooKeeper for local testing.
version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      # Address handed back to clients; must be reachable from the host running the Java code.
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # NOTE(review): no KAFKA_CFG_ prefix and no matching broker property
      # (retention.ms is a topic-level setting) - presumably ignored; confirm.
      - KAFKA_RETENTION_MS=600000
      # Broker settings are mapped from KAFKA_CFG_* variables, so the prefix is
      # required for these to take effect: log.retention.ms = 10 minutes,
      # log.retention.check.interval.ms = 30 seconds.
      - KAFKA_CFG_LOG_RETENTION_MS=600000
      - KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=30000
    depends_on:
      - zookeeper
Прошу помочь в решении моей проблемы и подсказать, что я делаю неправильно.