$$ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \newcommand\rfrac[2]{^{#1}\!/_{#2}} \newcommand{\norm}[1]{\left\lVert#1\right\rVert} $$

快速入门指南

介绍

FlinkML 旨在从您的数据中学习一个简单的过程,抽象出来通常带有大数据学习任务的复杂性。 在这个快速入门指南中,我们将展示使用 FlinkML 解决一个简单的监督学习问题是多么的容易。 但是首先要介绍一些基础知识,如果你已经熟悉机器学习(ML),请随时跳过接下来的几行。

如 Murphy [1] 所定义的,机器学习(ML)用于检测数据中的模式,并使用这些学习到的模式来预测未来。 我们可以将大多数机器学习(ML)算法分为两大类:监督学习和无监督学习。

  • 监督学习 涉及从一个输入(特征)集合到一个输出集合学习一个函数(映射)。 学习是通过使用我们用来近似映射函数的(输入,输出)对训练集来完成的。监督学习问题进一步分为分类问题和回归问题。在分类问题中,我们尝试预测样例属于的类,例如用户是否要点击广告。另一方面,回归问题是要预测(实际的)数值,这个数值通常称为因变量,例如明天的温度是多少。

  • 无监督学习 用来发现数据中的模式和规律。 一个例子是聚类,我们尝试从描述性的特征中发现数据分组。 无监督学习也可用于特征选择,例如通过 主成分分析(principal components analysis) 进行特征选择。

连接 FlinkML

为了在您的项目中使用 FlinkML ,首先您必须建立一个 Flink 程序(//flink.iteblog.com/dev/linking_with_flink.html)。 . 接下来,您必须将 FlinkML 的依赖添加到项目的 pom.xml 中:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-ml_2.10</artifactId>
  <version>1.3-SNAPSHOT</version>
</dependency>

加载数据

要加载与 FlinkML 一起使用的数据,我们可以使用 Flink 的 ETL 功能,或者使用处理诸如 LibSVM 格式的格式化数据的专门方法。 对于监督学习问题,通常使用 LabeledVector 类来表示 (标记,特征) 样例。 LabeledVector 对象将具有表示样例特征的 FlinkML Vector 成员,以及表示标记的 Double 成员,该标记可能是分类问题中的类,也可以是回归问题的因变量。

例如,我们可以使用 Haberman’s Survival 数据集,您可以从 UCI 机器学习数据库下载这个数据集。 该数据集“包含了对乳腺癌手术患者的存活进行研究的病例”。 数据来自逗号分隔的文件,前3列是特征,最后一列是类,第4列表示患者是否存活5年以上(标记1),或者5年内死亡(标记2)。 您可以查看 UCI 页面了解有关数据的更多信息。

我们可以先把数据加载为一个 DataSet[String]

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val survival = env.readCsvFile[(String, String, String, String)]("/path/to/haberman.data")

我们现在可以将数据转换成 DataSet[LabeledVector] 。 这将允许我们使用 FlinkML 分类算法的数据集。 我们知道数据集的第四个元素是类标记,其余的是特征,所以我们可以像这样构建 LabeledVector 元素:

import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

val survivalLV = survival
  .map{tuple =>
    val list = tuple.productIterator.toList
    val numList = list.map(_.asInstanceOf[String].toDouble)
    LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
  }

然后,我们可以使用这些数据来训练一个学习器。然而,我们将使用另一个数据集来示例建立学习器;这将让我们展示如何导入其他数据集格式。

LibSVM 文件

机器学习数据集的通用格式是 LibSVM 格式,并且可以在 LibSVM 数据集网站中找到使用该格式的多个数据集。 FlinkML 提供了通过 MLUtils 对象的 readLibSVM 函数加载 LibSVM 格式的数据集的实用程序。 您还可以使用 writeLibSVM 函数以 LibSVM 格式保存数据集。 让我们导入 svmguide1 数据集。 您可以在这里下载训练集测试集。 这是一个二进制分类数据集,由 Hsu 等人 [3] 在他们的实用支持向量机(SVM)指南中使用。 它包含4个数字特征和它的类标记。

我们可以简单地使用下面的代码导入数据集:

import org.apache.flink.ml.MLUtils

val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/to/svmguide1")
val astroTest: DataSet[(Vector, Double)] = MLUtils.readLibSVM(env, "/path/to/svmguide1.t")
      .map(x => (x.vector, x.label))

它给了我们两个 DataSet 对象,我们会在下面的章节中使用这两个对象来生成一个分类器。

分类

一旦我们导入了数据集,我们可以训练一个 预测模型 ,如线性 SVM 分类器。 我们可以为分类器设置多个参数。 这里我们设置 Blocks 参数,它用于通过底层CoCoA算法 [2] 来分割输入。 正则化参数确定应用的 $l_2$ 正则化值,用于避免过拟合。 步长确定权重向量更新到下一个权重向量值的贡献。 此参数设置初始步长。

import org.apache.flink.ml.classification.SVM

val svm = SVM()
  .setBlocks(env.getParallelism)
  .setIterations(100)
  .setRegularization(0.001)
  .setStepsize(0.1)
  .setSeed(42)

svm.fit(astroTrain)

我们现在可以对测试集进行预测,并使用 evaluate 函数创建(真值,预测)对。

val evaluationPairs: DataSet[(Double, Double)] = svm.evaluate(astroTest)

接下来,我们将看到我们如何预处理我们的数据,并使用 FlinkML 的机器学习管道功能。

数据预处理和管道

在使用 SVM 分类时,经常被鼓励 [3] 的预处理步骤是将输入特征缩放到 [0,1] 范围,以避免极值特征的影响。 FlinkML 有一些转换器,例如被用于预处理数据的 MinMaxScaler 。 FlinkML 的一个关键特征是将 转换器预测模型 链接在一起的能力。 这样我们可以运行相同的转换流程,并且以直接的和类型安全的方式对训练和测试数据进行预测。您可以在管道文档中阅读更多关于FlinkML管道系统的信息。

我们首先为数据集中的特征创建一个归一化转换,并将其链接到一个新的 SVM 分类器。

import org.apache.flink.ml.preprocessing.MinMaxScaler

val scaler = MinMaxScaler()

val scaledSVM = scaler.chainPredictor(svm)

我们现在可以使用我们新创建的管道来对测试集进行预测。 首先我们再次调用 fit 函数来训练缩放器和 SVM 分类器。 然后测试集的数据将被自动收敛,之后传递给 SVM 进行预测。

scaledSVM.fit(astroTrain)

val evaluationPairsScaled: DataSet[(Double, Double)] = scaledSVM.evaluate(astroTest)

收敛的输入应该会给我们更好的预测表现。

下一步

这个快速入门指南是一个对于 FlinkML 基础概念的介绍,但是你能做更多的事情。我们建议您查看 FlinkML 文档,尝试不同的算法。一个入门的好方法是用自己喜欢的来自于 UCI 机器学习库的数据集和 LibSVM 数据集进行试验。通过与其他数据科学家竞赛,从 KaggleDrivenData 这样的网站处理一个有趣的问题也是一种极好的学习方式。如果您想提供一些新的算法,请查看我们的贡献指南

参考文献

[1] Murphy, Kevin P. Machine learning: a probabilistic perspective. MIT press, 2012.

[2] Jaggi, Martin, et al. Communication-efficient distributed dual coordinate ascent. Advances in Neural Information Processing Systems. 2014.

[3] Hsu, Chih-Wei, Chih-Chung Chang, and Chih-Jen Lin. A practical guide to support vector classification. 2003.