Flink подключение метрик для Prometheus (Java)
мне нужна помощь.
Сделал тестовый сервис, для подсчета количества отработанных сообщений во Flink, метрика отдается в Prometheus. Условия задачи, нужно смотреть не общее количество сообщений, которые были обработаны всего, а через определенный промежуток времени, т.е., например за каждые 10 сек, нужно посмотреть ,сколько было обработано входящих сообщений.
Локально, если запускать через Idea, вижу, что счетчик меняется. Если запускать через Flink, поднятый через docker-compose.yml, для моего кастомного счетчика, всегда стоит 0, не могу понять в чем причина.
Код привожу: 1)Главный класс:
import info.test.operation.CountMessagesAggregateFunction;
import info.test.operation.OutputProcessWindowFunction;
import info.test.operation.PrometheusExporterMapFunction;
import info.test.operation.RandomNumberSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
public class AppServiceWithWaterMarkExperiment {
private static long windowSize = 10L;
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> randomNumber = env
.addSource(new RandomNumberSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.
<Integer>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withIdleness(Duration.ofSeconds(windowSize / 2))
);
DataStream<Integer> finalStream = randomNumber
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(3)))
.sum(0);
finalStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
.aggregate(new CountMessagesAggregateFunction(), new OutputProcessWindowFunction())
.map(new PrometheusExporterMapFunction())
.print();
env.execute("My Service");
}
}
2)Код класса Gauge:
import org.apache.flink.metrics.Gauge;
public class OutputGauge implements Gauge<Long> {
private final String nameMetric;
private Long value;
public OutputGauge(String nameMetric, Long value) {
this.nameMetric = nameMetric;
this.value = value;
}
public void setValue(Long value) {
this.value = value;
}
@Override
public Long getValue() {
return value == null ? 0L : value;
}
public String getNameMetric() {
return this.nameMetric;
}
}
3)Код класса MetricsDTO
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutputMetricsDto implements Serializable {
private Long incomingMessageCount;
}
4)Класс подсчета значений:
import info.test.Model.OutputMetricsDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.AggregateFunction;
@Slf4j
public class CountMessagesAggregateFunction implements AggregateFunction<Integer, OutputMetricsDto, OutputMetricsDto> {
@Override
public OutputMetricsDto createAccumulator() {
return OutputMetricsDto.builder()
.incomingMessageCount(0L)
.build();
}
@Override
public OutputMetricsDto add(Integer value, OutputMetricsDto accumulator) {
accumulator.setIncomingMessageCount(accumulator.getIncomingMessageCount() + 1);
return accumulator;
}
@Override
public OutputMetricsDto getResult(OutputMetricsDto accumulator) {
return accumulator;
}
@Override
public OutputMetricsDto merge(OutputMetricsDto a, OutputMetricsDto b) {
OutputMetricsDto mergedResult = new OutputMetricsDto();
mergedResult.setIncomingMessageCount(a.getIncomingMessageCount() + b.getIncomingMessageCount());
return mergedResult;
}
}
5)Класс OutputProcessWindowFunction:
import info.test.Model.OutputMetricsDto;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
@Slf4j
public class OutputProcessWindowFunction extends ProcessAllWindowFunction<OutputMetricsDto, OutputMetricsDto, TimeWindow> {
@Override
public void process(Context context, Iterable<OutputMetricsDto> elements, Collector<OutputMetricsDto> out) {
OutputMetricsDto result = elements.iterator().next();
out.collect(result);
}
}
6)Класс для передачи метрик в Prometheus:
import info.test.Model.OutputGauge;
import info.test.Model.OutputMetricsDto;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import java.util.ArrayList;
import java.util.List;
public class PrometheusExporterMapFunction extends RichMapFunction<OutputMetricsDto, OutputMetricsDto> {
private final String incomingMessageCount = "incomingMessageCount";
private final String outputMessageCount = "outputMessageCount";
private transient List<OutputGauge> gauges;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MetricGroup metricGroup = getRuntimeContext().getMetricGroup()
.addGroup("serviceName");
gauges = new ArrayList<>();
gauges.add(metricGroup.gauge("incomingMessageCount", new OutputGauge(incomingMessageCount, 0L)));
}
@Override
public OutputMetricsDto map(OutputMetricsDto value) throws Exception {
System.out.println("===separate===");
gauges.forEach(gauge -> {
if (gauge.getNameMetric().equals(incomingMessageCount)) {
gauge.setValue(gauge.getValue());
System.out.println(incomingMessageCount + " - " + gauge.getValue());
}
});
return value;
}
}
7)Класс для генерации бесконечной последовательности случайных Integer чисел:
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
public class RandomNumberSource implements SourceFunction<Integer> {
private volatile boolean isRunning = true;
private Random random;
public RandomNumberSource() {
this.random = new Random();
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (isRunning) {
ctx.collect(random.nextInt(200));
}
}
@Override
public void cancel() {
this.isRunning = false;
}
}
8)Файл pom.xml (верхнюю часть не стал вставлять, а начал только с properties):
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.36</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>info.test.AppServiceWithWaterMarkExperiment</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>