Отправка сообщений из базы данных в Kafka c обязательной сохранностью очерёдности

Мне нужно отправить данные из базы данных в Кафку. Обязательным условием является сохранность очерёдности сообщений как они расположены в базе данных. После того как сообщения отправлены, я должен удалить их из базы данных. Выполнившись, данная задача повторятся вновь (она выполняется по расписанию через @Scheduler).

Я пришёл к выводу, что сохранность очерёдности сообщений требует следующее: прежде чем отправить новое сообщение, мне нужно удостовериться, что предыдущее было успешно доставлено Кафка-брокеру. В итоге, решение получается получается синхронным. Вот мой пример кода:

public List<String> sendMessages(String topicName, List<Object> data) {
    List<String> successIds = new ArrayList<>();
    for (Object value : data) {
        ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicName, value.getSiebelId(), value);
        try {
            listenableFuture.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.warn("todo");
            break;
        }
        successIds.add(value.getId());
    }
    return successIds;
}

successIds содержит id сообщений, которые заведомо отправлены и благополучно доставлены брокеру. Далее я их использую чтобы удалить соответствующие данные в базе данных. Если в процессе операции отправки сообщений из List<Object> data какое-то сообщение не было по какой-то причине доставлено брокеру, то мы заканчиваем итерацию раньше, и удаляем ровно то, что успело попасть в successIds. В следующей итерации, мы начнём с тех сообщений, которые не вошли в successIds, т.к. они не были удалены из базы данных.

Из данного решения следует, что гарантия очерёдности сообщений при отправке из БД в кафку требует отказа от асинхронности, что наверняка приведёт к понижению производительности. Я новичок в Кафке, поэтому хотел бы получить экспертное мнение. Является ли данное решение оптимальным?


Ответы (0 шт):