Почему подключение по socket возвращает данные с прошлого подключения?

Описание программы

Имеется Java программа (Spring Boot), которая должна получать данные с удалённых весов.

Модель весов: MERA PVM-3-30-P-ETH.

Подключение к весам происходит по socket. Весы возвращают данные в виде бесконечного потока строк следующего вида: +020.000#+001.000D (левая часть - караты, правая часть - граммы). Поток бесконечен, считать его полностью невозможно. Когда показания весов колеблются (например, при установке груза на весы или при давлении на весы рукой), поток отсутсвует. При стабилизации показаний весов поток возобновляется. Таким образом, поток, приходящий с весов, выглядит следующим образом:

+020.000#+001.000D
+020.000#+001.000D
+020.000#+001.000D
+020.000#+001.000D
+020.000#+001.000D
... поток прекращается, потому что с весов убирают груз
... поток возобновляется, потому что показания на весах стабилизировались
+000.000#+000.000D
+000.000#+000.000D
+000.000#+000.000D
+000.000#+000.000D
+000.000#+000.000D

Для получения показаний с весов используются следующий код:

private static final int SCALE_RESPONSE_ANALYZE_MAX_TRIES_COUNT = 20;
private static final Pattern SCALE_POSITIVE_WEIGHT_RESPONSE_PATTERN =
        Pattern.compile("^\\+\\d{3}\\.\\d{3}#\\+(\\d{3}\\.\\d{3})D$");
private static final Pattern SCALE_NEGATIVE_WEIGHT_RESPONSE_PATTERN =
        Pattern.compile("^-\\d{3}\\.\\d{3}#-(\\d{3}\\.\\d{3})D$");
private static final int WEIGHT_MATCH_GROUP = 1;
private static final int GRAMS_IN_KILOGRAM = 1000;
private static final int WEIGHT_SCALE = 0;
private static final int REQUEST_TIMEOUT_IN_MILLISECONDS = 5000;
private static final String WEIGHT_GETTING_FAILED_ERROR_MSG = "Weight getting was failed";

public int getWeight(String host, Integer port) {
    var executorService = Executors.newCachedThreadPool();
    var inetSocketAddress = new InetSocketAddress(host, port);
    try (var socket = new Socket()) {
        socket.connect(inetSocketAddress, REQUEST_TIMEOUT_IN_MILLISECONDS);
        socket.setSoTimeout(REQUEST_TIMEOUT_IN_MILLISECONDS);
        socket.setTcpNoDelay(false);
        return executorService.submit(getCurrentScaleValueProvider(socket))
                              .get(REQUEST_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS);
    } catch (IOException | ExecutionException | InterruptedException e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        throw new RuntimeException("%s: %s".formatted(WEIGHT_GETTING_FAILED_ERROR_MSG, e.getMessage()));
    } catch (TimeoutException e) {
        throw new RuntimeException("""
                                 %s, scale's connection timeout arising: %s\
                                 """.formatted(WEIGHT_GETTING_FAILED_ERROR_MSG, e.getMessage()));
    } finally {
        executorService.shutdown();
    }
}

private static Callable<Integer> getCurrentScaleValueProvider(Socket socket) {
    return () -> {
        try (var socketInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            for (int i = 0; i < SCALE_RESPONSE_ANALYZE_MAX_TRIES_COUNT; i++) {
                val scaleResponse = socketInputStream.readLine();
                val positiveWeightMatcher = SCALE_POSITIVE_WEIGHT_RESPONSE_PATTERN.matcher(scaleResponse);

                if (positiveWeightMatcher.matches()) {
                    val weightInKilograms = positiveWeightMatcher.group(WEIGHT_MATCH_GROUP);
                    return parseWeight(weightInKilograms);
                }
            }
        }
        throw new RuntimeException(
                "%s, scale's response can not be parsed".formatted(WEIGHT_GETTING_FAILED_ERROR_MSG)
        );
    };
}

private static int parseWeight(String weightInKilograms) {
    if (weightInKilograms == null) {
        throw new RuntimeException("The passed weight is null and can not be parsed");
    }
    return new BigDecimal(weightInKilograms).multiply(new BigDecimal(GRAMS_IN_KILOGRAM))
                                            .setScale(WEIGHT_SCALE, RoundingMode.HALF_UP)
                                            .intValue();
}

Workflow:

  1. Программа начинает работу с метода getWeight().
  2. Создаётся socket.
  3. Сокет передаётся в метод getCurrentScaleValueProvider(), который создаёт task.
  4. Полученный task выполняется при помощи executorService.
  5. В рамках task идёт цикличное считывание строк из потока данных, отправляемых весами. Почему не считывается самое первое значение? Потому что иногда строка начинает считываться с середины, из-за чего её становится невозможно распарсить.
  6. После нахождения первой же строки, соответствующей паттерну, данная строка парсится, полученный результат возвращается.

Проблема

Было замечено, что иногда программа возвращает вес предыдущего замера. Причём, предыдущий замер мог быть произведён как минуту, так и два дня назад. Пример:

6-го числа был произведён замер и с весов было получено значение 2205 грамм (при ожидаемом весе груза в ~2176 грамм). 8-го числа, через два дня, был произведён замер другого груза (ожидаемый вес примерно 2340 грамм), для него также было получено значение в 2205 грамм.

Дополнительные детали

Для нахождения проблемы в метод getCurrentScaleValueProvider() было добавлено логирование:

private static Callable<Integer> getCurrentScaleValueProvider(Socket socket) {
    return () -> {
        try (var socketInputStream = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            for (int i = 0; i < SCALE_RESPONSE_ANALYZE_MAX_TRIES_COUNT; i++) {
                val scaleResponse = socketInputStream.readLine();
                val positiveWeightMatcher = SCALE_POSITIVE_WEIGHT_RESPONSE_PATTERN.matcher(scaleResponse);

                log.warn("(scale): {}", scaleResponse); // добавлено логирование

                if (i > 5 && positiveWeightMatcher.matches()) { // добавлено условие, чтобы пропускались первые 6 строк
                    val weightInKilograms = positiveWeightMatcher.group(WEIGHT_MATCH_GROUP);
                    return parseWeight(weightInKilograms);
                }
            }
        }
        throw new RuntimeException(
                "%s, scale's response can not be parsed".formatted(WEIGHT_GETTING_FAILED_ERROR_MSG)
        );
    };
}

Теперь программа не берёт первую удовлетворяющую паттерну строку, а считывает и логирует 6 строк и берёт 7-ю (если она соответствует паттерну). В логах видно следующее:

2024-04-18 04:58:27.863: (scale): +044.275#+002.214D
2024-04-18 04:58:27.964: (scale): +044.275#+002.214D
2024-04-18 04:58:28.073: (scale): +044.275#+002.214D
2024-04-18 04:58:28.176: (scale): +044.275#+002.214D
2024-04-18 04:58:28.274: (scale): +044.275#+002.214D
2024-04-18 04:58:28.382: (scale): +044.275#+002.214D
2024-04-19 05:28:40.869: (scale): +044.275#+002.214D
2024-04-19 05:28:40.900: (scale): +015.685#+000.784D
2024-04-19 05:28:41.009: (scale): +015.685#+000.784D
2024-04-19 05:28:41.110: (scale): +015.685#+000.784D
2024-04-19 05:28:41.211: (scale): +015.685#+000.784D
2024-04-19 05:28:41.315: (scale): +015.685#+000.784D

Было выполнено два замера: 18-го и 19-го числа, для каждого замера было считано по 6 строк. Однако, при замере 19-го числа первая считанная строка выглядит так: +044.275#+002.214D, а все следующие выглядят так: +015.685#+000.784D. Первая строка содержит данные для замера от 18-го числа, тогда как текущий замер проводится спустя день.

По какой-то причине первой строкой идёт "застрявшее" с прошлого замера значение.

Возможные причины

Сам я вижу, что у такого поведения программы возможны две причины:

  1. Где-то в программе не происходит очистки данных. Я не могу увидеть потенциальное место утечки, потому что socket закрывается, все потоки закрываются по завершению. Статические переменные не используются, все используемые переменные существует лишь в области видимости методов (кроме Pattern). И в пользу маловероятности такого сценария говорят два момент: во-первых между считываниями может быть промежуток до двух дней, а во-вторых проблема возникает далеко не всегда, а когда возникает, то только с определёнными весами (программа обслуживает до трёх удалённых весов).
  2. Особенность работы весов. Здесь ничего исправить не получится, но чтобы быть уверенным в таком варианте, требуется быть уверенным в том, что первый вариант точно исключается.

Решение

В коде, в котором добавлено логирование, проблемы уже нет, потому что старое "застрявшее" значение считывается в лог. Но я хочу выяснить, в чём изначальная причина проблемы.

Пожалуйста, пишите ваши гипотезы, может быть вы увидите в коде ошибку, которую я пропустил, а может быть я не учитываю какие-то особенности работы socket-подключения.


Ответы (0 шт):