Почему не работает аннотация Qualifier при передаче бина в метод?
Собственно решаю задачу(в рамках самообразования) по написанию конфига по работе с кафкой, создал все необходимые бины(consumer) для прослушивания топика. Далее, мне потребовалось прослушать другой топик и я создаю уже другой слушатель. В конфиге оказалось два бина с одинаковым возвращаемым классом(а как известно спринг в первую очередь ищет бины по интерфейсу)
Вот такая ошибка:
- orderConsumerFactory: defined by method 'orderConsumerFactory' in class path resource [com/example/tacocloud/config/KafkaConfig.class]
- tacoConsumerFactory: defined by method 'tacoConsumerFactory' in class path resource [com/example/tacocloud/config/KafkaConfig.class]
То есть два фактори которые живут в контексте при старте приложения и когда спринг пытается понять что инжектить - не может разобраться
Гугл сказал использовать @Qualifier и проименовать бины чтобы спринг мог понимать какой именно бин нужно инжектить в метод, так как их два в контексте, эту задачу я решил так:
package com.example.tacocloud.config;
import com.example.tacocloud.model.jpa.Order;
import com.example.tacocloud.model.jpa.Taco;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
@EnableKafka
@EnableConfigurationProperties
public class KafkaConfig {
// Order consumer
@Bean("orderConsumerConfig")
@ConfigurationProperties("spring.kafka.consumer.order")
public Map<String, Object> orderConsumerConfig() {
return new HashMap<>();
}
@Bean("orderConsumerFactory")
public DefaultKafkaConsumerFactory orderConsumerFactory(@Qualifier("orderConsumerConfig")
Map<String, Object> orderConsumerConfig) {
return new DefaultKafkaConsumerFactory(orderConsumerConfig);
}
@Bean("orderMessageListener")
public MessageListener orderMessageListener() {
return o -> {
log.info("Order received from Kafka {}", o);
};
}
@Bean("orderKafkaMessageListenerContainer")
public KafkaMessageListenerContainer orderKafkaMessageListenerContainer(
@Qualifier("orderConsumerConfig") Map<String, Object> orderConsumerConfig,
@Qualifier("orderConsumerFactory") DefaultKafkaConsumerFactory orderConsumerFactory) {
var containerProperties = new ContainerProperties(new String[]{(String) orderConsumerConfig.get("topic")});
containerProperties.setMessageListener(orderMessageListener());
containerProperties.setGroupId("first");
return new KafkaMessageListenerContainer(orderConsumerFactory, containerProperties);
}
// Taco consumer
// listener container
@Bean("tacoConsumerConfig")
@ConfigurationProperties("spring.kafka.consumer.taco")
public Map<String, Object> tacoConsumerConfig() {
return new HashMap<>();
}
@Bean("tacoConsumerFactory")
public DefaultKafkaConsumerFactory tacoConsumerFactory(
@Qualifier("tacoProducerConfig") Map<String, Object> tacoProducerConfig) {
return new DefaultKafkaConsumerFactory<>(tacoProducerConfig);
}
@Bean("tacoConsumerListener")
public MessageListener tacoConsumerListener() {
return (MessageListener<String, Taco>) data -> log.info("Taco received from Kafka {}", data);
}
@Bean("tacoConsumerContainerListener")
public ConcurrentMessageListenerContainer tacoConsumerContainerListener(
@Qualifier("tacoProducerConfig") Map<String, Object> tacoProducerConfig,
@Qualifier("tacoConsumerFactory") DefaultKafkaConsumerFactory tacoConsumerFactory) {
var containerProperties = new ContainerProperties(new String[]{(String) tacoProducerConfig.get("topic")});
containerProperties.setGroupId("taco-id");
containerProperties.setMessageListener(tacoConsumerListener());
return new ConcurrentMessageListenerContainer<>(tacoConsumerFactory, containerProperties);
}
}
выполнил mvn clean package и перезапустил приложение. Это проблему не исправляет, ошибка та же и все выглядит так будто не работает @Qualifier
Так что пробовал указывать конкретное свойство в бинах @Bean(name="..")
Так же пробовал удалить имена бинов оставив только @Bean и добавив @Qualifier над каждым из методов, ошибка все так же:
- orderConsumerFactory: defined by method 'orderConsumerFactory' in class path resource [com/example/tacocloud/config/KafkaConfig.class]
- tacoConsumerFactory: defined by method 'tacoConsumerFactory' in class path resource [com/example/tacocloud/config/KafkaConfig.class]
Что можно еще попробовать сделать?