кафка консумер перечитывает сообщения, по которым уже закоммичен offset
Недавно работаю с кафкой. Запускаю консумера. прочитаны все сообщения в топике к которому он подключается . Т.е. offset равен количеству сообщений.
Консумер в консоль пишет что нет сообщений для чтения: no messages received from kafka within the allocated time for partition 0 of topic_name at offset 3
Но в бесконечном цикле, который запускается в консумере , все равно перечитывает все сообщения топика с самого начала.
Это нормально? как этого не допускать?
с кафкой работаю на golang. Использую пакет github.com/segmentio/kafka-go
...
func consume(ctx context.Context) {
r := getKafkaReader(brokerAddress, "json-requests")
// в бесконечном цикле ждем сообщения в топике кафка
for {
msg, err := r.ReadMessage(ctx)
if err != nil {
panic("consume.could not read message " + err.Error())
}
fmt.Println("consume.received: ", string(msg.Value))
}
}
// Создаем читателя
func getKafkaReader(kafkaUrl, topicName string) *kafka.Reader {
l := log.New(os.Stdout, "consume.kafka reader: ", 0)
brokers := strings.Split(kafkaUrl, ",")
return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
Partition: 0,
Topic: topicName,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
StartOffset: kafka.LastOffset,
Logger: l,
})
}
...
Ответы (1 шт):
Автор решения: Андрей Алексеев
→ Ссылка
Вопрос решился добавлением GroupID Без указания группы консюмера , игнорируется опция StartOffset