本章包含了一系列关于flink 编程人员如何处理常见问题的最佳实践
大多数Flink应用,包含批量计量和流式计算都依赖外部配置参数。
例如 指定输入和输出来源(像路径和地址)、系统参数(并行、运行时配置)和应用参数(经常在用户的函数中使用到).
从0.9版本我们就提供了一个简单的基本工具:ParameterTool
,用于解决这些问题。
你也可以不使用这里提到的ParameterTool
工具。其他框架,例如Commons CLI,
argparse4j 和flink也可以集成得很好。
ParameterTool
的配置项ParameterTool
提供了一系列预定义好的读取配置项的表态方法。这个工具内部只需要一个Map<String, String>
参数,所以它非常容易和你的配置风格集成。
.properties
的文件下面方法将读取一个Properties文件,然后提供键/值对:
String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
下面例子将从命令行得到--input hdfs:///mydata --elements 42
参数
public static void main(String[] args) {
ParameterTool parameter = ParameterTool.fromArgs(args);
// .. regular code ..
当启动jvm时,你可以设置系统属性:-Dinput=hdfs:///mydata
。你也可以使用如下系统属性初始化ParameterTool
:
ParameterTool parameter = ParameterTool.fromSystemProperties();
现在我们已经知道如何从多种途径来获取参数(参考上面的例子),
直接了解ParameterTool
ParameterTool
本身有获取这些值的方法
ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.
你可以在main()方法中直接使用这些方法的返回值(=客户端提交应用). 例如你可以像这样设置使用方的并发数:
ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);
因为ParameterTool
是可序列化的,所以你可以像这样把他传递给函数本身:
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
然后在方法内部使用命令行上的值
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));
下例将展示如何将参数作为配置对象传递给用户定义的方法。
ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())
在Tokenizer
里,可以通过open(Configuration conf)
方法来获取这个对象。
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void open(Configuration parameters) throws Exception {
parameters.getInteger("myInt", -1);
// .. do
在ExecutionConfig
把参数注册为全局任务参数,你可以通过任务管理的web接口和用户定义的方法来获取这些配置
注册全局参数
ParameterTool parameters = ParameterTool.fromArgs(args);
//建立执行环境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
在用户方法中获取这些全局参数
public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
parameters.getRequired("input");
// .. do more ..
在多字段的数据类型中,强烈推荐使用POJOs(Plain old Java objects)代替TupleX
。POJOs也可以用来给大Tuple
类型命名。
案例
在使用 ~~~java Tuple11<String, String, …, String> var = new …; ~~~
我们更倾向于创建一个扩展了大Tuple类型的自定义类型: ~~~java CustomType var = new …;
public static class CustomType extends Tuple11<String, String, …, String> { // constructor matching super } ~~~
注:本手册适用于Flink 0.10后的版本
Apache Flink在代码中使用slf4j作日志抽象接口。我们也建议用户在他们的用户方法中使用sfl4j。
Sfl4j是一个可以在运行时使用不同日志实现(例如:log4j 或 Logback)的编译时日志接口。
Flink 默认依赖Log4j。本章将介绍如何使用Logback。用户也能使用本手册通过Graylog建立中心化的日志。
使用如下代码,获取日志实例:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyClass implements MapFunction {
private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
// ...
在通过依赖管理软件如Maven配置类路径执行类的情况下,Flink将把log4j添加到类路径。
因此,你需要把log4j从Flink的依赖中排除掉,下面的配置假设是一个从Flink quickstart创建出来的Maven项目。
你可以像这样来修改项目的pom.xml
文件:
<dependencies>
<!-- Add the two required logback dependencies -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
</dependency>
<!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
Hadoop is logging to log4j! -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
下列<dependencies>
部份已经修改完成
从Flink依赖中排除所有log4j
的依赖:
Maven将忽略Flink中log4j的相关依赖
从Flink依赖中排除slf4j-log4j12
部份:
因为我们将绑定slf4j和logback,必须把slf4j和log4j的绑定关系删除掉
添加Logback依赖:logback-core
和 logback-classic
添加log4j-over-slf4j的依赖。log4j-over-slf4j
是一个允许应用程序直接使用Log4j接口来使用Slf4j接口的工具。Flink依赖的Hadoop直接使用Log4j来记录日志。所以我们必须重新所有的日志请求从Log4j到Slf4j,然后再记录到Logback。
你必须手动在所有你正在添加到pom文件中的Flink新依赖项中排除掉这些排除项。
你也需要检查下非Flink的其他依赖是否绑定了log4j.你可以通过mvn dependency:tree
来分析项目中的依赖。
本手册同样适用于以独立集群的方式在YARN上运行Flink
为了在Flink中使用Logback代替Log4j,你需要在lib/
目录下删除log4j-1.2.xx.jar
和 sfl4j-log4j12-xxx.jar
然后,你需要在lib/
目录下添加如下jar文件:
logback-classic.jar
logback-core.jar
log4j-over-slf4j.jar
:请注意,在使用YARN集群时,你需要显式设置lib/
目录
在YARN上使用Flink,设置自定义的日志命令是:./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>