Flink подключение метрик для Prometheus (Java)

мне нужна помощь.

Сделал тестовый сервис, для подсчета количества отработанных сообщений во Flink, метрика отдается в Prometheus. Условия задачи, нужно смотреть не общее количество сообщений, которые были обработаны всего, а через определенный промежуток времени, т.е., например за каждые 10 сек, нужно посмотреть ,сколько было обработано входящих сообщений.

Локально, если запускать через Idea, вижу, что счетчик меняется. Если запускать через Flink, поднятый через docker-compose.yml, для моего кастомного счетчика, всегда стоит 0, не могу понять в чем причина.

Код привожу: 1)Главный класс:

import info.test.operation.CountMessagesAggregateFunction;
import info.test.operation.OutputProcessWindowFunction;
import info.test.operation.PrometheusExporterMapFunction;
import info.test.operation.RandomNumberSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

public class AppServiceWithWaterMarkExperiment {
    private static long windowSize = 10L;

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> randomNumber = env
                .addSource(new RandomNumberSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <Integer>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofSeconds(windowSize / 2))
                );

        DataStream<Integer> finalStream = randomNumber
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                .sum(0);

        finalStream
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
                .aggregate(new CountMessagesAggregateFunction(), new OutputProcessWindowFunction())
                .map(new PrometheusExporterMapFunction())
                .print();

        env.execute("My Service");
    }
}

2)Код класса Gauge:

import org.apache.flink.metrics.Gauge;

public class OutputGauge implements Gauge<Long> {

    private final String nameMetric;

    private Long value;

    public OutputGauge(String nameMetric, Long value) {
        this.nameMetric = nameMetric;
        this.value = value;
    }

    public void setValue(Long value) {
        this.value = value;
    }

    @Override
    public Long getValue() {
        return value == null ? 0L : value;
    }

    public String getNameMetric() {
        return this.nameMetric;
    }
}

3)Код класса MetricsDTO

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutputMetricsDto implements Serializable {

    private Long incomingMessageCount;
}

4)Класс подсчета значений:

import info.test.Model.OutputMetricsDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.AggregateFunction;

@Slf4j
public class CountMessagesAggregateFunction implements AggregateFunction<Integer, OutputMetricsDto, OutputMetricsDto> {

    @Override
    public OutputMetricsDto createAccumulator() {
        return OutputMetricsDto.builder()
                .incomingMessageCount(0L)
                .build();
    }

    @Override
    public OutputMetricsDto add(Integer value, OutputMetricsDto accumulator) {
        accumulator.setIncomingMessageCount(accumulator.getIncomingMessageCount() + 1);

        return accumulator;
    }

    @Override
    public OutputMetricsDto getResult(OutputMetricsDto accumulator) {
        return accumulator;
    }

    @Override
    public OutputMetricsDto merge(OutputMetricsDto a, OutputMetricsDto b) {
        OutputMetricsDto mergedResult = new OutputMetricsDto();

        mergedResult.setIncomingMessageCount(a.getIncomingMessageCount() + b.getIncomingMessageCount());

        return mergedResult;
    }
}

5)Класс OutputProcessWindowFunction:

import info.test.Model.OutputMetricsDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

@Slf4j
public class OutputProcessWindowFunction extends ProcessAllWindowFunction<OutputMetricsDto, OutputMetricsDto, TimeWindow> {

    @Override
    public void process(Context context, Iterable<OutputMetricsDto> elements, Collector<OutputMetricsDto> out) {
        OutputMetricsDto result = elements.iterator().next();

        out.collect(result);
    }
}

6)Класс для передачи метрик в Prometheus:

import info.test.Model.OutputGauge;
import info.test.Model.OutputMetricsDto;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;

import java.util.ArrayList;
import java.util.List;

public class PrometheusExporterMapFunction extends RichMapFunction<OutputMetricsDto, OutputMetricsDto> {
    private final String incomingMessageCount = "incomingMessageCount";
    private final String outputMessageCount = "outputMessageCount";

    private transient List<OutputGauge> gauges;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        MetricGroup metricGroup = getRuntimeContext().getMetricGroup()
                .addGroup("serviceName");

        gauges = new ArrayList<>();

        gauges.add(metricGroup.gauge("incomingMessageCount", new OutputGauge(incomingMessageCount, 0L)));    
    }

    @Override
    public OutputMetricsDto map(OutputMetricsDto value) throws Exception {
        System.out.println("===separate===");
        gauges.forEach(gauge -> {
            if (gauge.getNameMetric().equals(incomingMessageCount)) {
                gauge.setValue(gauge.getValue());
                System.out.println(incomingMessageCount + " - " + gauge.getValue());
            }
        });
        return value;
    }
}

7)Класс для генерации бесконечной последовательности случайных Integer чисел:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class RandomNumberSource implements SourceFunction<Integer> {
    private volatile boolean isRunning = true;
    private Random random;

    public RandomNumberSource() {
        this.random = new Random();
    }

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(random.nextInt(200));
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }
}

8)Файл pom.xml (верхнюю часть не стал вставлять, а начал только с properties):

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.36</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>info.test.AppServiceWithWaterMarkExperiment</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Ответы (0 шт):