educoder-Spark机器学习
第1关:基于物品的推荐算法
给用户2推荐2个商品。利用spark.mllib中的矩阵计算库,构建用户与物品的打分矩阵,然后计算物品之间的相似分数,进行推荐。实现基于用户(User CF)的协同过滤算法。
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object ItemBasedCF {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//读入数据val conf = new SparkConf().setAppName("ItemBasedCFModel").setMaster("local")val sc = new SparkContext(conf)val data = sc.textFile("/root/data/als/ratingdata.txt")/*MatrixEntry代表一个分布式矩阵中的每一行(Entry)* 这里的每一项都是一个(i: Long, j: Long, value: Double) 指示行列值的元组tuple。* 其中i是行坐标,j是列坐标,value是值。*/val parseData: RDD[MatrixEntry] =data.map(_.split(",") match { case Array(user, item, rate) => MatrixEntry(user.toLong, item.toLong, rate.toDouble) })//CoordinateMatrix是Spark MLLib中专门保存user_item_rating这种数据样本的val ratings = new CoordinateMatrix(parseData)/* 由于CoordinateMatrix没有columnSimilarities方法,所以我们需要将其转换成RowMatrix矩阵,调用他的columnSimilarities计算其相似性* RowMatrix的方法columnSimilarities是计算,列与列的相似度,现在是user_item_rating,与基于用户的CF不同的是,这里不需要进行矩阵的转置,直接就是物品的相似*/val matrix: RowMatrix = ratings.toRowMatrix()//需求:为某一个用户推荐商品。基本的逻辑是:首先得到某个用户评价过(买过)的商品,然后计算其他商品与该商品的相似度,并排序;从高到低,把不在用户评价过//商品里的其他商品推荐给用户。//例如:为用户2推荐商品//第一步:得到用户2评价过(买过)的商品 take(5)表示取出所有的5个用户 2:表示第二个用户//解释:SparseVector:稀疏矩阵val user2pred = matrix.rows.take(5)(2)val prefs: SparseVector = user2pred.asInstanceOf[SparseVector]val uitems = prefs.indices //得到了用户2评价过(买过)的商品的IDval ipi = (uitems zip prefs.values) //得到了用户2评价过(买过)的商品的ID和评分,即:(物品ID,评分)//计算物品的相似性,并输出val similarities = matrix.columnSimilarities()val indexdsimilar = similarities.toIndexedRowMatrix().rows.map {case IndexedRow(idx, vector) => (idx.toInt, vector)}//ij表示:其他用户购买的商品与用户2购买的该商品的相似度val ij = sc.parallelize(ipi).join(indexdsimilar).flatMap {case (i, (pi, vector: SparseVector)) => (vector.indices zip vector.values)}/********** begin **********///ij1表示:其他用户购买过,但不在用户2购买的商品的列表中的商品和评分val ij1 = ij.filter { case (item, pref) => !uitems.contains(item) }//将这些商品的评分求和,并降序排列,并推荐前两个物品val ij2 = ij1.reduceByKey(_ + _).sortBy(_._2, false).take(2)/********** end **********/// crgjl//取消以下1行注释for (id <- ij2) print(id._1 + " ")sc.stop()}
}
第2关:基于用户的推荐算法
根据提示,在右侧编辑器补充代码.实现:找出与用户1最相似的2个用户。
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object UserBasedCF {def main(args: Array[String]): Unit = {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)// 创建一个SparkContextval conf = new SparkConf().setAppName("UserBasedCF").setMaster("local")val sc = new SparkContext(conf)// 读入数据val data = sc.textFile("/root/data/als/ratingdata.txt")// 解析出评分矩阵的每一行val parseData: RDD[MatrixEntry] = data.map(_.split(",")match { case Array(user, item, rate) =>MatrixEntry(user.toLong, item.toLong, rate.toDouble)})// 构建关联矩阵val ratings = new CoordinateMatrix(parseData)// 转置矩阵以计算列(用户)的相似性val matrix: RowMatrix = ratings.transpose().toRowMatrix()// 计算得到用户的相似度矩阵val similarities = matrix.columnSimilarities()// 得到某个用户对所有物品的评分val ratingOfUser1 = ratings.entries.filter(_.i == 1).map(x => (x.j, x.value)).sortBy(_._1).map(_._1).collect().toList.toArray// 得到用户1相对于其他用户的相似性val similarityOfUser1 = similarities.entries.filter(_.i == 1).sortBy(_.value, false).map(_.value).collect// 需求:为用户1推荐2个商品// 思路:找到与用户1相似性最高的两个用户,将这两个用户评过分的物品,用户1没有评过分的物品推荐给用户1/********** begin **********///找到与用户1相似性最高的两个用户val similarityTopUser = similarities.entries.filter(_.i == 1).sortBy(_.value, false).map(x=>(x.j, x.value)).collect.take(2)//println("与用户1最相似的两个用户如下:")//取消以下2行注释for (s <- similarityTopUser) print(s._1 + " ")for (s <- similarityTopUser) {// 找到这两个用户评过分的商品,与用户1没有评过分的物品val userId = s._1val ratingOfTemp = ratings.entries.filter(_.i == userId).map(x => (x.j, x.value)).sortBy(_._1).map(_._1).collect().toList.toArray// 用户1与当前用户求差集val dis = ratingOfTemp diff ratingOfUser1//println("用户" + userId + "要推荐给用户1的商品id为")for (id <- dis) print(id + " ")}/********** end **********/sc.stop()}
}
第3关:基于ALS的推荐算法
根据提示,在右侧编辑器补充代码。创建一个ALS模型,使用调用fit方法,使用training训练生成model。
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{DataFrame, SparkSession}object ALS {case class Rating(userId: Int, movieId: Int, rating: Float)def parseRating(str: String): Rating = {val fields = str.split(",")assert(fields.size == 3)Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)}def main(args: Array[String]) {val spark = SparkSession.builder.master("local").appName("ALS").getOrCreate()import spark.implicits._val ratings = spark.read.textFile("data/als/ratingdata.txt").map(parseRating).toDF()val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))/********** begin **********/// Build the recommendation model using ALS on the training dataval als = new ALS() .setMaxIter(5) .setRegParam(0.01) .setUserCol("userId") .setItemCol("movieId") .setRatingCol("rating") val model = als.fit(training) /********** end **********/// "Evaluate the model by computing the RMSE on the test data"// "Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics"//取消以下3行注释model.setColdStartStrategy("drop")val predictions = model.transform(test)evaluatingRMSE(predictions)spark.stop()}def evaluatingRMSE(predictions:DataFrame):Unit = {val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")val rmse = evaluator.evaluate(predictions)if (rmse <= 2){print("\n" + "good")}else{println()predictions.show(false)}}
}
第4关:基于随机森林预测贷款风险
编写一个预测贷款风险的随机森林二分类模型。
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame,SparkSession}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics
object Credit {case class Credit(creditability: Double,balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double)def parseCredit(line: Array[Double]): Credit = {Credit(line(0),line(1) - 1, line(2), line(3), line(4), line(5),line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1)}def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {rdd.map(_.split(",")).map(_.map(_.toDouble))}def evaluatingAUC(predictedResultDF:DataFrame, labelstring:String):Unit = {val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelstring).setRawPredictionCol("prediction")val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)if(predictionAUC > 0.6){print("\n" + "good")}else{print(s"areaUnderROC: $predictionAUC")}}def main(args: Array[String]) {val spark = SparkSession.builder.appName("Credit").master("local").getOrCreate()import spark.implicits._val creditDF = parseRDD(spark.sparkContext.textFile("/root/data/germancredit.csv")).map(parseCredit).toDF()creditDF.createTempView("credit")val featureCols = Array("balance", "duration", "history", "purpose", "amount","savings", "employment", "instPercent", "sexMarried", "guarantors","residenceDuration", "assets", "age", "concCredit", "apartment","credits", "occupation", "dependents", "hasPhone", "foreign")/********** begin **********/// 合并特征列。val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features") val df2 = assembler.transform(creditDF)/********** end **********///取消以下4行注释val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")val df3 = labelIndexer.fit(df2).transform(df2)val splitSeed = 5043val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)/********** begin **********///调用随机森林API,使用trainingData训练生成模型modelval classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(5).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043) val model = classifier.fit(trainingData) /********** end **********///取消以下2行注释val predictions = model.transform(testData)evaluatingAUC(predictions,"label")spark.stop()}
}
第5关:基于多层感知器的手机短信分类
编写一个短信文本分类的程序。使用Spark.ml中的多层感知器(MLP,Multi Layer Perceptron Classifier)API——MultilayerPerceptronClassifer。
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, Word2Vec}
import org.apache.spark.sql.{DataFrame,SparkSession}object SMSClassifier {final val VECTOR_SIZE = 100def evaluatingAUC(predictedResultDF:DataFrame, labelcol: String):Unit = {val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelcol).setRawPredictionCol("prediction")val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)if(predictionAUC > 0.8){print("\n" + "good")}else{print(s"areaUnderROC: $predictionAUC")}}def main(args: Array[String]) {val spark = SparkSession.builder.master("local").appName("SMS Message Classification (HAM or SPAM)").getOrCreate()val parsedRDD = spark.sparkContext.textFile("data/smsspamcollection/SMSSpamCollection").map(_.split("\t")).map(eachRow => {(eachRow(0),eachRow(1).split(" "))})val msgDF = spark.createDataFrame(parsedRDD).toDF("label","message")val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(msgDF)/********** begin **********/val word2Vec = new Word2Vec() .setInputCol("message") .setOutputCol("features") .setVectorSize(VECTOR_SIZE) .setMinCount(1) /********** end **********/val layers = Array[Int](VECTOR_SIZE,6,5,2)/********** begin **********/val mlpc = new MultilayerPerceptronClassifier() .setLayers(layers) .setBlockSize(512) .setSeed(1234L) .setMaxIter(128) .setFeaturesCol("features") .setLabelCol("indexedLabel") .setPredictionCol("prediction") /********** end **********/val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)val Array(trainingData, testData) = msgDF.randomSplit(Array(0.8, 0.2))/********** begin **********/val pipeline = new Pipeline().setStages(Array(labelIndexer,word2Vec,mlpc,labelConverter)) val model = pipeline.fit(trainingData) /********** end **********///取消以下两行注释val predictionResultDF = model.transform(testData)evaluatingAUC(predictionResultDF,"indexedLabel")spark.stop()}
}
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
