Hadoop Compatibility Beta

Flink兼容Apache Hadoop MapReduce的接口,因此可以使用面向MapReduce的代码。

你可以:

这篇文档展示如何在Flink中使用现存的Hadoop MapReduce代码。可以参考 连接其他系统 来了解如何从Hadoop支持的文件系统中读取数据。

项目配置

支持Hadoop的输入输出(input/output)格式是flink-javaflink-scala的maven模块的一部分,这两部分是在编写Flink任务时经常需要用到的。 mapredmapreduce 的api代码分别在org.apache.flink.api.java.hadooporg.apache.flink.api.scala.hadoop以及一个额外的子包中。

对Hadoop MapReduce的支持是在flink-hadoop-compatibility的maven模块中。代码具体在org.apache.flink.hadoopcompatibility包中。

如果想要重复使用Mappers and Reducers, 需要在maven中的pom.xml中添加下面依赖:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-hadoop-compatibility_2.10</artifactId>
	<version>1.3-SNAPSHOT</version>
</dependency>

使用Hadoop数据类型

Flink支持所有的Hadoop Writable 和 WritableComparable 数据类型, 不用额外添加Hadoop Compatibility 依赖。 可以参考Programming Guide了解如何使用Hadoop数据类型(Hadoop data type)。

使用Hadoop输入格式

可以使用Hadoop输入格式来创建数据源,具体是调用 ExecutionEnvironment 的 readHadoopFile 或 createHadoopInput方法。 前者用于来自FileInputFormat的输入格式, 后者用于普通的输入格式。

创建的数据集包含的是一个“键-值”2元组,“值”是从Hadoop输入格式获得的数值。

下面的例子介绍如何使用Hadoop的 TextInputFormat

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<LongWritable, Text>> input =
    env.readHadoopFile(new TextInputFormat(), LongWritable.class, Text.class, textPath);

// Do something with the data.
[...]
val env = ExecutionEnvironment.getExecutionEnvironment

val input: DataSet[(LongWritable, Text)] =
  env.readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath)

// Do something with the data.
[...]

使用Hadoop输出格式

Flink提供兼容Hadoop输出格式(Hadoop OutputFormat)的封装。支持任何实现org.apache.hadoop.mapred.OutputFormat接口或者继承org.apache.hadoop.mapreduce.OutputFormat的类。输出格式的封装需要的输入是“键值对”形式。他们将会交给Hadoop输出格式处理。

下面的例子介绍如何使用Hadoop的 TextOutputFormat

// 获取所需数据
DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

// 创建和初始化Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  new HadoopOutputFormat<Text, IntWritable>(
    // 设置Hadoop OutputFormat和特定的job作为初始化参数
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// 通过Hadoop TextOutputFormat输出结果
hadoopResult.output(hadoopOF);
// 获取所需数据
val hadoopResult: DataSet[(Text, IntWritable)] = [...]

val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
  new TextOutputFormat[Text, IntWritable],
  new JobConf)

hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))

hadoopResult.output(hadoopOF)

使用Hadoop Mappers和Reducers

Hadoop Mappers 语法上等价于Flink的FlatMapFunctionsHadoop Reducers语法上等价于Flink的GroupReduceFunctions。 Flink同样封装了Hadoop MapReduceMapper and Reducer接口的实现。 用户可以在Flink程序中复用Hadoop的Mappers and Reducers。 这时,仅仅org.apache.hadoop.mapred的Mapper and Reducer接口被支持。

The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output where KEYIN and KEYOUT are the keys and VALUEIN and VALUEOUT are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (HadoopReduceCombineFunction) and without a Combiner (HadoopReduceFunction). The wrappers accept an optional JobConf object to configure the Hadoop Mapper or Reducer.

封装函数用DataSet<Tuple2<KEYIN,VALUEIN>>作为输入, 产生DataSet<Tuple2<KEYOUT,VALUEOUT>>作为输出, 其中KEYINKEYOUT是“键” ,VALUEINVALUEOUT 是“值”,它们是Hadoop函数处理的键值对。 对于Reducers,Flink将GroupReduceFunction封装成HadoopReduceCombineFunction,但没有Combiner(HadoopReduceFunction)。 封装函数接收可选的JobConf对象来配置Hadoop的Mapper or Reducer。

Flink的方法封装有

  • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
  • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction. 他们可以被用于FlatMapFunctionsGroupReduceFunctions.

下面的例子介绍如何使用Hadoop的MapperReducer

// 获取待处理数据
DataSet<Tuple2<Text, LongWritable>> text = [...]

DataSet<Tuple2<Text, LongWritable>> result = text
  // 使用Hadoop Mapper (Tokenizer)作为Map函数
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // 使用Hadoop Reducer (Counter)作为Reduce函数
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

需要注意: Reducer封装处理由Flink中的groupBy()定义的groups。 它并不考虑任何在JobConf定义的自定义的分区器(partitioners), 排序(sort)或分组(grouping)的比较器。

完整Hadoop WordCount示例

下面给出一个完整的使用Hadoop 数据类型, InputFormat/OutputFormat/Mapper/Reducer的示例。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 创建和初始化Hadoop TextInputFormat.
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopIF =
  new HadoopInputFormat<LongWritable, Text>(
    new TextInputFormat(), LongWritable.class, Text.class, job
  );
TextInputFormat.addInputPath(job, new Path(inputPath));

// 从Hadoop TextInputFormat读取数据.
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);

DataSet<Tuple2<Text, LongWritable>> result = text
  // 使用Hadoop Mapper (Tokenizer)作为Map函数
  .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
    new Tokenizer()
  ))
  .groupBy(0)
  // 使用Hadoop Reducer (Counter)作为Reduce函数
  .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
    new Counter(), new Counter()
  ));

// 创建和初始化Hadoop TextOutputFormat.
HadoopOutputFormat<Text, IntWritable> hadoopOF =
  new HadoopOutputFormat<Text, IntWritable>(
    new TextOutputFormat<Text, IntWritable>(), job
  );
hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
TextOutputFormat.setOutputPath(job, new Path(outputPath));

// 使用Hadoop TextOutputFormat输出结果.
result.output(hadoopOF);

// 执行程序
env.execute("Hadoop WordCount");