FlinkML 旨在从您的数据中学习一个简单的过程,抽象出来通常带有大数据学习任务的复杂性。 在这个快速入门指南中,我们将展示使用 FlinkML 解决一个简单的监督学习问题是多么的容易。 但是首先要介绍一些基础知识,如果你已经熟悉机器学习(ML),请随时跳过接下来的几行。
如 Murphy [1] 所定义的,机器学习(ML)用于检测数据中的模式,并使用这些学习到的模式来预测未来。 我们可以将大多数机器学习(ML)算法分为两大类:监督学习和无监督学习。
监督学习 涉及从一个输入(特征)集合到一个输出集合学习一个函数(映射)。 学习是通过使用我们用来近似映射函数的(输入,输出)对训练集来完成的。监督学习问题进一步分为分类问题和回归问题。在分类问题中,我们尝试预测样例属于的类,例如用户是否要点击广告。另一方面,回归问题是要预测(实际的)数值,这个数值通常称为因变量,例如明天的温度是多少。
无监督学习 用来发现数据中的模式和规律。 一个例子是聚类,我们尝试从描述性的特征中发现数据分组。 无监督学习也可用于特征选择,例如通过 主成分分析(principal components analysis) 进行特征选择。
为了在您的项目中使用 FlinkML ,首先您必须建立一个 Flink 程序(//flink.iteblog.com/dev/linking_with_flink.html)。 .
接下来,您必须将 FlinkML 的依赖添加到项目的 pom.xml
中:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml_2.10</artifactId>
<version>1.3-SNAPSHOT</version>
</dependency>
要加载与 FlinkML 一起使用的数据,我们可以使用 Flink 的 ETL 功能,或者使用处理诸如 LibSVM 格式的格式化数据的专门方法。 对于监督学习问题,通常使用 LabeledVector
类来表示 (标记,特征)
样例。 LabeledVector
对象将具有表示样例特征的 FlinkML Vector
成员,以及表示标记的 Double
成员,该标记可能是分类问题中的类,也可以是回归问题的因变量。
例如,我们可以使用 Haberman’s Survival 数据集,您可以从 UCI 机器学习数据库下载这个数据集。 该数据集“包含了对乳腺癌手术患者的存活进行研究的病例”。 数据来自逗号分隔的文件,前3列是特征,最后一列是类,第4列表示患者是否存活5年以上(标记1),或者5年内死亡(标记2)。 您可以查看 UCI 页面了解有关数据的更多信息。
我们可以先把数据加载为一个 DataSet[String]
:
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
val survival = env.readCsvFile[(String, String, String, String)]("/path/to/haberman.data")
我们现在可以将数据转换成 DataSet[LabeledVector]
。 这将允许我们使用 FlinkML 分类算法的数据集。 我们知道数据集的第四个元素是类标记,其余的是特征,所以我们可以像这样构建 LabeledVector
元素:
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
val survivalLV = survival
.map{tuple =>
val list = tuple.productIterator.toList
val numList = list.map(_.asInstanceOf[String].toDouble)
LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
}
然后,我们可以使用这些数据来训练一个学习器。然而,我们将使用另一个数据集来示例建立学习器;这将让我们展示如何导入其他数据集格式。
LibSVM 文件
机器学习数据集的通用格式是 LibSVM 格式,并且可以在 LibSVM 数据集网站中找到使用该格式的多个数据集。 FlinkML 提供了通过 MLUtils
对象的 readLibSVM
函数加载 LibSVM 格式的数据集的实用程序。 您还可以使用 writeLibSVM
函数以 LibSVM 格式保存数据集。 让我们导入 svmguide1 数据集。 您可以在这里下载训练集和测试集。 这是一个二进制分类数据集,由 Hsu 等人 [3] 在他们的实用支持向量机(SVM)指南中使用。 它包含4个数字特征和它的类标记。
我们可以简单地使用下面的代码导入数据集:
import org.apache.flink.ml.MLUtils
val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/to/svmguide1")
val astroTest: DataSet[(Vector, Double)] = MLUtils.readLibSVM(env, "/path/to/svmguide1.t")
.map(x => (x.vector, x.label))
它给了我们两个 DataSet
对象,我们会在下面的章节中使用这两个对象来生成一个分类器。
一旦我们导入了数据集,我们可以训练一个 预测模型
,如线性 SVM 分类器。 我们可以为分类器设置多个参数。 这里我们设置 Blocks
参数,它用于通过底层CoCoA算法 [2] 来分割输入。 正则化参数确定应用的 $l_2$ 正则化值,用于避免过拟合。 步长确定权重向量更新到下一个权重向量值的贡献。 此参数设置初始步长。
import org.apache.flink.ml.classification.SVM
val svm = SVM()
.setBlocks(env.getParallelism)
.setIterations(100)
.setRegularization(0.001)
.setStepsize(0.1)
.setSeed(42)
svm.fit(astroTrain)
我们现在可以对测试集进行预测,并使用 evaluate
函数创建(真值,预测)对。
val evaluationPairs: DataSet[(Double, Double)] = svm.evaluate(astroTest)
接下来,我们将看到我们如何预处理我们的数据,并使用 FlinkML 的机器学习管道功能。
在使用 SVM 分类时,经常被鼓励 [3] 的预处理步骤是将输入特征缩放到 [0,1] 范围,以避免极值特征的影响。 FlinkML 有一些转换器
,例如被用于预处理数据的 MinMaxScaler
。 FlinkML 的一个关键特征是将 转换器
和预测模型
链接在一起的能力。 这样我们可以运行相同的转换流程,并且以直接的和类型安全的方式对训练和测试数据进行预测。您可以在管道文档中阅读更多关于FlinkML管道系统的信息。
我们首先为数据集中的特征创建一个归一化转换,并将其链接到一个新的 SVM 分类器。
import org.apache.flink.ml.preprocessing.MinMaxScaler
val scaler = MinMaxScaler()
val scaledSVM = scaler.chainPredictor(svm)
我们现在可以使用我们新创建的管道来对测试集进行预测。 首先我们再次调用 fit 函数来训练缩放器和 SVM 分类器。 然后测试集的数据将被自动收敛,之后传递给 SVM 进行预测。
scaledSVM.fit(astroTrain)
val evaluationPairsScaled: DataSet[(Double, Double)] = scaledSVM.evaluate(astroTest)
收敛的输入应该会给我们更好的预测表现。
这个快速入门指南是一个对于 FlinkML 基础概念的介绍,但是你能做更多的事情。我们建议您查看 FlinkML 文档,尝试不同的算法。一个入门的好方法是用自己喜欢的来自于 UCI 机器学习库的数据集和 LibSVM 数据集进行试验。通过与其他数据科学家竞赛,从 Kaggle 或 DrivenData 这样的网站处理一个有趣的问题也是一种极好的学习方式。如果您想提供一些新的算法,请查看我们的贡献指南。
参考文献
[1] Murphy, Kevin P. Machine learning: a probabilistic perspective. MIT press, 2012.
[2] Jaggi, Martin, et al. Communication-efficient distributed dual coordinate ascent. Advances in Neural Information Processing Systems. 2014.
[3] Hsu, Chih-Wei, Chih-Chung Chang, and Chih-Jen Lin. A practical guide to support vector classification. 2003.