тесты на exception Kafka
Я создала такие exception и прокинула их. Теперь мне нужно протестировать их интеграционными тестами. Подскажите, как это делать: я никогда не писала тесты.
/**
 * Listens on the topic configured under {@code kafka-topics.adapter-direction-create}.
 *
 * <p>Logs the incoming record, converts its JSON payload to a {@link DirectionRequest}
 * to check that it is readable, and then passes the original (unconverted) payload to
 * {@code kafkaSender.sendMessage} together with {@code platformDirectionCreate} and the
 * string {@code "Create direction"}.
 *
 * @param myRecord the consumed record; its value is the JSON payload to validate and forward
 * @throws KafkaCreateException if the payload cannot be converted to a
 *         {@link DirectionRequest} or converts to {@code null}
 */
@KafkaListener(topics = "${kafka-topics.adapter-direction-create}")
public void listenerAdapterDirectionCreate(ConsumerRecord<String, JsonNode> myRecord) {
    log.info("Get message in topic adapter-direction-create, key {} value {}", myRecord.key(), myRecord.value());
    final DirectionRequest directionRequest;
    try {
        directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
    } catch (JsonProcessingException e) {
        // The exception is passed as the trailing argument so the stack trace is logged as well.
        log.error("Error reading message: {}", e.getMessage(), e);
        // NOTE(review): the original cause is dropped here because no cause-accepting
        // constructor of KafkaCreateException is visible in this file — consider adding one.
        throw new KafkaCreateException(e.getMessage());
    }
    if (directionRequest == null) {
        // treeToValue returns null for a null tree (e.g. a record without a value); without
        // this guard the getName() call below would fail with a NullPointerException.
        final String reason = "message payload is null";
        log.error("Error reading message: {}", reason);
        throw new KafkaCreateException(reason);
    }
    log.info("Created request to create direction: {}", directionRequest.getName());
    kafkaSender.sendMessage(platformDirectionCreate, "Create direction", myRecord.value());
}
/**
 * Listens on the topic configured under {@code kafka-topics.adapter-direction-update}.
 *
 * <p>Logs the incoming record, converts its JSON payload to a {@link DirectionRequest}
 * to check that it is readable, and then passes the original (unconverted) payload to
 * {@code kafkaSender.sendMessage} together with {@code platformDirectionUpdate} and the
 * string {@code "Update direction"}.
 *
 * @param myRecord the consumed record; its value is the JSON payload to validate and forward
 * @throws KafkaUpDateException if the payload cannot be converted to a
 *         {@link DirectionRequest} or converts to {@code null}
 */
@KafkaListener(topics = "${kafka-topics.adapter-direction-update}")
public void listenerAdapterDirectionUpdate(ConsumerRecord<String, JsonNode> myRecord) {
    log.info("Get message in topic adapter-direction-update, key {} value {}", myRecord.key(), myRecord.value());
    final DirectionRequest directionRequest;
    try {
        directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
    } catch (JsonProcessingException e) {
        // The exception is passed as the trailing argument so the stack trace is logged as well.
        log.error("Error reading message: {}", e.getMessage(), e);
        // NOTE(review): the original cause is dropped here because no cause-accepting
        // constructor of KafkaUpDateException is visible in this file — consider adding one.
        throw new KafkaUpDateException(e.getMessage());
    }
    if (directionRequest == null) {
        // treeToValue returns null for a null tree (e.g. a record without a value); without
        // this guard the getName() call below would fail with a NullPointerException.
        final String reason = "message payload is null";
        log.error("Error reading message: {}", reason);
        throw new KafkaUpDateException(reason);
    }
    log.info("Created request to update direction: {}", directionRequest.getName());
    kafkaSender.sendMessage(platformDirectionUpdate, "Update direction", myRecord.value());
}
/**
 * Listens on the topic configured under {@code kafka-topics.adapter-direction-delete}.
 *
 * <p>Logs the incoming record, converts its JSON payload to a {@link String} id to check
 * that it is readable, and then passes the original (unconverted) payload to
 * {@code kafkaSender.sendMessage} together with {@code platformDirectionDelete} and the
 * string {@code "Delete direction"}. The extracted id is used only for logging.
 *
 * @param myRecord the consumed record; its value is the JSON payload to validate and forward
 * @throws KafkaDeleteException if the payload cannot be converted to a {@link String}
 */
@KafkaListener(topics = "${kafka-topics.adapter-direction-delete}")
public void listenerAdapterDirectionDelete(ConsumerRecord<String, JsonNode> myRecord) {
    log.info("Get message in topic adapter-direction-delete, key {} value {}", myRecord.key(), myRecord.value());
    final String id;
    try {
        id = objectMapper.treeToValue(myRecord.value(), String.class);
    } catch (JsonProcessingException e) {
        // The exception is passed as the trailing argument so the stack trace is logged as well.
        log.error("Error reading message: {}", e.getMessage(), e);
        // NOTE(review): the original cause is dropped here because no cause-accepting
        // constructor of KafkaDeleteException is visible in this file — consider adding one.
        throw new KafkaDeleteException(e.getMessage());
    }
    log.info("Created request to delete direction with id: {}", id);
    kafkaSender.sendMessage(platformDirectionDelete, "Delete direction", myRecord.value());
}