Почему в очередь RabbitMQ не добавляется сообщение?

Код

package ru.rays.brokerSpring.client;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;
import ru.rays.brokerSpring.broker.models.MessageModel;

/**
 * Application entry point.
 *
 * <p>Starts the Spring context and then keeps posting the same
 * {@link MessageModel} to the local {@code /rabbit/send} endpoint,
 * pausing five seconds between requests. The loop never terminates.
 */
@SpringBootApplication
@ComponentScan("ru.rays.brokerSpring")
public class App {

    /**
     * Boots the application and runs the endless sender loop.
     *
     * @param args command-line arguments handed over to Spring Boot
     * @throws InterruptedException if the pause between two requests is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(App.class, args);

        final String sendUrl = "http://localhost:8080/rabbit/send";
        final MessageModel payload = new MessageModel("сообщение", "123");
        final RestTemplate http = new RestTemplate();

        for (;;) {
            // The response body is not used; only the POST itself matters here.
            http.postForObject(sendUrl, payload, MessageModel.class);
            Thread.sleep(5000);
        }
    }
}
package ru.rays.brokerSpring.broker.config;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ wiring: connection factory, admin, template, the
 * {@code EmailMessage} queue, two exchanges and the binding of the queue
 * to the direct exchange under routing key {@code 123}.
 */
@Configuration
public class RabbitConfig {

    /**
     * Builds the caching connection factory used by every other bean here.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        // NOTE(review): broker credentials are hard-coded in source; consider
        // moving them to external configuration.
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("cow.rmq2.cloudamqp.com");
        factory.setUsername("pjprgaks");
        factory.setPassword("QxmTpcidUt7yhge1jxHl7Ypu56ZVEjTe");
        factory.setVirtualHost("pjprgaks");
        factory.setPort(5672);
        // NOTE(review): this URI carries no host part and looks truncated;
        // confirm it does not override the host/port configured above.
        factory.setUri("amqp://pjprgaks:QxmTpcidUt7yhge1jxHl7Ypu56ZVEjTe@");
        return factory;
    }

    /**
     * @return an admin bound to {@link #connectionFactory()}
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        ConnectionFactory source = connectionFactory();
        return new RabbitAdmin(source);
    }

    /**
     * @return a template bound to {@link #connectionFactory()}
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        ConnectionFactory source = connectionFactory();
        return new RabbitTemplate(source);
    }

    /**
     * @return the queue named {@code EmailMessage}
     */
    @Bean
    public Queue myQueue() {
        final String queueName = "EmailMessage";
        return new Queue(queueName);
    }

    /**
     * @return the {@code amq.direct} exchange, durable and not auto-deleted
     */
    @Bean
    DirectExchange exchangeDirect() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange("amq.direct", durable, autoDelete);
    }

    /**
     * The method name (including its spelling) is kept because it is the bean name.
     *
     * @return the {@code amq.fanout} exchange, durable and not auto-deleted
     */
    @Bean
    FanoutExchange exchangeFamout() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("amq.fanout", durable, autoDelete);
    }

    /**
     * Binds the given queue to the given direct exchange with routing key {@code 123}.
     *
     * @param queue    queue to bind
     * @param exchange direct exchange the queue is bound to
     * @return the resulting binding
     */
    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        final String routingKey = "123";
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}
package ru.rays.brokerSpring.broker.models;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


/**
 * Request payload: the message text together with the routing key it
 * should be published with.
 *
 * <p>Accessors and both constructors are generated by Lombok; the field
 * order below defines the parameter order of the all-args constructor.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageModel {

    /** Text of the message. */
    private String message;

    /** Routing key used when publishing the message. */
    private String routingKey;
}
package ru.rays.brokerSpring.broker.controllers;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import ru.rays.brokerSpring.broker.models.MessageModel;
import ru.rays.brokerSpring.broker.services.RabbitMQProducerService;

/**
 * REST endpoints under {@code /rabbit}: one to publish a message and one
 * simple health check.
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    private final RabbitMQProducerService producer;

    /**
     * @param rabbitMQProducerService service that publishes messages
     */
    @Autowired
    public RabbitController(RabbitMQProducerService rabbitMQProducerService) {
        this.producer = rabbitMQProducerService;
    }

    /**
     * Forwards the text and routing key of the request body to the producer service.
     *
     * @param messageModel request body carrying message text and routing key
     */
    @PostMapping("/send")
    public void sendMessageToRabbit(@RequestBody MessageModel messageModel) {
        String text = messageModel.getMessage();
        String key = messageModel.getRoutingKey();
        producer.sendMessage(text, key);
    }

    /**
     * @return the constant string {@code "OK"}
     */
    @GetMapping("/check")
    public String healthCheck() {
        final String status = "OK";
        return status;
    }
}
package ru.rays.brokerSpring.broker.controllers;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the {@code EmailMessage} queue that prints every received
 * message to standard output.
 */
@EnableRabbit
@Component
public class RabbitMQConsumer {

    /**
     * Prints one received message.
     *
     * @param message message body delivered from the queue
     */
    @RabbitListener(queues = "EmailMessage")
    public void processMyQueue(String message) {
        System.out.format("Received from myQueue : %s\n", message);
    }
}
package ru.rays.brokerSpring.broker.services;

/**
 * Contract for sending a text message together with a routing key.
 */
public interface RabbitMQProducerService {
    /**
     * Sends one message.
     *
     * @param message    text of the message
     * @param routingKey routing key to send the message with
     */
    void sendMessage(String message, String routingKey);
}
package ru.rays.brokerSpring.broker.services;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * {@link RabbitMQProducerService} implementation that publishes through a
 * {@link RabbitTemplate} to the {@code amq.direct} exchange.
 */
@Service
public class RabbitMQProducerServiceImpl implements RabbitMQProducerService {

    /** Exchange every message is published to. */
    private static final String EXCHANGE = "amq.direct";

    private final RabbitTemplate template;

    /**
     * @param rabbitTemplate template used for publishing
     */
    @Autowired
    public RabbitMQProducerServiceImpl(RabbitTemplate rabbitTemplate) {
        this.template = rabbitTemplate;
    }

    /**
     * Converts and sends the message to {@code amq.direct} with the given routing key.
     *
     * @param message    text of the message
     * @param routingKey routing key to publish with
     */
    public void sendMessage(String message, String routingKey) {
        template.convertAndSend(EXCHANGE, routingKey, message);
    }
}

В RabbitMQ показывается, что соединение установлено. График в Message rates идёт, но график в Queued messages стоит на месте.

Конфигурация RabbitMQ.

Queues — All queues (2). Группы столбцов: Overview, Messages, Message rates (+/-).

| Virtual host | Name | Type | Features | State | Ready | Unacked | Total | incoming | deliver / get | ack |
|---|---|---|---|---|---|---|---|---|---|---|
| pjprgaks | EmailMessage | classic | D default pjprgaks-max-length | idle | 0 | 0 | 0 | 0.00/s | 0.00/s | 0.00/s |
| pjprgaks | EmailMessage2 | classic | D default pjprgaks-max-length | idle | 0 | 0 | 0 | | | |

Exchanges — Exchange: amq.direct

Overview. Message rates, last minute (ось времени: 15:03:50, 15:04:00, 15:04:10, 15:04:20, 15:04:30, 15:04:40; шкала: 0.0 /s – 1.0 /s):
- Publish (In): 0.00/s
- Publish (Out): 0.00/s

Details:
- Type: direct
- Features: durable: true
- Policy:

Bindings (This exchange):

| To | Routing key | Arguments |
|---|---|---|
| EmailMessage | 123 | |

График внутри amq.direct тоже идет.


Ответы (1 шт):

Автор решения: Raysand

Да, моя собственная невнимательность ломала мне голову :)

package ru.rays.brokerSpring.broker.controllers;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listener for the {@code EmailMessage} queue that prints every received
 * message to standard output.
 *
 * <p>Per the answer's explanation, this listener is why the queue never
 * shows waiting messages: while it is subscribed, messages are taken from
 * the queue instead of accumulating there.
 */
@Component
@EnableRabbit
public class RabbitMQConsumer {
    /**
     * Prints one received message.
     *
     * @param message message body delivered from the queue
     */
    @RabbitListener(queues = "EmailMessage")
    public void processMyQueue(String message) {
        System.out.printf("Received from myQueue : %s\n",  message);
    }
}

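Сообщение никогда не успеет задержаться в очереди, пока она прослушивается: слушатель с @RabbitListener сразу забирает каждое сообщение, поэтому график Queued messages остаётся на нуле, хотя Message rates показывает активность.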

→ Ссылка