Как правильно закрывать connection и возвращать его в пул при асинхронной работе с базой данных?

Идея была в том, чтоб распараллелить некоторую логику для выигрыша времени при сложных запросах.

Я добавил только completableFuture и тут понеслось. Изначально метод был в спринг-овой транзакции, после добавления многопоточности я стал активно терять соединения, при пуле в 20, соединения заканчивались на 7-ой запрос и запрос зависал. Я полагаю, что при работе в другом потоке (методы fetchPacksAsync и fetchCounterpartiesAsync), каждый поток создает отдельную транзакцию\соединение с БД и действует в своем контексте.

Ранее я получал ошибки от спринг-а, что была обнаружена утечка соединений, но они позже вернулись в пул. Однако, по факту, как будто не вернулись, и на 7-й запрос всё зависало.

Дошло до того, что в коде, который я скинул, я решил вручную закрывать все транзакции\соединения и вручную управлять транзакциями.

Это тоже не решило проблему, даже наоборот - теперь конекшены расходоваться стали за 4 запроса и на 5ый - приложение виснет.

Я не до конца понимаю все хитросплетения многопоточки запросов к БД, но гугл-ил уже 3 дня, и методом тыка кажется все перепробовал - но решения не нашёл.

Логировал Hikari, дейвствительно с каждый запросом кол-во активных соединений увеличивается +3, дойдя до 20 - приложение зависает.

Пока, что нет вообще идей, из-за чего это происходит. Буду рад любой помощи.

Исходный код выглядит почти также, просто без completableFuture над запросами к БД - customCounterpartyRepository.counterpartyByShipmentIds() и customPacksRepository.packByIds(), и соответсвенно без ручного управления транзакциями.

Исходный код работает исправно.

Код:

public PaginationRsFindTrackingNumberRs findAllByCondition(SearchRq searchRq) {  
    List<FindTrackingNumberRs> result = findTracks(boxMapper::entityToFindTrackRs, searchRq);  
    log.info(FIND_ALL_TRACK_BY_CONDITION_WITH_RESPONSE, result);  
    return new PaginationRsFindTrackingNumberRs(searchRq.getLimit(), searchRq.getOffset(), shipmentService.findCountByCondition(searchRq), result);  
}  
  
/**  
 * Метод из фасада */  
 * 
private <R> List<R> findTracks(Function<BoxesAndShipmentsDto, R> mappingMethod, SearchRq searchRq) {  
    List<Shipment> shipments = shipmentService.findAllByCondition(searchRq);  
    EntityManager entityManager = entityManagerFactory.createEntityManager();  
    EntityTransaction transaction = entityManager.getTransaction();  
    Connection connection = null;  
    try {  
        connection = hikariDataSource.getConnection();  
    } catch (SQLException e) {  
        throw new RuntimeException(e);  
    }  
  
    List<R> result = new ArrayList<>();  
    try {  
        transaction.begin();  
        QBox qBox = QBox.box;  
        QTrackable qTrackable = QTrackable.trackable;  
        var shipmentTrackBoxes = factory.selectFrom(qBox)  
                .join(qBox.trackingNumbers, qTrackable)  
                .where(qTrackable.trackType.eqTrackType.SHIPMENT)  
                .transform(GroupBy.groupByqTrackable.trackingNumber.as(GroupBy.list(qBox)));  
        transaction.commit();  
  
        result = shipments.stream()  
                .map(shipment -> mappingMethod.apply(new BoxesAndShipmentsDto(shipmentTrackBoxes.get(shipment.getTrackingNumber()), List.of(shipment))))  
                .collect(Collectors.toList());  
    } catch (Exception e) {  
        log.error("Ошибка в методе фасада в запросе к бокс-репозитории {}", e.getStackTrace());  
    } finally {  
        if (entityManager != null) {  
            entityManager.close();  
        }  
        if (connection != null) {  
            try {  
                connection.close();  
            } catch (SQLException e) {  
                throw new RuntimeException("Ошибка при закрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());  
            }  
        }  
    }  
    return result;  
}  
  
/**  
 * ветки с многопоточкой */ 
 * 
public List<Shipment> findAllByCondition(Predicate predicate, Pageable pageable) {  
    List<Shipment> shipments = new ArrayList<>();  
    try {  
        EntityManager entityManager = entityManagerFactory.createEntityManager();  
        EntityTransaction transaction = entityManager.getTransaction();  
        Connection connection = hikariDataSource.getConnection();  
        try {  
            transaction.begin();  
            JPAQuery<Shipment> query = factory  
                    .select(qShipment)  
                    .from(qShipment)  
                    .where(predicate)  
                    .offset(pageable.getOffset())  
                    .limit(pageable.getPageSize());  
            PathBuilder<Object> path = new PathBuilder<>(qShipment.getType(), qShipment.getMetadata().getName());  
            pageable.getSort().forEach(order ->  
                    query.orderBy(new OrderSpecifier<>(  
                            order.isAscending() ? Order.ASC : Order.DESC,  
                            path.getComparable(order.getProperty(), Comparable.class)  
                    ))  
            );  
            shipments = query.fetch();  
        } catch(Exception e) {  
            log.error("Ошибка при выполнении транзакции в кастом-репозитори-сервис {]", e.getStackTrace());  
        } finally {  
            if (entityManager != null) {  
                entityManager.close();  
            }  
            if (connection != null) {  
                try {  
                    connection.close();  
                } catch (SQLException e) {  
                    throw new RuntimeException("Ошибка при  возварте конекшена в пул конекшенов {}", e.fillInStackTrace());  
                }  
            }  
        }  
  
  
        List<String> shipmentIds = shipments.stream().map(Shipment::getObjectId).collect(Collectors.toList());  
  
        CompletableFuture<Map<String, List<Counterparty>>> counterpartiesFuture = fetchCounterpartiesAsync(shipmentIds);  
        CompletableFuture<Map<String, List<Pack>>> packsFutures = fetchPacksAsync(shipmentIds);  
        CompletableFuture.allOf(counterpartiesFuture, packsFutures).join();  
  
        Map<String, List<Counterparty>> counterpartiesByShipmentId = counterpartiesFuture.join();  
        Map<String, List<Pack>> packsByShipmentId = packsFutures.join();  
        List<CompletableFuture<Void>> futures = shipments.stream()  
                .map(shipment -> CompletableFuture.runAsync(() -> {  
                    shipment.setCounterparties(counterpartiesByShipmentId.get(shipment.getObjectId()));  
                    shipment.setPacks(packsByShipmentId.get(shipment.getObjectId()));  
                }))  
                .collect(Collectors.toList());  
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();  
        return shipments;  
    } catch (Exception e) {  
        log.error("Exception trace {}", e.getStackTrace());  
    }  
    return shipments;  
}  
  
public CompletableFuture<Map<String, List<Counterparty>>> fetchCounterpartiesAsync(List<String> shipmentIds) {  
    return CompletableFuture.supplyAsync(() -> {  
  
                EntityManager entityManager = entityManagerFactory.createEntityManager();  
                EntityTransaction transaction = entityManager.getTransaction();  
                Connection connection = null;  
                try {  
                    connection = hikariDataSource.getConnection();  
                } catch (SQLException e) {  
                    throw new RuntimeException(e);  
                }  
  
                Map<String, List<Counterparty>> counterpaties = new HashMap<>();  
                try {  
                    transaction.begin();  
                    counterpaties = customCounterpartyRepository.counterpartyByShipmentIds(shipmentIds)  
                            .stream()  
                            .collect(Collectors.groupingBy(counterparty -> counterparty.getShipment().getObjectId()));  
                    transaction.commit();  
                    return counterpaties;  
                } catch (Exception e) {  
  
                    log.error("Ошибка при работе с многопоточным запросом к БД {}", e.getStackTrace());  
                } finally {  
                    if (entityManager != null) {  
                        entityManager.close();  
                    }  
                    if (connection != null) {  
                        try {  
                            connection.close();  
                        } catch (SQLException e) {  
                            throw new RuntimeException("ОШбика при щакрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());  
                        }  
                    }  
                }  
                return counterpaties;  
            }  
  
    );  
}  
  
public CompletableFuture<Map<String, List<Pack>>> fetchPacksAsync(List<String> shipmentIds) {  
    return CompletableFuture.supplyAsync(() -> {  
                EntityManager entityManager = entityManagerFactory.createEntityManager();  
                EntityTransaction transaction = entityManager.getTransaction();  
                Connection connection = null;  
                try {  
                    connection = hikariDataSource.getConnection();  
                } catch (SQLException e) {  
                    throw new RuntimeException(e);  
                }  
  
                Map<String, List<Pack>> packs = new HashMap<>();  
                try {  
                    transaction.begin();  
                    packs = customPacksRepository.packByIds(shipmentIds)  
                            .stream()  
                            .collect(Collectors.groupingBy(pack -> pack.getShipment().getObjectId()));  
                    return packs;  
                } catch (Exception e) {  
  
                    log.error("Ошибка при запросе к пак-репозитории {}", e.getStackTrace());  
                } finally {  
                    if (entityManager != null) {  
                        entityManager.close();  
                    }  
                    if (connection != null) {  
                        try {  
                            connection.close();  
                        } catch (SQLException e) {  
                            throw new RuntimeException("ОШбика при щакрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());  
                        }  
                    }  
  
                }  
                return packs;  
            }  
    );  
}

Дополняю вопрос - увеличение кол-ва соединений в пуле в конфигурации Hikari до 40 и 50 (максимальное кол-во) результата не дало - соединения также утекают, только возврасло кол-во успешных запросов пропорционально кол-ву добавленных соединений в пул.

Мой конфиг Hikari:

hikari:
  maximum-poll-size: 20
  minimum-idle: 10
  leak-detection-threshold: 5000
  connection-timeout: 30000
  idle-timeout: 600000
  max-lifetime: 1800000

Выделенные фрагменты кода, где я пытаюсь принудительно вернуть соединения в пул:

                EntityManager entityManager = entityManagerFactory.createEntityManager();  
                EntityTransaction transaction = entityManager.getTransaction();  
                Connection connection = null;  
                try {  
                    connection = hikariDataSource.getConnection();  
                } catch (SQLException e) {  
                    throw new RuntimeException(e);  
                }  

 
                try {  
                    transaction.begin();  

//some code...
                    counterpaties = customCounterpartyRepository.counterpartyByShipmentIds(shipmentIds)  
                            .stream()  
                            .collect(Collectors.groupingBy(counterparty -> counterparty.getShipment().getObjectId())); 
 
                    transaction.commit();  
                    return counterpaties;  
                } catch (Exception e) {  

                    log.error("Ошибка при работе с многопоточным запросом к БД {}", e.getStackTrace());  
                } finally {  
                    if (entityManager != null) {  
                        entityManager.close();  
                    }  
                    if (connection != null) {  
                        try {  
                            connection.close();  
                        } catch (SQLException e) {  
                            throw new RuntimeException("ОШбика при щакрытии возварте конекшена в пул конекшенов {}", e.fillInStackTrace());  
                        }  
                    }  
                }  
                return counterpaties;  
            }  

Основная задача - возвращать соединения в пул при отработке метода. Для того, чтобы этот метод можно было выполнять сколько нужно. Многопоточность на большой дате реально даёт ощутимый прирост в производительности, и я не хочу откзаываться от этого - это нужно по задаче.

Если кто-то объяснит детально, как там все под капотом работает, и в чем может быть ошибка, и как ее решать - буду очень благодарен, выручите.

Каюсь, что сам что-то сложнее CRUD-ов и исправления ошибок, не писал.


Ответы (1 шт):

Автор решения: Roman C

Правильно закрывать connection нужно автоматически, поскольку объект этого типа имплементирует AutoClosable по умолчанию.

Вот здесь происходит утечка соединений:

try {  
  connection = hikariDataSource.getConnection();  
} catch (SQLException e) {  
  throw new RuntimeException(e);  
}  

Чтобы это поменять, надо использовать try-with-resources для того, чтобы быть уверенным, что соединение будет закрыто и возвращено в пул при выходе из блока, который следует обрамлять на весь код, а не на одно только открытие соединения.

try (connection = hikariDataSource.getConnection();) {
  // здесь весь код, который использует connection
  ...
} catch (SQLException e) {  
  throw new RuntimeException(e);  
}  

Также, если используется JPA, то имеет смысл получать connection непосредственно из него. Вот как это делается на английском сайте: Getting Database connection in pure JPA setup.

→ Ссылка