Spark 实战

(一)基于 Spark ML 的文本分类

Posted by Stephen on August 7, 2017

基于 Spark ML 的文本分类


文本分类是一个典型的机器学习问题,其主要目标是通过对已有语料库文本数据训练得到分类模型,进而对新文本进行类别标签的预测。这在很多领域都有现实的应用场景,如新闻网站的新闻自动分类,垃圾邮件检测,非法信息过滤等。

传统的文本分类,可以通过将文本用TF-Idf或者Bow等方式处理成数值特征,用SVM或者Naive Bayes等分类算法进行解决。但是传统的方法会有一些不足,比如说词的维度会非常多,我在用TF-idf进行文本的数值化时,遇到的最大问题就是特征维度过多,导致单机处理不了。

幸运的是,Word2Vec的出现,对于文本的特征表示有了很有效的解决。具体的Word2Vec后面会单独写一篇文章来介绍。这篇文章主要是介绍用Spark ML中的Word2Vec和MultiLayers Perceptron Classifier来对短信进行分类。


MultiLayers Perceptron Classifier (多层感知器)

MultiLayers Perceptron Classifier

关于多层感知器,在之前的Tensorflow实战系列中已经有过实现,这里主要介绍下Spark中的可调参数:

  • featuresCol : 输入数据 DataFrame 中指标特征列的名称。
  • labelCol:输入数据 DataFrame 中标签列的名称。
  • layers : 这个参数是一个整型数组类型,第一个元素需要和特征向量的维度相等,最后一个元素需要训练数据的标签取值个数相等,如 2 分类问题就写2。中间的元素有多少个就代表神经网络有多少个隐层,元素的取值代表了该层的神经元的个数。例如 val layers = Array[Int](100,6,5,2)。
  • maxIter:优化算法求解的最大迭代次数。默认值是 100。
  • predictionCol : 预测结果的列名称。
  • tol : 优化算法迭代求解过程的收敛阀值。默认值是 1e-4。不能为负数。
  • blockSize : 该参数被前馈网络训练器用来将训练样本数据的每个分区都按照 blockSize 大小分成不同组,并且每个组内的每个样本都会被叠加成一个向量,以便于在各种优化算法间传递。该参数的推荐值是 10-1000,默认值是 128。

目标数据集预览

因为这里要做的就是,利用Spark ML下的多层感知器预测短信是否为垃圾短信,所以就是一个二分类的问题。用到的数据是搜集到的短信文本数据,包含正常的短信和一些垃圾短信。

ham-spam


案例分析与实现

在处理文本短信息分类预测问题的过程中,笔者首先是将原始文本数据按照 8:2 的比例分成训练和测试数据集。整个过程分为下面几个步骤:

  1. 从 HDFS 上读取原始数据集,并创建一个 DataFrame。
  2. 使用 StringIndexer 将原始的文本标签 (“Ham”或者“Spam”) 转化成数值型的表型,以便 Spark ML 处理。
  3. 使用 Word2Vec 将短信文本转化成数值型词向量。
  4. 使用 MultilayerPerceptronClassifier 训练一个多层感知器模型。
  5. 使用 LabelConverter 将预测结果的数值标签转化成原始的文本标签。
  6. 最后在测试数据集上测试模型的预测精确度。

我们可以看到,整个过程我们依然是基于 Spark ML Pipeline 的思想,构建了一个机器学习的工作流。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, Word2Vec}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

object SMSClassify {
 final val VECTOR_SIZE = 100
 def main(args: Array[String]) {
 
 System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3")
 val conf = new SparkConf().setAppName("SMSClassify").setMaster("local[2]")
 conf.set("spark.testing.memory", "2147480000")
 val sqlCtx = new SQLContext(sc)
 val parsedRDD = sc.textFile("file:///C:\Users\jrlimingyang\IdeaProjects\sparktest\src\main\scala\data\data.txt").map(_.split("\t")).map(eachRow => {
 (eachRow(0),eachRow(1).split(" "))
 })
 val msgDF = sqlCtx.createDataFrame(parsedRDD).toDF("label","message")
 val labelIndexer = new StringIndexer()
 .setInputCol("label")
 .setOutputCol("indexedLabel")
 .fit(msgDF)

 val word2Vec = new Word2Vec()
 .setInputCol("message")
 .setOutputCol("features")
 .setVectorSize(VECTOR_SIZE)
 .setMinCount(1)

 val layers = Array[Int](VECTOR_SIZE,6,5,2)
 val mlpc = new MultilayerPerceptronClassifier()
 .setLayers(layers)
 .setBlockSize(512)
 .setSeed(1234L)
 .setMaxIter(128)
 .setFeaturesCol("features")
 .setLabelCol("indexedLabel")
 .setPredictionCol("prediction")

 val labelConverter = new IndexToString()
 .setInputCol("prediction")
 .setOutputCol("predictedLabel")
 .setLabels(labelIndexer.labels)

 val Array(trainingData, testData) = msgDF.randomSplit(Array(0.8, 0.2))

 val pipeline = new Pipeline().setStages(Array(labelIndexer,word2Vec,mlpc,labelConverter))
 val model = pipeline.fit(trainingData)

 val predictionResultDF = model.transform(testData)
 //below 2 lines are for debug use
 predictionResultDF.printSchema 
 predictionResultDF.select("message","label","predictedLabel").show(30)

 val evaluator = new MulticlassClassificationEvaluator()
 .setLabelCol("indexedLabel")
 .setPredictionCol("prediction")
 .setMetricName("accuracy")
 val predictionAccuracy = evaluator.evaluate(predictionResultDF)
 println("Testing Accuracy is %2.4f".format(predictionAccuracy * 100) + "%")
 sc.stop
 }
}

结果预览:

pipeline result

最终在测试集上的测试准确度为: Testing Accuracy is 100.0000%

最终获得这样的准确度,难道真的是Word2Vec + MultiLayer Perceptron对于这个分类问题有足够的能力?