Flink兼容Apache Hadoop MapReduce的接口,因此可以使用面向MapReduce的代码。
你可以:
Writable
数据类型(Data type).InputFormat
作为数据源(DataSource).OutputFormat
作为 数据落地(DataSink).Mapper
作为 FlatMapFunction.Reducer
作为 GroupReduceFunction.这篇文档展示如何在Flink中使用现存的Hadoop MapReduce代码。可以参考 连接其他系统 来了解如何从Hadoop支持的文件系统中读取数据。
支持Hadoop的输入输出(input/output)格式是flink-java
和flink-scala
的maven模块的一部分,这两部分是在编写Flink任务时经常需要用到的。 mapred
和mapreduce
的api代码分别在org.apache.flink.api.java.hadoop
和org.apache.flink.api.scala.hadoop
以及一个额外的子包中。
对Hadoop MapReduce的支持是在flink-hadoop-compatibility
的maven模块中。代码具体在org.apache.flink.hadoopcompatibility
包中。
如果想要重复使用Mappers and Reducers
, 需要在maven中的pom.xml中添加下面依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
Flink支持所有的Hadoop Writable
和 WritableComparable
数据类型, 不用额外添加Hadoop Compatibility 依赖。 可以参考Programming Guide了解如何使用Hadoop数据类型(Hadoop data type)。
可以使用Hadoop输入格式来创建数据源,具体是调用 ExecutionEnvironment 的 readHadoopFile 或 createHadoopInput方法。 前者用于来自FileInputFormat的输入格式, 后者用于普通的输入格式。
创建的数据集包含的是一个“键-值”2元组,“值”是从Hadoop输入格式获得的数值。
下面的例子介绍如何使用Hadoop的 TextInputFormat
。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<LongWritable, Text>> input =
env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);
// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment
val input: DataSet[(LongWritable, Text)] =
env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)
// Do something with the data.
[...]
Flink提供兼容Hadoop输出格式(Hadoop OutputFormat)的封装。支持任何实现org.apache.hadoop.mapred.OutputFormat
接口或者继承org.apache.hadoop.mapreduce.OutputFormat
的类。输出格式的封装需要的输入是“键值对”形式。他们将会交给Hadoop输出格式处理。
下面的例子介绍如何使用Hadoop的 TextOutputFormat
。
// 获取所需数据
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]
// 创建和初始化Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
new HadoopOutputFormat<Text, IntWritable>(
// 设置Hadoop OutputFormat和特定的job作为初始化参数
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// 通过Hadoop TextOutputFormat输出结果
hadoopResult.output(hadoopOF);
// 获取所需数据
val hadoopResult: DataSet[(Text, IntWritable)] = [...]
val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
new TextOutputFormat[Text, IntWritable],
new JobConf)
hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
hadoopResult.output(hadoopOF)
Hadoop Mappers
语法上等价于Flink的FlatMapFunctions
,Hadoop Reducers
语法上等价于Flink的GroupReduceFunctions
。 Flink同样封装了Hadoop MapReduce
的Mapper and Reducer
接口的实现。 用户可以在Flink程序中复用Hadoop的Mappers and Reducers
。 这时,仅仅org.apache.hadoop.mapred
的Mapper and Reducer接口被支持。
The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>>
as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>>
as output where KEYIN
and KEYOUT
are the keys and VALUEIN
and VALUEOUT
are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (HadoopReduceCombineFunction
) and without a Combiner (HadoopReduceFunction
). The wrappers accept an optional JobConf
object to configure the Hadoop Mapper or Reducer.
封装函数用DataSet<Tuple2<KEYIN,VALUEIN>>
作为输入, 产生DataSet<Tuple2<KEYOUT,VALUEOUT>>
作为输出, 其中KEYIN
和KEYOUT
是“键” ,VALUEIN
和VALUEOUT
是“值”,它们是Hadoop函数处理的键值对。 对于Reducers,Flink将GroupReduceFunction封装成HadoopReduceCombineFunction
,但没有Combiner(HadoopReduceFunction
)。 封装函数接收可选的JobConf
对象来配置Hadoop的Mapper or Reducer。
Flink的方法封装有
org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction
,org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction
, andorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction
.
他们可以被用于FlatMapFunctions或GroupReduceFunctions.下面的例子介绍如何使用Hadoop的Mapper
和Reducer
。
// 获取待处理数据
DataSet<Tuple2<Text, LongWritable>> text = [...]
DataSet<Tuple2<Text, LongWritable>> result = text
// 使用Hadoop Mapper (Tokenizer)作为Map函数
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// 使用Hadoop Reducer (Counter)作为Reduce函数
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
需要注意: Reducer封装处理由Flink中的groupBy()定义的groups。 它并不考虑任何在JobConf定义的自定义的分区器(partitioners), 排序(sort)或分组(grouping)的比较器。
下面给出一个完整的使用Hadoop 数据类型, InputFormat/OutputFormat/Mapper/Reducer的示例。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 创建和初始化Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
new HadoopInputFormat<LongWritable, Text>(
new TextInputFormat(), LongWritable.class, Text.class, job
);
TextInputFormat.addInputPath(job, new Path(inputPath));
// 从Hadoop TextInputFormat读取数据.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);
DataSet<Tuple2<Text, LongWritable>> result = text
// 使用Hadoop Mapper (Tokenizer)作为Map函数
.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
new Tokenizer()
))
.groupBy(0)
// 使用Hadoop Reducer (Counter)作为Reduce函数
.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
new Counter(), new Counter()
));
// 创建和初始化Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
new HadoopOutputFormat<Text, IntWritable>(
new TextOutputFormat<Text, IntWritable>(), job
);
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));
// 使用Hadoop TextOutputFormat输出结果.
result.output(hadoopOF);
// 执行程序
env.execute("Hadoop WordCount");