Spring Boot + RabbitMQ: how to convert received object using convertSendAndReceive() method?

Как мне распарсить сообщение, используя convertSendAndReceive() метод? Вываливается NullPointerException из-за невозможности найти нужный для дессериализации класс в другом пакете. Пакеты сверху в коде обозначил.

Слушатель принял и отправил сообщение нормально

package org.dneversky.user;

@EnableRabbit
@Component
public class TestListener {

  private static final Logger logger = LoggerFactory.getLogger(TestListener.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)
    public UserReplyMessage doGet(UserReplyMessage message) {
        logger.info("Received message: {}", message);
        UserReplyMessage response = new UserReplyMessage();
        logger.info("Sending message: {}", response);
        return response;
    }
}

Конфигурация слушателя

package org.dneversky.user.config;

@Configuration
public class RabbitMQConfig {

  public static final String RECEIVE_QUEUE = "rpc_queue";
  public static final String REPLY_QUEUE = "reply_queue";
  public static final String RPC_EXCHANGE = "rpc_exchange";

  @Bean
  public TopicExchange rpcExchange() {
    return new TopicExchange(RPC_EXCHANGE);
  }

  @Bean
  public Queue receiveQueue() {
    return new Queue(RECEIVE_QUEUE);
  }

  @Bean
  public Queue replyQueue() {
    return new Queue(REPLY_QUEUE);
  }

  @Bean
  public Binding receiveBinding() {
    return BindingBuilder.bind(receiveQueue()).to(rpcExchange()).with(RECEIVE_QUEUE);
  }

  @Bean
  public MessageConverter jsonMessageConverter() {
    return new Jackson2JsonMessageConverter();
  }
}

Отправитель отправляет нормально, но не может распарсить возвращаемое сообщение (логи)

package org.dneversky.gateway.servie.impl;

@Service
public class UserServiceImpl {

  private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

  @Autowired
  private RabbitTemplate rabbitTemplate;

  public UserPrincipal getUserByUsername(String username) {
    UserResponse message = new UserResponse(username);
    logger.info("Sending created message: {}", message);
    UserResponse result = (UserResponse) rabbitTemplate.convertSendAndReceive(RabbitMQConfig.RPC_EXCHANGE, RabbitMQConfig.RPC_QUEUE, message);
    logger.info("Getting response: {}", result);

    return null;
  }
}

Конфигурация отправителя

package org.dneversky.gateway.config;

@Configuration
public class RabbitMQConfig {

  public static final String RPC_QUEUE = "rpc_queue";
  public static final String REPLY_QUEUE = "reply_queue";
  public static final String RPC_EXCHANGE = "rpc_exchange";

  @Bean
  public Queue rpcQueue() {
    return new Queue(RPC_QUEUE);
  }

  @Bean
  public Queue replyQueue() {
    return new Queue(REPLY_QUEUE);
  }

  @Bean
  public TopicExchange rpcExchange() {
    return new TopicExchange(RPC_EXCHANGE);
  }

  @Bean
  public Binding binding() {
    return BindingBuilder.bind(replyQueue()).to(rpcExchange()).with(REPLY_QUEUE);
  }

  @Bean
  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(RPC_EXCHANGE);
    rabbitTemplate.setReplyAddress(REPLY_QUEUE);
    rabbitTemplate.setReplyTimeout(6000);
    rabbitTemplate.setMessageConverter(messageConverter());

    return rabbitTemplate;
  }

  @Bean
  public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
  }

  @Bean
  public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(REPLY_QUEUE);
    container.setMessageListener(rabbitTemplate(connectionFactory));
    return container;
  }
}

Лог ошибки

2022-05-22 17:12:31.344 ERROR 16920 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.amqp.support.converter.MessageConversionException: failed to resolve class name. Class not found [org.dneversky.user.model.UserReplyMessage]] with root cause

java.lang.ClassNotFoundException: org.dneversky.user.model.UserReplyMessage

Как оно вообще работает. Допустим с помощью rabbitTemplate на отправителе я шлю сообщение получателю, а как получатель возвращает ответ обратно, с помощью дефолтного AmqpTemplate?


Ответы (1 шт):

Автор решения: Митяй

Решение нашел здесь: https://reflectoring.io/amqp-request-response/

  • Добавил модель UserRequest
  • Изменил тип обменника на DirectExchange
  • Избавился от переопределения RabbitTemplate
  • Теперь используется лишь одна очередь для отправки сообщения
  • Заменил convertSendAndReceive() на convertSendAndReceiveAsType()

Теперь мой код имеет следующую структуру:

Отправитель

@Service
public class UserServiceImpl {

    private static final Logger logger = LoggerFactory.getLogger(UserServiceImpl.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public UserPrincipal getUserByUsername(String username) {
        UserRequest request = new UserRequest(username);
        logger.info("Sending created message: {}", request);
        UserResponse response = rabbitTemplate.convertSendAndReceiveAsType(
                RabbitMQConfig.RPC_EXCHANGE, RabbitMQConfig.ROUTING_KEY, request,
                new ParameterizedTypeReference<UserResponse>() {});
        logger.info("Getting response: {}", response);

        return null;
    }
}

Конфигурация отправителя

@Configuration
public class RabbitMQConfig {

    public static final String RPC_EXCHANGE = "rpc_exchange";
    public static final String ROUTING_KEY = "rpc_key";

    @Bean
    public DirectExchange rpcExchange() {
        return new DirectExchange(RPC_EXCHANGE);
    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Получатель

@Component
public class TestListener {

    private static final Logger logger = LoggerFactory.getLogger(TestListener.class);

    @RabbitListener(queues = RabbitMQConfig.RECEIVE_QUEUE)
    public UserReplyMessage doGet(UserReplyMessage message) {
        logger.info("Received message: {}", message);
        UserReplyMessage response = new UserReplyMessage();
        logger.info("Sending message: {}", response);
        return response;
    }
}

Конфигурация получателя

@Configuration
public class RabbitMQConfig {

    public static final String RECEIVE_QUEUE = "rpc_queue";
    public static final String RPC_EXCHANGE = "rpc_exchange";
    public static final String ROUTING_KEY = "rpc_key";

    @Bean
    public DirectExchange rpcExchange() {
        return new DirectExchange(RPC_EXCHANGE);
    }

    @Bean
    public Queue receiveQueue() {
        return new Queue(RECEIVE_QUEUE);
    }

    @Bean
    public Binding receiveBinding() {
        return BindingBuilder.bind(receiveQueue()).to(rpcExchange()).with(ROUTING_KEY);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

Когда применяется метод convertSendAndReceiveAsType() Spring AMQP берет на себя обязанности по конфигурации доставки/принятия сообщений, автоматически создает callback очередь (с каким-то названием) и генерирует correlation ID для отправляемого сообщения.

На сервере важно указать биндингу routing_key иначе он не будет слушать очередь. Когда приходит сообщение, у него берутся correlationId и название callback очереди, которые используются в ответе.

Также необходимо настроить MessageConverter в конфигурации обеих сторон, который позволить осуществлять конвертацию тела сообщений.

→ Ссылка