не подключиться к apache kafka после добавления еще одного брокера
Запускаю брокеры кафка в docker . Через docker-compose создаю контейнеры с кафкой зукипером и другими сервисами взаимодействующими с кафкой. Каждый брокер кафка и зукипер в отдельном контейнере.
Web ui для kafka (akhq.io) видит оба брокера.
Но подключиться из сервиса на golang не получается. с одним брокером все работало. Использую библиотеку github.com/segmentio/kafka-go
Пытаюсь подключиться через Dial , но получаю panic
func listTopics() map[string]struct{} {
conn, err := kafka.Dial("tcp", conf.Config.APP.KafkaBrokerAddress)
if err != nil {
panic(err.Error())
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
return m
panic(err.Error())
}
for _, p := range partitions {
if _, exist := m[p.Topic]; exist == false {
m[p.Topic] = struct{}{}
}
}
return m
}
Если в метод Dial передаю kafka-0:29092,kafka-1:29072 либо kafka-0:9092,kafka-1:9072
Получаю panic: dial tcp: lookup kafka-0:29092,kafka-1:29072: Try again либо dial tcp: lookup kafka-0:9092,kafka-1:9072: Try again
Если в Dial передаю адрес одного сервиса с брокером кафка kafka-0:29092 либо kafka-0:9092
То panic : dial tcp: 127.0.0.1:9072 connection refused либо dial tcp: 127.0.0.1:9092 connection refused
Dial нельзя передавать несколько адресов брокеров? или в чем проблема?
docker-compose.yaml
version: '3'
volumes:
zookeeper-data-0:
driver: local
zookeeper-log-0:
driver: local
kafka-data-0:
driver: local
kafka-data-1:
driver: local
services:
akhq-panel: #akhq.io
image: tchiotludo/akhq
environment:
AKHQ_CONFIGURATION: |
micronaut:
security:
enabled: true
akhq:
security:
default-group: no-roles
basic-auth: #Password in sha256
- username: admin
password: password
groups: # Groups for the user
- admin
connections:
docker-kafka-server-1:
properties:
bootstrap.servers: "kafka-0:29092,kafka-1:29072"
schema-registry:
url: "http://schema-registry-0:8085"
ports:
- 8888:8080
depends_on:
- kafka-0
- kafka-1
- schema-registry-0
zookeeper:
image: confluentinc/cp-zookeeper
volumes:
- zookeeper-data-0:/var/lib/zookeeper/data:Z
- zookeeper-log-0:/var/lib/zookeeper/log:Z
environment:
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'
kafka-0:
image: confluentinc/cp-kafka
volumes:
- kafka-data-0:/var/lib/kafka:Z
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: '0'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_NUM_PARTITIONS: '12'
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: '9091'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_LISTENERS: 'INTERNAL://kafka-0:29092,EXTERNAL://kafka-0:9092'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-0:29092,EXTERNAL://localhost:9092'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
depends_on:
- zookeeper
kafka-1:
image: confluentinc/cp-kafka
volumes:
- kafka-data-1:/var/lib/kafka:Z
ports:
- 9072:9072
environment:
KAFKA_BROKER_ID: '1'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_NUM_PARTITIONS: '12'
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: '9071'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_LISTENERS: 'INTERNAL://kafka-1:29072,EXTERNAL://kafka-1:9072'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-1:29072,EXTERNAL://localhost:9072'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
depends_on:
- zookeeper
schema-registry-0:
image: confluentinc/cp-schema-registry
ports:
- 8085:8085
depends_on:
- kafka-0
- kafka-1
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka-0:29092'
SCHEMA_REGISTRY_HOST_NAME: 'schema-registry-0'
SCHEMA_REGISTRY_LISTENERS: 'http://schema-registry-0:8085'
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'
adapter: #kafka adapter
build:
context: .
dockerfile: ./adapter.dockerfile
environment:
- GO_ENV=dev
- GOPATH=/go
container_name: kafka-adapter
depends_on:
- kafka-0
- kafka-1version: '3'
volumes:
zookeeper-data-0:
driver: local
zookeeper-log-0:
driver: local
kafka-data-0:
driver: local
kafka-data-1:
driver: local
services:
akhq-panel: #akhq.io
image: tchiotludo/akhq
environment:
AKHQ_CONFIGURATION: |
micronaut:
security:
enabled: true
akhq:
security:
default-group: no-roles
basic-auth: #Password in sha256
- username: admin
password: password
groups: # Groups for the user
- admin
connections:
docker-kafka-server-1:
properties:
bootstrap.servers: "kafka-0:29092,kafka-1:29072"
schema-registry:
url: "http://schema-registry-0:8085"
ports:
- 8888:8080
depends_on:
- kafka-0
- kafka-1
- schema-registry-0
zookeeper:
image: confluentinc/cp-zookeeper
volumes:
- zookeeper-data-0:/var/lib/zookeeper/data:Z
- zookeeper-log-0:/var/lib/zookeeper/log:Z
environment:
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'
kafka-0:
image: confluentinc/cp-kafka
volumes:
- kafka-data-0:/var/lib/kafka:Z
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: '0'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_NUM_PARTITIONS: '12'
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: '9091'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_LISTENERS: 'INTERNAL://kafka-0:29092,EXTERNAL://kafka-0:9092'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-0:29092,EXTERNAL://localhost:9092'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
depends_on:
- zookeeper
kafka-1:
image: confluentinc/cp-kafka
volumes:
- kafka-data-1:/var/lib/kafka:Z
ports:
- 9072:9072
environment:
KAFKA_BROKER_ID: '1'
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_NUM_PARTITIONS: '12'
KAFKA_COMPRESSION_TYPE: 'gzip'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
KAFKA_JMX_PORT: '9071'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer'
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
KAFKA_LISTENERS: 'INTERNAL://kafka-1:29072,EXTERNAL://kafka-1:9072'
KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://kafka-1:29072,EXTERNAL://localhost:9072'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
KAFKA_INTER_BROKER_LISTENER_NAME: 'INTERNAL'
depends_on:
- zookeeper
schema-registry-0:
image: confluentinc/cp-schema-registry
ports:
- 8085:8085
depends_on:
- kafka-0
- kafka-1
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka-0:29092'
SCHEMA_REGISTRY_HOST_NAME: 'schema-registry-0'
SCHEMA_REGISTRY_LISTENERS: 'http://schema-registry-0:8085'
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'
adapter: #kafka adapter
build:
context: .
dockerfile: ./adapter.dockerfile
environment:
- GO_ENV=dev
- GOPATH=/go
container_name: kafka-adapter
depends_on:
- kafka-0
- kafka-1