Flink公开了一个指标系统,可以收集和暴露指标给外部系统.
你可以调用 getRuntimeContext().getMetricGroup()
方法来访问任何继承自RichFunction函数的用户函数的指标系统.这个方法返回一个MetricGroup
对象,通过这个对象可以创建和注册新的指标.
Flink支持的指标类型:Counters
,Gauges
,Histograms
和Meters
.
Counter
用作某方面计数,通过调用inc()/inc(long n)
或者 dec()/dec(long n)
方法来使当前的值增加或者减少.
通过调用MetricGroup
的counter(String name)
方法可以创建和注册一个Counter
.
public class MyMapper extends RichMapFunction<String, Integer> {
private Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@public Integer map(String value) throws Exception {
this.counter.inc();
}
}
或者你也可以使用自己实现的Counter
:
public class MyMapper extends RichMapFunction<String, Integer> {
private Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
}
}
Gauge
按需提供任意类型的值,要使用Gauge
,你必须首先创建一个类并实现org.apache.flink.metrics.Gauge
接口。
这里对返回值的类型没有限制。
可以调用MetricGroup
的gauge(String name,Gauge gauge)
方法来注册一个gauge。
public class MyMapper extends RichMapFunction<String, Integer> {
private int valueToExpose;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
}
public class MyMapper extends RichMapFunction[String,Int] {
val valueToExpose = 5
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
...
}
请注意reporters会将暴露的对象转化成String
型,这意味着需要去实现一个有意义的toString()
方法
Histogram
用于度量长值分布情况,
你可以通过调用MetricGroup
的histogram(String name, Histogram histogram)
方法来注册一个Histogram
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@public Integer map(Long value) throws Exception {
this.histogram.update(value);
}
}
Flink没有提供一个默认的Histogram
实现。但是提供了一个Wrapper来允许使用 Codahale/DropWizard 直方图。
如需使用此包装器,请在您的pom.xml
中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
你可以注册一个Codahale/DropWizard 直方图类似于:
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
com.codahale.metrics.Histogram histogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
}
}
Meter
用于度量平均吞吐量,使用markEvent()
方法可以注册一个发生的事件.同时发生的多个事件可以使用markEvent(long n)
方法来进行注册。
通过调用MetricGroup
的meter(String name, Meter meter)
方法来注册一个meter
public class MyMapper extends RichMapFunction