FlinkML是Flink内部的机器学习工具库。它是Flink生态圈的新组件,社区成员不断向它贡献新的算法。 FlinkML目标是提供可扩展的机器学习算法,良好的API和工具来使构建端对端的机器学习系统的工作量最小化。 你可以查阅路线图了解更多关于FlinkML的目标和趋势。
Flink目前支持以下算法:
你可以通过我们的快速入门指南中的例子了解概况。
如果你想直接实践, 你可需要创建一个Flink程序.
然后后,在你项目的pom.xml
中加入FlinkML的依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
需要注意的是FlinkML目前没有编译进二进制文件版。 点击这里.了解如何在集群中链接不在二进制文件中的库。
至此,你可以开始你的分析任务了。 下面的代码片段展示了使用FlinkML可以非常简单地训练一个多元线性回归模型。
// LabeledVector是一个带有标签的特征向量,标签可以使一个类或者是一个实值。
val trainingData: DataSet[LabeledVector] = ...
val testingData: DataSet[Vector] = ...
//可以选用Splitter来将一个数据集分割成训练集和测试集。
val dataSet: DataSet[LabeledVector] = ...
val trainTestData: DataSet[TrainTestDataSet] = Splitter.trainTestSplit(dataSet)
val trainingData: DataSet[LabeledVector] = trainTestData.training
val testingData: DataSet[Vector] = trainTestData.testing.map(lv => lv.vector)
val mlr = MultipleLinearRegression()
.setStepsize(1.0)
.setIterations(100)
.setConvergenceThreshold(0.001)
mlr.fit(trainingData)
// 调试好的模型可以用来做预测。
val predictions: DataSet[LabeledVector] = mlr.predict(testingData)
FlinkML的一个关键概念是它基于scikit-learn 的管道机制。 它能帮助你快速建立复杂的数据分析管道,这是每一位数据分析师日常工作中不可或缺的部分。 你可以在这里了解Flink Pipeline的详细情况。
下面的代码片段展示了使用FlinkML可以非常简单地创建数据分析管道。
val trainingData: DataSet[LabeledVector] = ...
val testingData: DataSet[Vector] = ...
val scaler = StandardScaler()
val polyFeatures = PolynomialFeatures().setDegree(3)
val mlr = MultipleLinearRegression()
// 构建标准化,多项式特征和多元线性回归的管道
val pipeline = scaler.chainTransformer(polyFeatures).chainPredictor(mlr)
// 训练管道
pipeline.fit(trainingData)
// 预测计算
val predictions: DataSet[LabeledVector] = pipeline.predict(testingData)
通过方法 chainTransformer
可以将一个Transformer
和另一个或多个Transformer
链接在一起。
而通过方法 chainPredictor
可以将一个 Predictor
和一个或多个Transformer
链接在一起。
Flink社区欢迎所有有志提高Flink及其相关库的贡献者。为了方便快速了解贡献的方法,请参看我们的官方贡献指南.