Flink公开了一个指标系统,可以收集和暴露指标给外部系统.
你可以调用 getRuntimeContext().getMetricGroup()
方法来访问任何继承自RichFunction函数的用户函数的指标系统.这个方法返回一个MetricGroup
对象,通过这个对象可以创建和注册新的指标.
Flink支持的指标类型:Counters
,Gauges
,Histograms
和Meters
.
Counter
用作某方面计数,通过调用inc()/inc(long n)
或者 dec()/dec(long n)
方法来使当前的值增加或者减少.
通过调用MetricGroup
的counter(String name)
方法可以创建和注册一个Counter
.
public class MyMapper extends RichMapFunction<String, Integer> {
private Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@public Integer map(String value) throws Exception {
this.counter.inc();
}
}
或者你也可以使用自己实现的Counter
:
public class MyMapper extends RichMapFunction<String, Integer> {
private Counter counter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCustomCounter", new CustomCounter());
}
}
Gauge
按需提供任意类型的值,要使用Gauge
,你必须首先创建一个类并实现org.apache.flink.metrics.Gauge
接口。
这里对返回值的类型没有限制。
可以调用MetricGroup
的gauge(String name,Gauge gauge)
方法来注册一个gauge。
public class MyMapper extends RichMapFunction<String, Integer> {
private int valueToExpose;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
}
public class MyMapper extends RichMapFunction[String,Int] {
val valueToExpose = 5
override def open(parameters: Configuration): Unit = {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
}
...
}
请注意reporters会将暴露的对象转化成String
型,这意味着需要去实现一个有意义的toString()
方法
Histogram
用于度量长值分布情况,
你可以通过调用MetricGroup
的histogram(String name, Histogram histogram)
方法来注册一个Histogram
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@public Integer map(Long value) throws Exception {
this.histogram.update(value);
}
}
Flink没有提供一个默认的Histogram
实现。但是提供了一个Wrapper来允许使用 Codahale/DropWizard 直方图。
如需使用此包装器,请在您的pom.xml
中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
你可以注册一个Codahale/DropWizard 直方图类似于:
public class MyMapper extends RichMapFunction<Long, Integer> {
private Histogram histogram;
@Override
public void open(Configuration config) {
com.codahale.metrics.Histogram histogram =
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new DropwizardHistogramWrapper(histogram));
}
}
Meter
用于度量平均吞吐量,使用markEvent()
方法可以注册一个发生的事件.同时发生的多个事件可以使用markEvent(long n)
方法来进行注册。
通过调用MetricGroup
的meter(String name, Meter meter)
方法来注册一个meter
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@public Integer map(Long value) throws Exception {
this.meter.markEvent();
}
}
Flink提供了一个 Wrapper来允许使用 Codahale/DropWizard meters. 要使用此包装器,请在您的pom.xml
中添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
您可以注册Codahale / DropWizard meter类似于这样:
public class MyMapper extends RichMapFunction<Long, Integer> {
private Meter meter;
@Override
public void open(Configuration config) {
com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(meter));
}
}
每个指标被分配一个标识符,该标识符将基于3个组件进行汇报:注册指标时用户提供的名称,可选的用户自定义域和系统提供的域。例如,如果A.B
是系统域,C.D
是用户域,E
是名称,那么指标的标识符将是A.B.C.D.E
.
你可以通过设置conf/flink-conf.yam
里面的metrics.scope.delimiter
参数来配置标识符的分隔符(默认:.
).
你可以通过调用MetricGroup#addGroup(String name)
和MetricGroup#addGroup(int name)
来定义一个用户域
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
系统域包含关于这个指标的上下文信息,例如其注册的任务或该任务属于哪个作业.
可以通过在conf/flink-conf.yaml
中设置以下关键字来配置它的上下文信息.这些关键字的每一个都期望可以包含常量的格式字符串(例如:“taskmanager”)和将在运行时被替换的变量(例如:”<task_id>“)
metrics.scope.jm
metrics.scope.jm.job
metrics.scope.tm
metrics.scope.tm.job
metrics.scope.task
metrics.scope.operator
这里对变量的数量和顺序没有限制,变量区分大小写.
运算指标的默认域将导致类似的标识符:
localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric
如果你想包含任务名称,但省略task manager的信息,你可以指定以下格式:
metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>
这可以创建标识符localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric
.
注意对于此格式的字符串,如果同一作业同时运行多次,则可能会发生标识符冲突,导致指标标准数据不一致.因此,建议使用格式字符串,通过包括ID(例如 <job_id>)或通过为作业和操作符分配唯一的名称来保证一定程度上的唯一性.
通过在conf/flink-conf.yaml
中配置一个或者一些reporters,能够让指标暴露给一个外部系统.这些reporters将在每个job和task manager启动时被实例化.
metrics.reporters
: reporters的名称列表.metrics.reporter.<name>.<config>
: 给定reporter名称<name>
的通用设置.metrics.reporter.<name>.class
: 给定reporter名称<name>
的reporter类 .metrics.reporter.<name>.interval
: 给定reporter名称<name>
的reporter间隔.metrics.reporter.<name>.scope.delimiter
: 给定reporter名称<name>
所使用的分割符标识(默认值用:metrics.scope.delimiter
)所有的reporters必须至少具备class
属性,有些允许指定一个reporting的interval
,以下,我们将列举更多针对每个reporter的设置.
举例说明指定多个reporters的配置
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
重要提示:当Flink启动的时候,通过放入到/lib目录下包含reporter的jar文件必须可访问.
你可以通过实现org.apache.flink.metrics.reporter.MetricReporter
接口来定义自己的Reporter
, 如果这个Reporter必须定期发送报告,那你也必须同时实现Scheduled
接口.
下面的章节列举了支持的reporters.
不必包含其他依赖关系,因为JMX reporter默认可用,但是并没有被激活
参数:
- port
- (可选) JMX侦听连接的端口,也可以是端口范围。当指定范围时,相关job或者task manager 日志将显示实际端口。如果设置了此设置,Flink将为给定的端口/范围启动一个额外的JMX连接器。在默认本地JMX接口上指标始终可用.
示例配置:
metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
通过JMX暴露出来的指标由一个域和一个键-属性列表来标识,它们一起形成对象名称。
域始终以 org.apache.flink打头,后面跟着通用指标标识。相对于常用的标识符,它不受域格式影响,不包含任何变量,并且作业之间是不变的,这样域的一个列子是:
org.apache.flink.job.task.numBytesOut
.
键-属性列表包含所有变量的值,与配置的域格式无关,它们与给定的指标相关联,这样列表的列子是:
host=localhost,job_name=MyJob,task_name=MyTask
.
因此域识别一个指标类,而键-属性性列表识别一个(或多个)指标实例
为了使用这个reporter,你必须将/opt/flink-metrics-ganglia-1.3-SNAPSHOT.jar
拷贝到Flink发行版本下的/lib
文件夹中
参数:
host
- 在gmond.conf
中的udp_recv_channel.bind
下配置的gmond主机地址port
- 在gmond.conf
中的udp_recv_channel.port
下配置的gmond端口tmax
- 旧指标能够保留软性限制的最长时间dmax
- 旧指标能够保留硬性限制的最长时间ttl
- 传输UDP包的生存时间addressingMode
- UDP使用的寻址模式(UNICAST/MULTICAST)示例配置:
metrics.reporters: gang
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST
为了使用这个reporter,你必须将/opt/flink-metrics-graphite-1.3-SNAPSHOT.jar
拷贝到Flink发行版本下的/lib
文件夹中
参数:
host
- Graphite 服务器地址port
- Graphite 服务器端口protocol
- 使用的协议 (TCP/UDP)示例配置:
metrics.reporters: grph
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
为了使用reporter,你必须将/opt/flink-metrics-statsd-1.3-SNAPSHOT.jar
拷贝到Flink发行版本下/lib
文件夹中
参数:
host
- the StatsD 服务器地址port
- the StatsD 服务器端口示例配置
metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
默认情况下,Flink收集了几个能够深入了解当前状态的指标,本章节是所有这些指标的一个参考 以下表格通常有4列:
“Scope”列描述了用于生成系统域的域格式,例如,如果单元格包含“Operator”,则使用“metric.scope.operator”的作用域格式,如果单元格包含以斜杠分割的多个值,则会根据不同的实体报告多个指标,例如job-和taskmanagers.
“Infix”(可选) 列描述了哪些中缀附加到系统域中.
“Metrics” 列中列出了给定域和中缀的所有注册指标的名称.
“Description” 列提供有关给定指标度量的相关信息.
请注意,中缀/指标名称列中的所有点仍然遵循“metrics.delimiter”设置,因此,为了推断指标标识符:
Scope | Infix | Metrics | Description |
---|---|---|---|
Job-/TaskManager | Status.JVM.CPU | Load | JVM最近CPU使用情况. |
Time | JVM使用的CPU时间. |
Scope | Infix | Metrics | Description |
---|---|---|---|
Job-/TaskManager | Status.JVM.Memory | Memory.Heap.Used | 当前使用的堆内存大小. |
Heap.Committed | 保证JVM可用的堆内存大小. | ||
Heap.Max | 可用于内存管理的堆内存最大值. | ||
NonHeap.Used | 当前使用的非堆内存大小. | ||
NonHeap.Committed | 保证JVM可用的非堆内存大小. | ||
NonHeap.Max | 可用于内存管理的非堆内存最大值. | ||
Direct.Count | 直接缓冲池中的缓冲区数量. | ||
Direct.MemoryUsed | JVM中用于直接缓冲池的内存大小. | ||
Direct.TotalCapacity | 直接缓冲池中所有缓冲区的总容量. | ||
Mapped.Count | 映射缓冲池中缓冲区的数量. | ||
Mapped.MemoryUsed | JVM中用于映射缓冲池的内存大小. | ||
Mapped.TotalCapacity | 映射缓冲池中缓冲区的数量. |
Scope | Infix | Metrics | Description |
---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | Threads.Count | 存活线程总数. |
Scope | Infix | Metrics | Description |
---|---|---|---|
Job-/TaskManager | Status.JVM.GarbageCollector | <GarbageCollector>.Count | 已发生的回收总数. |
<GarbageCollector>.Time | 执行垃圾回收花费的总时间. |
Scope | Infix | Metrics | Description |
---|---|---|---|
Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | 自JVM启动以来加载类的总数. |
ClassesUnloaded | 自JVM启动以来卸载类的总数. |
Scope | Infix | Metrics | Description |
---|---|---|---|
TaskManager | Status.Network | AvailableMemorySegments | 未使用的内存段数. |
TotalMemorySegments | 已分配的内存段数. | ||
Task | buffers | inputQueueLength | 队列输入缓冲区的数量. |
outputQueueLength | 队列输出缓冲区的数量. | ||
inPoolUsage | 输入缓冲区使用情况评估. | ||
outPoolUsage | 输出缓冲区使用情况评估. | ||
Network.<Input|Output>.<gate> (only available if taskmanager.net.detailed-metrics config option is set) |
totalQueueLen | 所有输入/输出通道中队列缓冲区的总数. | |
minQueueLen | 所有输入/输出通道中队列缓冲区的最小数目. | ||
maxQueueLen | 所有输入/输出通道中队列缓冲区的最大数目. | ||
avgQueueLen | 所有输入/输出通道中队列缓冲区的平均数目. |
Scope | Metrics | Description |
---|---|---|
JobManager | numRegisteredTaskManagers | 已注册taskmanagers的数量. |
numRunningJobs | 正在运行jobs的数量. | |
taskSlotsAvailable | 可用task slots的数量 | |
taskSlotsTotal | task slots总数. |
Scope | Metrics | Description |
---|---|---|
Job (only available on JobManager) | lastCheckpointDuration | 完成上一次检测点所花费的时间. |
lastCheckpointSize | 上一次检测点的总大小. | |
lastCheckpointExternalPath | 上一个检测点存储的路径. | |
Task | checkpointAlignmentTime | 最后一个障碍对齐所需的纳秒时间,或者当前对齐已经花费了多长时间. |
Scope | Metrics | Description |
---|---|---|
Task | currentLowWatermark | 该任务已经获得的最低水位. |
numBytesInLocal | 该任务从本地源读取的字节总数. | |
numBytesInLocalPerSecond | 该任务从本地源每秒读取的字节数. | |
numBytesInRemote | 该任务从远端读取的字节总数. | |
numBytesInRemotePerSecond | 该任务从远端每秒读取的字节数. | |
numBytesOut | 该任务已发出的字节总数. | |
numBytesOutPerSecond | 该任务每秒发出的字节数. | |
Task/Operator | numRecordsIn | 该任务/操作已收到的条目总数. |
numRecordsInPerSecond | 该任务/操作每秒收到的条目数. | |
numRecordsOut | 该操作/任务已发出的条目总数. | |
numRecordsOutPerSecond | 该操作/任务每秒发出的条目数. | |
Operator | latency | 所有输入源的延迟分布. |
numSplitsProcessed | 数据源已经处理的输入分片总数(如果操作是一个数据源). |
Flink允许去跟踪条目在整个系统中运行的延迟,为了开启延迟跟踪,latencyTrackingInterval
(毫秒)必须在ExecutionConfig
中设置为一个正值.
在latencyTrackingInterval
,源端将周期性的发送一个特殊条目,叫做LatencyMarker
,这个标记包含一个从源端发出记录时的时间戳。延迟标记不能超过常规的用户条目,因此如果条目在一个操作的前面排队,将会通过这个标记添加延迟跟踪.
请注意延迟标记不记录用户条目在操作中所花费的时间,而是绕过它们。特别是这个标记是不用于记录在窗口缓冲区中的时间条目。只有当操作不能够接受新的条目时,它们才会排队,用这个标记测量的延迟将会反映出这一点.
所有中间操作通过保留每个源的最后n
个延迟的列表,来计算一个延迟的分布。落地操作保留每个源的列表,然后每个并行源实例允许检测由单个机器所引起的延迟问题.
目前,Flink认为集群中所有机器的时钟是同步的。我们建议建立一个自动时钟同步服务(类似于NTP),以避免虚假的延迟结果.
为每个任务或者操作所收集到的指标,也可以展示在在仪表盘上。在一个作业的主页面,选择Metrics
选项,在顶部图选择一个任务后,可以使用Add Metrics
下拉菜单来选择要展示的指标值
* 任务指标被列为 <subtask_index>.<metric_name>
.
* 操作指标被列为 <subtask_index>.<operator_name>.<metric_name>
.
每个指标被可视化为一个单独的图表,用x轴表示时间和y轴表示测量值。所有的图表每10秒自动更新一次,并在导航到另一页时继续执行.
这里对可视化指标的数量没有限制;但是只有数值型指标可以可视化。