Spring Boot, WebSocket + RabbitMq архитектура проекта

Мы с другом хотим сделать проект, который был у нас на хакатоне – сервис, похожий на check-host.net. Мой стек – Spring Boot для бэкенда, RabbitMq для очередей, а его – React для фронтенда (Когда проект презентовали, в требованиях было использовать RabbitMq или Redis для очередей, а также веб-сокеты).

Я планирую написать основной бэкенд, а также агентов, которые могут располагаться в разных странах и будут выполнять необходимые проверки доменов (Ping, HTTP, Traceroute и т.д.). Когда основной бэкенд получает запрос, он записывает данные в очередь задач (по одной очереди на агента). Затем агенты считывают данные из своих очередей, выполняют различные запросы к доменам, записывают результаты в общую очередь результатов, которую бэкенд затем считывает и отправляет фронтенду через веб-сокеты (одна из целей – обновлять статус задачи агента в реальном времени).

Я решил использовать чистые веб-сокеты, а не STOMP или SockJS, поскольку нашёл информацию о том, что эти технологии устарели(поправьте меня, если я ошибаюсь).

Это должно выглядеть примерно так: клиент отправляет запрос к /api/check/http, указав домен в теле запроса, и получает ответ 202 вместе с UUID задачи, которая была создана и помещена в очередь задач. Затем клиент подключается к /ws/task/{taskId} и ожидает результатов этой задачи, которые поступают асинхронно.

Вот пример RabbitConfig основного бэкенда:

@Configuration
@EnableRabbit
public class RabbitConfig {

    public static final String TASK_EXCHANGE = "tasks-exchange";
    public static final String RESULT_EXCHANGE = "results-exchange";

    public static final String RESULT_QUEUE = "results-queue";
    public static final String RESULT_ROUTING_KEY = "results";

    @Bean
    public FanoutExchange taskExchange() {
        return new FanoutExchange(TASK_EXCHANGE);
    }

    @Bean
    public DirectExchange resultExchange() {
        return new DirectExchange(RESULT_EXCHANGE);
    }

    @Bean
    public Queue resultQueue() {
        return new Queue(RESULT_QUEUE, true);
    }

    @Bean
    public Binding resultBinding(Queue resultQueue, DirectExchange resultExchange) {
        return BindingBuilder.bind(resultQueue)
                .to(resultExchange)
                .with(RESULT_ROUTING_KEY);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplateTask(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }
}

Сохранение задачи в очередь:

@Repository
    @RequiredArgsConstructor
    public class RabbitRepository {
        private final RabbitTemplate rabbitTemplate;
    
        public void save(Task task) {
            try {
                rabbitTemplate.convertAndSend(
                        RabbitConfig.TASK_EXCHANGE,
                        "",
                        task
                );
                System.out.println("Task published: " + task.getId());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Также, RabbitConfig агента:

@Configuration
@EnableRabbit
public class RabbitConfig {

    public static final String TASK_EXCHANGE = "tasks-exchange";
    public static final String RESULT_EXCHANGE = "results-exchange";
    public static final String RESULT_ROUTING_KEY = "results";

    @Bean
    public FanoutExchange taskExchange() {
        return new FanoutExchange(TASK_EXCHANGE);
    }

    @Bean
    public DirectExchange resultExchange() {
        return new DirectExchange(RESULT_EXCHANGE);
    }

    @Bean
    public Queue taskQueue() {
        return new AnonymousQueue();
    }

    @Bean
    public Binding taskBinding(Queue taskQueue, FanoutExchange taskExchange) {
        return BindingBuilder.bind(taskQueue).to(taskExchange);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        return converter;
    }
}

Сохранение результата агента в очередь:

@Repository
@RequiredArgsConstructor
public class RabbitRepository {
    private final RabbitTemplate rabbitTemplate;


    public void sendResult(AbstractCheckResult result) {
        try {
            rabbitTemplate.convertAndSend(
                    RabbitConfig.RESULT_EXCHANGE,
                    RESULT_ROUTING_KEY,
                    result
            );
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Listener агента:

@Override
@RabbitListener(queues = "#{taskQueue.name}")
public void performCheck(Task task) {
    System.out.println("taskId: " + task.getId() + ", url: " + task.getUrl() + ", type: " + task.getCheckType().toString());
    try {
        Thread.sleep(500);

        rabbitService.sendResult(new IntermediateCheckResult(
                task.getId(),
                agent,
                new HttpAgentResult(
                        TaskStatus.IN_PROGRESS
                )
        ));
            Instant start = Instant.now();
            ResponseEntity<String> response = restTemplate.getForEntity(task.getUrl()).toString(), String.class);
            rabbitService.sendResult(new HttpCheckResult(
                    task.getId(),
                    agent,
                    new HttpAgentResult(
                            response.getStatusCode().value(),
                            response.getHeaders().toSingleValueMap(),
                            Duration.between(start, Instant.now()).toMillis(),
                            null,
                            TaskStatus.SUCCESS
                    )
            ));
}

Listener основного бекенда:

@Service
@RequiredArgsConstructor
public class TaskResultListenerImpl {
    private final TaskResultWebSocketHandler wsHandler;
    private final ObjectMapper mapper;

    @RabbitListener(queues = RabbitConfig.RESULT_QUEUE)
    public void startListening(Map<String, Object> data) throws JsonProcessingException {
        System.out.println(data);
        String taskId = (String) data.get("id");

        if (wsHandler.isClientConnected(taskId)) {
            wsHandler.sendResultToClient(taskId, mapper.writeValueAsString(data));
        } else {
            System.out.printf("client for taskId %s not connected", taskId);
        }
    }
}

Проблема в том, что я не совсем понимаю, как интегрировать эту архитектуру с веб-сокетами. В моём случае основной listener на бекенде получает сообщения из очереди результатов и отправляет их в WS-сессию. Но что произойдёт, если WS-соединение ещё не установлено, а сообщение придёт? Оно не будет доставлено клиенту, поскольку ACK уже получен. Поэтому пока в качестве заглушки я написал Thread.sleep(500) в listener агента, чтобы гарантировать подключение клиента, и это работает. Но я не думаю, что это хорошее решение, поскольку у разных клиентов задержки будут разными. Скорее всего, моя архитектура неверна, и использование WS здесь избыточно. Мне интересно ваше мнение, я хочу знать, как подобные приложения должны разрабатываться.

Спасибо, буду рад любым ответам!


Ответы (0 шт):