Как правильно закрывать connection и возвращать его в пул при асинхронной работе с базой данных?
Идея была в том, чтоб распараллелить некоторую логику для выигрыша времени при сложных запросах.
Я добавил только completableFuture
и тут понеслось.
Изначально метод был в спринг-овой транзакции, после добавления многопоточности я стал активно терять соединения, при пуле в 20, соединения заканчивались на 7-ой запрос и запрос зависал. Я полагаю, что при работе в другом потоке (методы fetchPacksAsync
и fetchCounterpartiesAsync
), каждый поток создает отдельную транзакцию\соединение с БД и действует в своем контексте.
Ранее я получал ошибки от спринг-а, что была обнаружена утечка соединений, но они позже вернулись в пул. Однако, по факту, как будто не вернулись, и на 7-й запрос всё зависало.
Дошло до того, что в коде, который я скинул, я решил вручную закрывать все транзакции\соединения и вручную управлять транзакциями.
Это тоже не решило проблему, даже наоборот - теперь конекшены расходоваться стали за 4 запроса и на 5ый - приложение виснет.
Я не до конца понимаю все хитросплетения многопоточки запросов к БД, но гугл-ил уже 3 дня, и методом тыка кажется все перепробовал - но решения не нашёл.
Логировал Hikari, дейвствительно с каждый запросом кол-во активных соединений увеличивается +3, дойдя до 20 - приложение зависает.
Пока, что нет вообще идей, из-за чего это происходит. Буду рад любой помощи.
Исходный код выглядит почти также, просто без completableFuture
над запросами к БД - customCounterpartyRepository.counterpartyByShipmentIds()
и customPacksRepository.packByIds()
,
и соответсвенно без ручного управления транзакциями.
Исходный код работает исправно.
Код:
public PaginationRsFindTrackingNumberRs findAllByCondition(SearchRq searchRq) {
List<FindTrackingNumberRs> result = findTracks(boxMapper::entityToFindTrackRs, searchRq);
log.info(FIND_ALL_TRACK_BY_CONDITION_WITH_RESPONSE, result);
return new PaginationRsFindTrackingNumberRs(searchRq.getLimit(), searchRq.getOffset(), shipmentService.findCountByCondition(searchRq), result);
}
/**
* Метод из фасада */
*
private <R> List<R> findTracks(Function<BoxesAndShipmentsDto, R> mappingMethod, SearchRq searchRq) {
List<Shipment> shipments = shipmentService.findAllByCondition(searchRq);
EntityManager entityManager = entityManagerFactory.createEntityManager();
EntityTransaction transaction = entityManager.getTransaction();
Connection connection = null;
try {
connection = hikariDataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
List<R> result = new ArrayList<>();
try {
transaction.begin();
QBox qBox = QBox.box;
QTrackable qTrackable = QTrackable.trackable;
var shipmentTrackBoxes = factory.selectFrom(qBox)
.join(qBox.trackingNumbers, qTrackable)
.where(qTrackable.trackType.eqTrackType.SHIPMENT)
.transform(GroupBy.groupByqTrackable.trackingNumber.as(GroupBy.list(qBox)));
transaction.commit();
result = shipments.stream()
.map(shipment -> mappingMethod.apply(new BoxesAndShipmentsDto(shipmentTrackBoxes.get(shipment.getTrackingNumber()), List.of(shipment))))
.collect(Collectors.toList());
} catch (Exception e) {
log.error("Ошибка в методе фасада в запросе к бокс-репозитории {}", e.getStackTrace());
} finally {
if (entityManager != null) {
entityManager.close();
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("Ошибка при закрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());
}
}
}
return result;
}
/**
* ветки с многопоточкой */
*
public List<Shipment> findAllByCondition(Predicate predicate, Pageable pageable) {
List<Shipment> shipments = new ArrayList<>();
try {
EntityManager entityManager = entityManagerFactory.createEntityManager();
EntityTransaction transaction = entityManager.getTransaction();
Connection connection = hikariDataSource.getConnection();
try {
transaction.begin();
JPAQuery<Shipment> query = factory
.select(qShipment)
.from(qShipment)
.where(predicate)
.offset(pageable.getOffset())
.limit(pageable.getPageSize());
PathBuilder<Object> path = new PathBuilder<>(qShipment.getType(), qShipment.getMetadata().getName());
pageable.getSort().forEach(order ->
query.orderBy(new OrderSpecifier<>(
order.isAscending() ? Order.ASC : Order.DESC,
path.getComparable(order.getProperty(), Comparable.class)
))
);
shipments = query.fetch();
} catch(Exception e) {
log.error("Ошибка при выполнении транзакции в кастом-репозитори-сервис {]", e.getStackTrace());
} finally {
if (entityManager != null) {
entityManager.close();
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("Ошибка при возварте конекшена в пул конекшенов {}", e.fillInStackTrace());
}
}
}
List<String> shipmentIds = shipments.stream().map(Shipment::getObjectId).collect(Collectors.toList());
CompletableFuture<Map<String, List<Counterparty>>> counterpartiesFuture = fetchCounterpartiesAsync(shipmentIds);
CompletableFuture<Map<String, List<Pack>>> packsFutures = fetchPacksAsync(shipmentIds);
CompletableFuture.allOf(counterpartiesFuture, packsFutures).join();
Map<String, List<Counterparty>> counterpartiesByShipmentId = counterpartiesFuture.join();
Map<String, List<Pack>> packsByShipmentId = packsFutures.join();
List<CompletableFuture<Void>> futures = shipments.stream()
.map(shipment -> CompletableFuture.runAsync(() -> {
shipment.setCounterparties(counterpartiesByShipmentId.get(shipment.getObjectId()));
shipment.setPacks(packsByShipmentId.get(shipment.getObjectId()));
}))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return shipments;
} catch (Exception e) {
log.error("Exception trace {}", e.getStackTrace());
}
return shipments;
}
public CompletableFuture<Map<String, List<Counterparty>>> fetchCounterpartiesAsync(List<String> shipmentIds) {
return CompletableFuture.supplyAsync(() -> {
EntityManager entityManager = entityManagerFactory.createEntityManager();
EntityTransaction transaction = entityManager.getTransaction();
Connection connection = null;
try {
connection = hikariDataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
Map<String, List<Counterparty>> counterpaties = new HashMap<>();
try {
transaction.begin();
counterpaties = customCounterpartyRepository.counterpartyByShipmentIds(shipmentIds)
.stream()
.collect(Collectors.groupingBy(counterparty -> counterparty.getShipment().getObjectId()));
transaction.commit();
return counterpaties;
} catch (Exception e) {
log.error("Ошибка при работе с многопоточным запросом к БД {}", e.getStackTrace());
} finally {
if (entityManager != null) {
entityManager.close();
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("ОШбика при щакрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());
}
}
}
return counterpaties;
}
);
}
public CompletableFuture<Map<String, List<Pack>>> fetchPacksAsync(List<String> shipmentIds) {
return CompletableFuture.supplyAsync(() -> {
EntityManager entityManager = entityManagerFactory.createEntityManager();
EntityTransaction transaction = entityManager.getTransaction();
Connection connection = null;
try {
connection = hikariDataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
Map<String, List<Pack>> packs = new HashMap<>();
try {
transaction.begin();
packs = customPacksRepository.packByIds(shipmentIds)
.stream()
.collect(Collectors.groupingBy(pack -> pack.getShipment().getObjectId()));
return packs;
} catch (Exception e) {
log.error("Ошибка при запросе к пак-репозитории {}", e.getStackTrace());
} finally {
if (entityManager != null) {
entityManager.close();
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("ОШбика при щакрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());
}
}
}
return packs;
}
);
}
Дополняю вопрос - увеличение кол-ва соединений в пуле в конфигурации Hikari до 40 и 50 (максимальное кол-во) результата не дало - соединения также утекают, только возврасло кол-во успешных запросов пропорционально кол-ву добавленных соединений в пул.
Мой конфиг Hikari:
hikari:
maximum-poll-size: 20
minimum-idle: 10
leak-detection-threshold: 5000
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
Выделенные фрагменты кода, где я пытаюсь принудительно вернуть соединения в пул:
EntityManager entityManager = entityManagerFactory.createEntityManager();
EntityTransaction transaction = entityManager.getTransaction();
Connection connection = null;
try {
connection = hikariDataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
try {
transaction.begin();
//some code...
counterpaties = customCounterpartyRepository.counterpartyByShipmentIds(shipmentIds)
.stream()
.collect(Collectors.groupingBy(counterparty -> counterparty.getShipment().getObjectId()));
transaction.commit();
return counterpaties;
} catch (Exception e) {
log.error("Ошибка при работе с многопоточным запросом к БД {}", e.getStackTrace());
} finally {
if (entityManager != null) {
entityManager.close();
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("ОШбика при щакрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());
}
}
}
return counterpaties;
}
Основная задача - возвращать соединения в пул при отработке метода. Для того, чтобы этот метод можно было выполнять сколько нужно. Многопоточность на большой дате реально даёт ощутимый прирост в производительности, и я не хочу откзаываться от этого - это нужно по задаче.
Если кто-то объяснит детально, как там все под капотом работает, и в чем может быть ошибка, и как ее решать - буду очень благодарен, выручите.
Каюсь, что сам что-то сложнее CRUD-ов и исправления ошибок, не писал.
Ответы (1 шт):
Правильно закрывать connection нужно автоматически, поскольку объект этого типа имплементирует AutoClosable
по умолчанию.
Вот здесь происходит утечка соединений:
try {
connection = hikariDataSource.getConnection();
} catch (SQLException e) {
throw new RuntimeException(e);
}
Чтобы это поменять, надо использовать try-with-resources
для того, чтобы быть уверенным, что соединение будет закрыто и возвращено в пул при выходе из блока, который следует обрамлять на весь код, а не на одно только открытие соединения.
try (connection = hikariDataSource.getConnection();) {
// здесь весь код, который использует connection
...
} catch (SQLException e) {
throw new RuntimeException(e);
}
Также, если используется JPA, то имеет смысл получать connection непосредственно из него. Вот как это делается на английском сайте: Getting Database connection in pure JPA setup.