SecurityContextHolder пустой при получении сообщения по Kafka

Метод getCurrentUserInfo вызывается внутри метода formingReestrContent, который, в свою очередь, вызывается вследствие получения сообщения из Kafka.

При вызове метода getCurrentUserInfo пытаюсь получить principal, и он всегда оказывается пустой т. к. SecurityContextHolder.getContext() = null.

Однако при вызове этого же метода Rest запросом SecurityContextHolder.getContext() имеет все данные о текущем юзере и удаётся достать необходимую информацию из principal.

Полагаю, что SecurityContextHolder.getContext() = null при обращении по Kafka, т. к. там не передаётся никакой информации, содержащей данных об аутентификации.

Возможно ли по Kafka передать необходимые данные, чтобы при вычитке сообщения SecurityContextHolder.getContext() был заполнен так же, как для вызова по Rest?

CurrentEmployeeService:

public class CurrentEmployeeService {

    private final UserRestService userRestService;
    private final EmployeeRestService employeeRestService;

    public UserInfoDto getCurrentUserInfo() {
        Object principal = SecurityContextHolder.getContext().getAuthentication().getPrincipal();
        Long userAccountId = ((DQPrincipal)principal).getUserAccountId();
        Long userId = userRestService.getUserByParams(userAccountId.toString()).getUserId();

        return userRestService.getUserInfoByParams(userId.toString());
    }

    public EmployeeDto getCurrentEmployee() {
        return employeeRestService.getEmplyeeByParams(getCurrentUserInfo().getObjectId().toString());
    }

}

Продюсер:

public class PostalReestrProducer {

    public static final String QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY = "qppnpf-postal-reestr-pension-application-reply";
    public static final String HEADER_KEY_EXTERNAL_ID = "externalId";

    private final KafkaTemplate<byte[], byte[]> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /* Отправление Заявлений в Почтовый Реестр */
    public void sendApplicationListToPostalReestr(String postalReestrId, List<PensionApplicationToPostalReestrMessage> messageToKafka) {
        log.info("Send kafka message: topic={}, message={}", QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY, messageToKafka);

        try {
            byte[] bytes = objectMapper.writeValueAsBytes(messageToKafka);
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY, bytes);
            producerRecord.headers().add(HEADER_KEY_EXTERNAL_ID, postalReestrId.getBytes(StandardCharsets.UTF_8));
            kafkaTemplate.send(producerRecord);
        } catch (Exception ex) {
            log.error("Error send kafka message: topic={}, error={}", QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY, ex.getMessage());
            throw new RuntimeException("Ошибка при отправке заявлений в Почтовый Реестр");
        }
    }

}

Консюмер:

public class PensionApplicationListener {

    public static final String QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY = "qppnpf-postal-reestr-pension-application-reply";
    public static final String HEADER_KEY_EXTERNAL_ID = "externalId";

    private final ObjectMapper objectMapper;
    private final PensionApplicationReestrContentService pensionApplicationReestrContentService;

    @KafkaListener(topics = QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY, groupId = "forming-postal-reestr-group")
    public void receivePensionApplicationList(ConsumerRecord<byte[], byte[]> consumerRecord) {
        String message = new String(consumerRecord.value(), StandardCharsets.UTF_8);
        log.info("Message from {}, headers = {}, value = {}",QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY, consumerRecord.headers(), message);

        try {
            String postalReestrId = StringUtils.EMPTY;
            for (Header header : consumerRecord.headers()) {
                if (header.key().equals(HEADER_KEY_EXTERNAL_ID)) {
                    postalReestrId = new String(header.value());
                }
            }

            if (StringUtils.isEmpty(postalReestrId)) {
                throw new RuntimeException("Не передан обязательный параметр postalReestrId");
            }

            List<PensionApplicationToPostalReestrMessage> toPostalReestrMessages = objectMapper
                .readValue(message, objectMapper.getTypeFactory().constructCollectionType(List.class, 
                PensionApplicationToPostalReestrMessage.class));

            pensionApplicationReestrContentService.
                formingReestrContent(Long.valueOf(postalReestrId),
                toPostalReestrMessages);

        } catch (Exception ex) {
            log.info("Error during mapping message from {} topic, error = {}", QPPNPF_POSTAL_REESTR_PENSION_APPLICATION_REPLY, ex.getMessage());
            log.error("При вычитке сообщения из кафки возникла ошибка, ex = {}", ex.getMessage());
            ex.printStackTrace();
            throw new CustomRuntimeException(ex.getMessage());
        }
    }
}

Ответы (0 шт):