Metrics

Flink公开了一个指标系统,可以收集和暴露指标给外部系统.

Registering metrics

你可以调用 getRuntimeContext().getMetricGroup()方法来访问任何继承自RichFunction函数的用户函数的指标系统.这个方法返回一个MetricGroup对象,通过这个对象可以创建和注册新的指标.

Metric types

Flink支持的指标类型:Counters,Gauges,HistogramsMeters.

Counter

Counter用作某方面计数,通过调用inc()/inc(long n) 或者 dec()/dec(long n)方法来使当前的值增加或者减少. 通过调用MetricGroupcounter(String name)方法可以创建和注册一个Counter.

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @public Integer map(String value) throws Exception {
    this.counter.inc();
  }
}

或者你也可以使用自己实现的Counter

public class MyMapper extends RichMapFunction<String, Integer> {
  private Counter counter;

  @Override
  public void open(Configuration config) {
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCustomCounter", new CustomCounter());
  }
}

Gauge

Gauge按需提供任意类型的值,要使用Gauge,你必须首先创建一个类并实现org.apache.flink.metrics.Gauge接口。 这里对返回值的类型没有限制。 可以调用MetricGroupgauge(String name,Gauge gauge)方法来注册一个gauge。

public class MyMapper extends RichMapFunction<String, Integer> {
  private int valueToExpose;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }
}
public class MyMapper extends RichMapFunction[String,Int] {
  val valueToExpose = 5

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
  }
  ...
}

请注意reporters会将暴露的对象转化成String型,这意味着需要去实现一个有意义的toString()方法

Histogram

Histogram用于度量长值分布情况, 你可以通过调用MetricGrouphistogram(String name, Histogram histogram)方法来注册一个Histogram

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @public Integer map(Long value) throws Exception {
    this.histogram.update(value);
  }
}

Flink没有提供一个默认的Histogram实现。但是提供了一个Wrapper来允许使用 Codahale/DropWizard 直方图。 如需使用此包装器,请在您的pom.xml中添加以下依赖:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.3-SNAPSHOT</version>
</dependency>

你可以注册一个Codahale/DropWizard 直方图类似于:

public class MyMapper extends RichMapFunction<Long, Integer> {
  private Histogram histogram;

  @Override
  public void open(Configuration config) {
    com.codahale.metrics.Histogram histogram =
      new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));

    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
  }
}

Meter

Meter用于度量平均吞吐量,使用markEvent()方法可以注册一个发生的事件.同时发生的多个事件可以使用markEvent(long n)方法来进行注册。 通过调用MetricGroupmeter(String name, Meter meter)方法来注册一个meter

public class MyMapper extends RichMapFunction