тесты на exception Kafka

Я создала такие exception, прокинула их. Далее мне нужно их протестировать интеграционными тестами. Подскажите как это делать, ни когда не писала тесты.

 @KafkaListener(topics = "${kafka-topics.adapter-direction-create}")
    public void listenerAdapterDirectionCreate(ConsumerRecord<String, JsonNode> myRecord) {
        log.info("Get message in topic adapter-direction-create, key {} value {}", myRecord.key(), myRecord.value());

        DirectionRequest directionRequest = null;
        try {
            directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
        } catch (JsonProcessingException e) {
           log.error("Error reading message: {}", e.getMessage());
           throw new KafkaCreateException(e.getMessage());
        }
        log.info("Created request to create direction: " + directionRequest.getName());

        kafkaSender.sendMessage(platformDirectionCreate, "Create direction", myRecord.value());
    }

    @KafkaListener(topics = "${kafka-topics.adapter-direction-update}")
    public void listenerAdapterDirectionUpdate(ConsumerRecord<String, JsonNode> myRecord) {
        log.info("Get message in topic adapter-direction-update, key {} value {}", myRecord.key(), myRecord.value());

        DirectionRequest directionRequest = null;

        try {
            directionRequest = objectMapper.treeToValue(myRecord.value(), DirectionRequest.class);
        } catch (JsonProcessingException e) {
            log.error("Error reading message: {}", e.getMessage());
            throw new KafkaUpDateException(e.getMessage());
        }
        log.info("Created request to update direction: " + directionRequest.getName());

        kafkaSender.sendMessage(platformDirectionUpdate, "Update direction", myRecord.value());
    }

    @KafkaListener(topics = "${kafka-topics.adapter-direction-delete}")
    public void listenerAdapterDirectionDelete(ConsumerRecord<String, JsonNode> myRecord) {
        log.info("Get message in topic adapter-direction-delete, key {} value {}", myRecord.key(), myRecord.value());

        String id = null;

        try {
            id = objectMapper.treeToValue(myRecord.value(), String.class);
        } catch (JsonProcessingException e) {
             log.error("Error reading message: {}", e.getMessage());
            throw new KafkaDeleteException(e.getMessage());
        }
        log.info("Created request to delete direction with id: " + id);

        kafkaSender.sendMessage(platformDirectionDelete, "Delete direction", myRecord.value());
    }

Ответы (0 шт):