Неправильное поведение микросервиса при обращении к другому
У меня возникает проблема при общении одного микросервиса с другим. Есть два МС: mscompany и msrefbook.
МС mscompany содержит контроллер:
@RestController
@RequestMapping("/company")
public class CompanyController {
private final MessageFuncActions messageFuncActions;
public CompanyController(MessageFuncActions messageFuncActions) {
this.messageFuncActions = messageFuncActions;
}
@PostMapping("/add")
public ResponseEntity<Company> add(@RequestBody Company company) {
messageFuncActions.sendNewCompanyMessage(company.getRangid());
messageFuncActions.getRefbookResponseSender().asFlux().subscribe(response -> {
boolean found = response.getPayload();
System.out.println("found: "+ found);
});
return new ResponseEntity<>(HttpStatus.OK);
}
}
А также сервис, который позволяет отправить сообщение в другой микросервис и получить от него ответ, который затем обрабатывается в контроллере:
@Service
@Getter
public class MessageFuncActions {
//Для того чтобы считывать данные по требованию (а не постоянно). Создаем поток, откуда данные будут попадать в канал message spring cloud streams
//Для отправки сообщений в другие микросервисы
private Sinks.Many<Message<Long>> companyMessageSender = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); //С типом Long
//Для приема ответа из других микросервисов
private Sinks.Many<Message<Boolean>> refbookResponseSender = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false); //С типом Boolean
@Bean //Создаем внутреннюю шину или трубу куда добавляем сообщение. Сообщения из него дальше будут попадать в брокер сообщений
public Supplier<Flux<Message<Long>>> newCompanyActionProduce() {
return () -> companyMessageSender.asFlux(); //будет считывать данные из потока Flux как только туда что-то положат
}
//Отправка сообщения в мс refbook, для проверки есть ли в БД мс refbook запись с rangid
//Здесь rangId - это ссылка на refbook.id из микросервиса mis-refbooks
public void sendNewCompanyMessage(Long rangId) {
//Добавление в слушатель нового сообщения
getCompanyMessageSender().emitNext(MessageBuilder.withPayload(rangId).build(), Sinks.EmitFailureHandler.FAIL_FAST);
System.out.println("Сообщение отправлено: " + rangId);
System.out.println(MessageBuilder.withPayload(rangId).build().getPayload());
}
//Получение ответа от мс refbook. Этот метод вызывается далее в контроллере
@Bean
public Consumer<Message<Boolean>> newCompanyResponseConsume() {
return booleanMessage -> {
boolean found = booleanMessage.getPayload();
System.out.println("Запись " + (found ? "найдена" : "не найдена") + " в справочнике");
getRefbookResponseSender().emitNext(MessageBuilder.withPayload(found).build(), Sinks.EmitFailureHandler.FAIL_FAST);
};
}
}
Второй МС msrefbook также содержит сервис, который принимает и отправляет обратно сообщения
@Service
@Getter
public class MessageFuncAction {
private final RefbookService refbookService;
//Для отправки сообщений в мс company
private Sinks.Many<Message<Boolean>> responseSender = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
public MessageFuncAction(RefbookService refbookService) {
this.refbookService = refbookService;
}
@Bean
//Создаем внутреннюю шину или трубу куда добавляем сообщение. Сообщения из него дальше будут попадать в брокер сообщений
public Supplier<Flux<Message<Boolean>>> newCompanyResponseProduce() {
return () -> responseSender.asFlux();
}
//Проверяем есть ли в БД Refbook запись с полученным от мс company rangid и посылаем обратно ответ
@Bean
public Consumer<Message<Long>> newCompanyActionConsume() {
return longMessage -> {
System.out.println(longMessage.getPayload());
Optional<Refbook> refbookOptional = refbookService.findById(longMessage.getPayload());
boolean found = refbookOptional.isPresent();
if (found) {
System.out.println(refbookOptional.get().getText());
}
System.out.println("Справочник с id :" + longMessage.getPayload() + (found ? " найден в БД" : " не найден в БД"));
//getResponseSender().emitNext(MessageBuilder.withPayload(found).build(), Sinks.EmitFailureHandler.FAIL_FAST);
getResponseSender().emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
};
}
}
Проблема в следующем: Во-первых если второй микросервис недоступен, то в контроллера mscompany все равно выводится true, как будто такой ответ получен из другого микросервиса, а во-вторых при каждом выполнении метода add в контроллере found:true выводится много раз, видимо все сообщения копятся и выводится каждый раз при вызове.
Подскажите пожалуйста как правильно реализовать получение сообщения и как вывести сообщение если второй микросервис недоступен?
Пробовал переделать метод add таким образом:
@PostMapping("/add")
public ResponseEntity<Company> add(@RequestBody Company company) {
CompletableFuture<Boolean> refbookResponseFuture = new CompletableFuture<>();
messageFuncActions.sendNewCompanyMessage(company.getRangid());
messageFuncActions.getRefbookResponseSender().asFlux().take(1).subscribe(
response -> {
boolean found = response.getPayload();
System.out.println("found: " + found);
refbookResponseFuture.complete(found);
},
error -> {
System.out.println("Ошибка получения сообщения от мс refbook: " + error.getMessage());
refbookResponseFuture.complete(false);
},
() -> {
System.out.println("Таймаут: ответ от мс refbook не получен");
refbookResponseFuture.complete(false);
}
);
try {
Boolean refbookResponse = refbookResponseFuture.get();
System.out.println(refbookResponse + " получен");
} catch (InterruptedException | ExecutionException e) {
return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
}
return new ResponseEntity<>(HttpStatus.OK);
}
Но все равно при многократном вызове метода, даже если второй микросервис недоступен, все равно выводит found:true.
Ответы (1 шт):
Чтобы обработать случай, когда второй микросервис недоступен, вам нужна механика тайм-аута. Вы можете использовать Mono с тайм-аутом и присоединить его к вашему CompletableFuture. Что-то вроде этого:
@PostMapping("/add")
public ResponseEntity<Company> add(@RequestBody Company company) {
CompletableFuture<Boolean> refbookResponseFuture = new CompletableFuture<>();
// Устанавливаем тайм-аут для будущего ответа
refbookResponseFuture.orTimeout(5, TimeUnit.SECONDS);
messageFuncActions.sendNewCompanyMessage(company.getRangid());
messageFuncActions.getRefbookResponseSender().asFlux()
.take(1)
.timeout(Duration.ofSeconds(5))
.subscribe(
response -> { ... }, // То же самое
error -> refbookResponseFuture.completeExceptionally(error),
() -> refbookResponseFuture.complete(false) // Ничего не найдено
);
try {
Boolean refbookResponse = refbookResponseFuture.get();
// ...
} catch (TimeoutException e) {
System.out.println("Timeout: ответ от msrefbook не получен");
return new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT);
} catch (InterruptedException | ExecutionException e) {
return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
}
// ...
}
Чтобы исправить проблему с многократной обработкой сообщений, убедитесь, что вы подписываетесь на поток Flux таким образом, чтобы каждая подписка отслеживала только одно сообщение. Использование take(1) хорошо, но нужно также убедиться, что нет других случаев множественной подписки на одни и те же события.
Ваш метод sendNewCompanyMessage отправляет сообщение, но не ожидает ответа. Подписка в контроллере добавляется каждый раз при вызове метода add, и это может привести к многократной подписке на обработку одного и того же ответа. Вам может потребоваться механизм для сопоставления запроса с ответом, например, с использованием идентификатора корреляции.
Вы используете emitComplete() в своем сервере msrefbook, который успешно завершает поток и предотвращает получение любых других событий. Вы должны отправлять emitNext с сообщением о состоянии (найден или не найден), вместо того чтобы завершать поток.
Рассмотрите использование стратегии обработки ошибок для Sinks.EmitFailureHandler при отправке сообщений, чтобы обрабатывать неудачные попытки отправки и управлять backpressure.
Убедитесь, что в обоих микросервисах правильно настроены каналы Spring Cloud Stream для обмена сообщениями.
Чтобы корректно обработать случай недоступности msrefbook, добавьте тайм-ауты для ожидания ответа и используйте идентификатор корреляции для сопоставления запросов и ответов. Функционал комбинирования тайм-аута с CompletableFuture и Flux (как показано выше) должен помочь контролировать поведение в случае проблем связи с вторым микросервисом.