Spring Boot, WebSocket + RabbitMq архитектура проекта
Мы с другом хотим сделать проект, который был у нас на хакатоне – сервис, похожий на check-host.net. Мой стек – Spring Boot для бэкенда, RabbitMq для очередей, а его – React для фронтенда (Когда проект презентовали, в требованиях было использовать RabbitMq или Redis для очередей, а также веб-сокеты).
Я планирую написать основной бэкенд, а также агентов, которые могут располагаться в разных странах и будут выполнять необходимые проверки доменов (Ping, HTTP, Traceroute и т.д.). Когда основной бэкенд получает запрос, он записывает данные в очередь задач (по одной очереди на агента). Затем агенты считывают данные из своих очередей, выполняют различные запросы к доменам, записывают результаты в общую очередь результатов, которую бэкенд затем считывает и отправляет фронтенду через веб-сокеты (одна из целей – обновлять статус задачи агента в реальном времени).
Я решил использовать чистые веб-сокеты, а не STOMP или SockJS, поскольку нашёл информацию о том, что эти технологии устарели(поправьте меня, если я ошибаюсь).
Это должно выглядеть примерно так: клиент отправляет запрос к /api/check/http, указав домен в теле запроса, и получает ответ 202 вместе с UUID задачи, которая была создана и помещена в очередь задач. Затем клиент подключается к /ws/task/{taskId} и ожидает результатов этой задачи, которые поступают асинхронно.
Вот пример RabbitConfig основного бэкенда:
@Configuration
@EnableRabbit
public class RabbitConfig {
public static final String TASK_EXCHANGE = "tasks-exchange";
public static final String RESULT_EXCHANGE = "results-exchange";
public static final String RESULT_QUEUE = "results-queue";
public static final String RESULT_ROUTING_KEY = "results";
@Bean
public FanoutExchange taskExchange() {
return new FanoutExchange(TASK_EXCHANGE);
}
@Bean
public DirectExchange resultExchange() {
return new DirectExchange(RESULT_EXCHANGE);
}
@Bean
public Queue resultQueue() {
return new Queue(RESULT_QUEUE, true);
}
@Bean
public Binding resultBinding(Queue resultQueue, DirectExchange resultExchange) {
return BindingBuilder.bind(resultQueue)
.to(resultExchange)
.with(RESULT_ROUTING_KEY);
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplateTask(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}
}
Сохранение задачи в очередь:
@Repository
@RequiredArgsConstructor
public class RabbitRepository {
private final RabbitTemplate rabbitTemplate;
public void save(Task task) {
try {
rabbitTemplate.convertAndSend(
RabbitConfig.TASK_EXCHANGE,
"",
task
);
System.out.println("Task published: " + task.getId());
} catch (Exception e) {
e.printStackTrace();
}
}
}
Также, RabbitConfig агента:
@Configuration
@EnableRabbit
public class RabbitConfig {
public static final String TASK_EXCHANGE = "tasks-exchange";
public static final String RESULT_EXCHANGE = "results-exchange";
public static final String RESULT_ROUTING_KEY = "results";
@Bean
public FanoutExchange taskExchange() {
return new FanoutExchange(TASK_EXCHANGE);
}
@Bean
public DirectExchange resultExchange() {
return new DirectExchange(RESULT_EXCHANGE);
}
@Bean
public Queue taskQueue() {
return new AnonymousQueue();
}
@Bean
public Binding taskBinding(Queue taskQueue, FanoutExchange taskExchange) {
return BindingBuilder.bind(taskQueue).to(taskExchange);
}
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
return converter;
}
}
Сохранение результата агента в очередь:
@Repository
@RequiredArgsConstructor
public class RabbitRepository {
private final RabbitTemplate rabbitTemplate;
public void sendResult(AbstractCheckResult result) {
try {
rabbitTemplate.convertAndSend(
RabbitConfig.RESULT_EXCHANGE,
RESULT_ROUTING_KEY,
result
);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Listener агента:
@Override
@RabbitListener(queues = "#{taskQueue.name}")
public void performCheck(Task task) {
System.out.println("taskId: " + task.getId() + ", url: " + task.getUrl() + ", type: " + task.getCheckType().toString());
try {
Thread.sleep(500);
rabbitService.sendResult(new IntermediateCheckResult(
task.getId(),
agent,
new HttpAgentResult(
TaskStatus.IN_PROGRESS
)
));
Instant start = Instant.now();
ResponseEntity<String> response = restTemplate.getForEntity(task.getUrl()).toString(), String.class);
rabbitService.sendResult(new HttpCheckResult(
task.getId(),
agent,
new HttpAgentResult(
response.getStatusCode().value(),
response.getHeaders().toSingleValueMap(),
Duration.between(start, Instant.now()).toMillis(),
null,
TaskStatus.SUCCESS
)
));
}
Listener основного бекенда:
@Service
@RequiredArgsConstructor
public class TaskResultListenerImpl {
private final TaskResultWebSocketHandler wsHandler;
private final ObjectMapper mapper;
@RabbitListener(queues = RabbitConfig.RESULT_QUEUE)
public void startListening(Map<String, Object> data) throws JsonProcessingException {
System.out.println(data);
String taskId = (String) data.get("id");
if (wsHandler.isClientConnected(taskId)) {
wsHandler.sendResultToClient(taskId, mapper.writeValueAsString(data));
} else {
System.out.printf("client for taskId %s not connected", taskId);
}
}
}
Проблема в том, что я не совсем понимаю, как интегрировать эту архитектуру с веб-сокетами. В моём случае основной listener на бекенде получает сообщения из очереди результатов и отправляет их в WS-сессию. Но что произойдёт, если WS-соединение ещё не установлено, а сообщение придёт? Оно не будет доставлено клиенту, поскольку ACK уже получен. Поэтому пока в качестве заглушки я написал Thread.sleep(500) в listener агента, чтобы гарантировать подключение клиента, и это работает. Но я не думаю, что это хорошее решение, поскольку у разных клиентов задержки будут разными. Скорее всего, моя архитектура неверна, и использование WS здесь избыточно. Мне интересно ваше мнение, я хочу знать, как подобные приложения должны разрабатываться.
Спасибо, буду рад любым ответам!