Почему в очередь RabbitMQ не добавляется сообщение?
Код
package ru.rays.brokerSpring.client;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;
import ru.rays.brokerSpring.broker.models.MessageModel;
/**
 * Application entry point. After the Spring context is started, the main thread acts as a
 * simple test client that keeps posting the same {@link MessageModel} to the
 * {@code /rabbit/send} endpoint on localhost.
 */
@SpringBootApplication
@ComponentScan(value = "ru.rays.brokerSpring")
public class App {

    /**
     * Starts the Spring application and then posts a fixed message in an endless loop,
     * pausing five seconds between requests.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     * @throws InterruptedException if the thread is interrupted while sleeping between posts
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(App.class, args);

        final String sendUrl = "http://localhost:8080/rabbit/send";
        final RestTemplate client = new RestTemplate();
        final MessageModel payload = new MessageModel("сообщение", "123");

        // Never terminates on its own: one POST, then a five-second pause, forever.
        for (;;) {
            client.postForObject(sendUrl, payload, MessageModel.class);
            Thread.sleep(5000);
        }
    }
}
package ru.rays.brokerSpring.broker.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration declaring the RabbitMQ connection factory, the admin and template
 * helpers, the {@code EmailMessage} queue, two exchanges and the queue binding.
 *
 * <p>NOTE(review): broker credentials are hard-coded in {@link #connectionFactory()};
 * consider moving them to external configuration.
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the caching connection factory used by the admin and the template.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        // Setter order is intentionally untouched: the URI is applied last,
        // after the individual host/user/password/vhost/port values.
        factory.setHost("cow.rmq2.cloudamqp.com");
        factory.setUsername("pjprgaks");
        factory.setPassword("QxmTpcidUt7yhge1jxHl7Ypu56ZVEjTe");
        factory.setVirtualHost("pjprgaks");
        factory.setPort(5672);
        factory.setUri("amqp://pjprgaks:QxmTpcidUt7yhge1jxHl7Ypu56ZVEjTe@");
        return factory;
    }

    /**
     * Creates the admin object on top of {@link #connectionFactory()}.
     *
     * @return a {@link RabbitAdmin} instance
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        ConnectionFactory factory = connectionFactory();
        return new RabbitAdmin(factory);
    }

    /**
     * Creates the template used to publish messages, on top of {@link #connectionFactory()}.
     *
     * @return a {@link RabbitTemplate} instance
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        ConnectionFactory factory = connectionFactory();
        return new RabbitTemplate(factory);
    }

    /**
     * Declares the queue named {@code EmailMessage}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue myQueue() {
        final String queueName = "EmailMessage";
        return new Queue(queueName);
    }

    /**
     * Declares the {@code amq.direct} exchange as durable and not auto-deleted.
     *
     * @return the direct exchange definition
     */
    @Bean
    DirectExchange exchangeDirect() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange("amq.direct", durable, autoDelete);
    }

    /**
     * Declares the {@code amq.fanout} exchange as durable and not auto-deleted.
     *
     * @return the fanout exchange definition
     */
    @Bean
    FanoutExchange exchangeFamout() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new FanoutExchange("amq.fanout", durable, autoDelete);
    }

    /**
     * Binds the given queue to the given direct exchange with routing key {@code 123}.
     *
     * @param queue the queue to bind
     * @param exchange the direct exchange to bind the queue to
     * @return the binding definition
     */
    @Bean
    Binding binding(Queue queue, DirectExchange exchange) {
        final String routingKey = "123";
        return BindingBuilder.bind(queue).to(exchange).with(routingKey);
    }
}
package ru.rays.brokerSpring.broker.models;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
 * Payload holding a message text and the routing key that accompanies it.
 * Lombok generates the accessors plus a no-args and an all-args constructor
 * (arguments in field declaration order: message, routingKey).
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageModel {

    /** Text of the message. */
    private String message;

    /** Routing key supplied together with the message. */
    private String routingKey;
}
package ru.rays.brokerSpring.broker.controllers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import ru.rays.brokerSpring.broker.models.MessageModel;
import ru.rays.brokerSpring.broker.services.RabbitMQProducerService;
/**
 * REST controller mapped under {@code /rabbit}. It exposes an endpoint that hands an incoming
 * message to the producer service and a trivial health-check endpoint.
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitController {

    private final RabbitMQProducerService producer;

    /**
     * @param rabbitMQProducerService service used to send messages
     */
    @Autowired
    public RabbitController(RabbitMQProducerService rabbitMQProducerService) {
        this.producer = rabbitMQProducerService;
    }

    /**
     * Handles {@code POST /rabbit/send}: forwards the message text and routing key from the
     * request body to the producer service.
     *
     * @param messageModel request body with the message and its routing key
     */
    @PostMapping("/send")
    public void sendMessageToRabbit(@RequestBody MessageModel messageModel) {
        final String text = messageModel.getMessage();
        final String key = messageModel.getRoutingKey();
        producer.sendMessage(text, key);
    }

    /**
     * Handles {@code GET /rabbit/check}.
     *
     * @return the constant string {@code "OK"}
     */
    @GetMapping("/check")
    public String healthCheck() {
        final String status = "OK";
        return status;
    }
}
package ru.rays.brokerSpring.broker.controllers;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener component for the {@code EmailMessage} queue. Every received message is printed
 * to standard output.
 */
@EnableRabbit
@Component
public class RabbitMQConsumer {

    /**
     * Invoked for each message arriving on the {@code EmailMessage} queue.
     *
     * @param message the received message body
     */
    @RabbitListener(queues = "EmailMessage")
    public void processMyQueue(String message) {
        final String line = String.format("Received from myQueue : %s\n", message);
        System.out.print(line);
    }
}
package ru.rays.brokerSpring.broker.services;
/**
 * Contract for a component that sends a text message together with a routing key.
 */
public interface RabbitMQProducerService {

    /**
     * Sends the given message using the given routing key.
     *
     * @param message the message text to send
     * @param routingKey the routing key to send the message with
     */
    void sendMessage(String message, String routingKey);
}
package ru.rays.brokerSpring.broker.services;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class RabbitMQProducerServiceImpl implements RabbitMQProducerService {
private final RabbitTemplate rabbitTemplate;
@Autowired
public RabbitMQProducerServiceImpl(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message, String routingKey) {
rabbitTemplate.convertAndSend("amq.direct", routingKey, message);
}
}
В консоли управления RabbitMQ видно, что соединение (connection) установлено.
График в Message rates идёт,
но график в Queued messages стоит на месте.
Конфигурация RabbitMQ.
Queues
All queues (2)
Overview Messages Message rates +/-
Virtual host
Name
Type
Features
State
Ready
Unacked
Total
incoming
deliver / get
ack
pjprgaks
EmailMessage
classic D default pjprgaks-max-length idle 0 0 0 0.00/s 0.00/s 0.00/s
pjprgaks
EmailMessage2
classic D default pjprgaks-max-length idle 0 0 0
Exchanges
Exchange: amq.direct
Overview
Message rates last minute
15:03:50 15:04:00 15:04:10 15:04:20 15:04:30 15:04:40 0.0 /s 1.0 /s
Publish (In)
0.00/s
Publish (Out)
0.00/s
Details
Type direct
Features
durable: true
Policy
Bindings
This exchange
⇓
To Routing key Arguments
EmailMessage
123
График внутри amq.direct тоже идет.
Ответы (1 шт):
Да, моя собственная невнимательность ломала мне голову :)
package ru.rays.brokerSpring.broker.controllers;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Listener component for the {@code EmailMessage} queue. Every received message is printed
 * to standard output.
 */
@EnableRabbit
@Component
public class RabbitMQConsumer {

    /**
     * Invoked for each message arriving on the {@code EmailMessage} queue.
     *
     * @param message the received message body
     */
    @RabbitListener(queues = "EmailMessage")
    public void processMyQueue(String message) {
        final String line = String.format("Received from myQueue : %s\n", message);
        System.out.print(line);
    }
}
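Сообщение не успевает задержаться в очереди: слушатель с @RabbitListener, подписанный на эту же очередь, сразу же забирает его, поэтому график Queued messages остаётся на нуле.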