基于SparkSQL的电影分析项目实战
在本篇分享中,将介绍一个完整的项目案例,该案例会真实还原企业中SparkSQL的开发流程,手把手教你构建一个基于SparkSQL的分析系统。为了讲解方便,我会对代码进行拆解,完整的代码已上传至GitHub,想看完整代码可以去clone,顺便给个**
Star**。以下是全文,希望本文对你有所帮助。看完记得三连:
分享、
点赞、
在看
https://github.com/jiamx/spark_project_practise
项目介绍
数据集介绍
使用MovieLens的名称为ml-25m.zip的数据集,使用的文件时movies.csv和ratings.csv,上述文件的下载地址为:
http://files.grouplens.org/datasets/movielens/ml-25m.zip
- movies.csv
该文件是电影数据,对应的为维表数据,大小为2.89MB,包括6万多部电影,其数据格式为[movieId,title,genres],分别对应[电影id,电影名称,电影所属分类],样例数据如下所示:逗号分隔
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
- ratings.csv
该文件为定影评分数据,对应为事实表数据,大小为646MB,其数据格式为:[userId,movieId,rating,timestamp],分别对应[用户id,电影id,评分,时间戳],样例数据如下所示:逗号分隔
1,296,5,1147880044
项目代码结构

需求分析
- 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分

- 需求2:查找每个电影类别及其对应的平均评分

- 需求3:查找被评分次数较多的前十部电影

代码讲解
- DemoMainApp
该类是程序执行的入口,主要是获取数据源,转换成DataFrame,并调用封装好的业务逻辑类。
object DemoMainApp {// 文件路径private val MOVIES_CSV_FILE_PATH = "file:///e:/movies.csv"private val RATINGS_CSV_FILE_PATH = "file:///e:/ratings.csv"def main(args: Array[String]): Unit = {// 创建spark sessionval spark = SparkSession.builder.master("local[4]").getOrCreate// schema信息val schemaLoader = new SchemaLoader// 读取Movie数据集val movieDF = readCsvIntoDataSet(spark, MOVIES_CSV_FILE_PATH, schemaLoader.getMovieSchema)// 读取Rating数据集val ratingDF = readCsvIntoDataSet(spark, RATINGS_CSV_FILE_PATH, schemaLoader.getRatingSchema)// 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分val bestFilmsByOverallRating = new BestFilmsByOverallRating//bestFilmsByOverallRating.run(movieDF, ratingDF, spark)// 需求2:查找每个电影类别及其对应的平均评分val genresByAverageRating = new GenresByAverageRating//genresByAverageRating.run(movieDF, ratingDF, spark)// 需求3:查找被评分次数较多的前十部电影val mostRatedFilms = new MostRatedFilmsmostRatedFilms.run(movieDF, ratingDF, spark)spark.close()}/*** 读取数据文件,转成DataFrame** @param spark* @param path* @param schema* @return*/def readCsvIntoDataSet(spark: SparkSession, path: String, schema: StructType) = {val dataSet = spark.read.format("csv").option("header", "true").schema(schema).load(path)dataSet}
}
- Entry
该类为实体类,封装了数据源的样例类和结果表的样例类
class Entry {}case class Movies(movieId: String, // 电影的idtitle: String, // 电影的标题genres: String // 电影类别)case class Ratings(userId: String, // 用户的idmovieId: String, // 电影的idrating: String, // 用户评分timestamp: String // 时间戳)// 需求1MySQL结果表
case class tenGreatestMoviesByAverageRating(movieId: String, // 电影的idtitle: String, // 电影的标题avgRating: String // 电影平均评分)// 需求2MySQL结果表
case class topGenresByAverageRating(genres: String, //电影类别avgRating: String // 平均评分)// 需求3MySQL结果表
case class tenMostRatedFilms(movieId: String, // 电影的idtitle: String, // 电影的标题ratingCnt: String // 电影被评分的次数)
- SchemaLoader
该类封装了数据集的schema信息,主要用于读取数据源是指定schema信息
class SchemaLoader {// movies数据集schema信息private val movieSchema = new StructType().add("movieId", DataTypes.StringType, false).add("title", DataTypes.StringType, false).add("genres", DataTypes.StringType, false)// ratings数据集schema信息private val ratingSchema = new StructType().add("userId", DataTypes.StringType, false).add("movieId", DataTypes.StringType, false).add("rating", DataTypes.StringType, false).add("timestamp", DataTypes.StringType, false)def getMovieSchema: StructType = movieSchemadef getRatingSchema: StructType = ratingSchema
}
- JDBCUtil
该类封装了连接MySQL的逻辑,主要用于连接MySQL,在业务逻辑代码中会使用该工具类获取MySQL连接,将结果数据写入到MySQL中。
object JDBCUtil {val dataSource = new ComboPooledDataSource()val user = "root"val password = "123qwe"val url = "jdbc:mysql://localhost:3306/mydb"dataSource.setUser(user)dataSource.setPassword(password)dataSource.setDriverClass("com.mysql.jdbc.Driver")dataSource.setJdbcUrl(url)dataSource.setAutoCommitOnClose(false)
// 获取连接def getQueryRunner(): Option[QueryRunner]={try {Some(new QueryRunner(dataSource))}catch {case e:Exception =>e.printStackTrace()None}}
}
需求1实现
- BestFilmsByOverallRating
需求1实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。
/*** 需求1:查找电影评分个数超过5000,且平均评分较高的前十部电影名称及其对应的平均评分*/
class BestFilmsByOverallRating extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")// 查询SQL语句val ressql1 ="""|WITH ratings_filter_cnt AS (|SELECT| movieId,| count( * ) AS rating_cnt,| avg( rating ) AS avg_rating|FROM| ratings|GROUP BY| movieId|HAVING| count( * ) >= 5000|),|ratings_filter_score AS (|SELECT| movieId, -- 电影id| avg_rating -- 电影平均评分|FROM ratings_filter_cnt|ORDER BY avg_rating DESC -- 平均评分降序排序|LIMIT 10 -- 平均分较高的前十部电影|)|SELECT| m.movieId,| m.title,| r.avg_rating AS avgRating|FROM| ratings_filter_score r|JOIN movies m ON m.movieId = r.movieId""".stripMarginval resultDS = spark.sql(ressql1).as[tenGreatestMoviesByAverageRating]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: tenGreatestMoviesByAverageRating): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: tenGreatestMoviesByAverageRating, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `ten_movies_averagerating`(|movieId,|title,|avgRating|)|VALUES|(?,?,?)""".stripMargin// 执行insert操作conn.update(sql,r.movieId,r.title,r.avgRating)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}
}
需求1结果
- 结果表建表语句
CREATE TABLE `ten_movies_averagerating` (`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id',`movieId` int(11) NOT NULL COMMENT '电影id',`title` varchar(100) NOT NULL COMMENT '电影名称',`avgRating` decimal(10,2) NOT NULL COMMENT '平均评分',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',PRIMARY KEY (`id`),UNIQUE KEY `movie_id_UNIQUE` (`movieId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 统计结果
平均评分最高的前十部电影如下:
| movieId | title | avgRating |
|---|---|---|
| 318 | Shawshank Redemption, The (1994) | 4.41 |
| 858 | Godfather, The (1972) | 4.32 |
| 50 | Usual Suspects, The (1995) | 4.28 |
| 1221 | Godfather: Part II, The (1974) | 4.26 |
| 527 | Schindler’s List (1993) | 4.25 |
| 2019 | Seven Samurai (Shichinin no samurai) (1954) | 4.25 |
| 904 | Rear Window (1954) | 4.24 |
| 1203 | 12 Angry Men (1957) | 4.24 |
| 2959 | Fight Club (1999) | 4.23 |
| 1193 | One Flew Over the Cuckoo’s Nest (1975) | 4.22 |
上述电影评分对应的电影中文名称为:
| 英文名称 | 中文名称 |
|---|---|
| Shawshank Redemption, The (1994) | 肖申克的救赎 |
| Godfather, The (1972) | 教父1 |
| Usual Suspects, The (1995) | 非常嫌疑犯 |
| Godfather: Part II, The (1974) | 教父2 |
| Schindler’s List (1993) | 辛德勒的名单 |
| Seven Samurai (Shichinin no samurai) (1954) | 七武士 |
| Rear Window (1954) | 后窗 |
| 12 Angry Men (1957) | 十二怒汉 |
| Fight Club (1999) | 搏击俱乐部 |
| One Flew Over the Cuckoo’s Nest (1975) | 飞越疯人院 |
需求2实现
- GenresByAverageRating
需求2实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。
*** 需求2:查找每个电影类别及其对应的平均评分*/
class GenresByAverageRating extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame, spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")val ressql2 ="""|WITH explode_movies AS (|SELECT| movieId,| title,| category|FROM| movies lateral VIEW explode ( split ( genres, "\\|" ) ) temp AS category|)|SELECT| m.category AS genres,| avg( r.rating ) AS avgRating|FROM| explode_movies m| JOIN ratings r ON m.movieId = r.movieId|GROUP BY| m.category| """.stripMarginval resultDS = spark.sql(ressql2).as[topGenresByAverageRating]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: topGenresByAverageRating): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: topGenresByAverageRating, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `genres_average_rating`(|genres,|avgRating|)|VALUES|(?,?)""".stripMargin// 执行insert操作conn.update(sql,r.genres,r.avgRating)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}
}
需求2结果
- 结果表建表语句
CREATE TABLE genres_average_rating (`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',`genres` VARCHAR ( 100 ) NOT NULL COMMENT '电影类别',`avgRating` DECIMAL ( 10, 2 ) NOT NULL COMMENT '电影类别平均评分',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `genres_UNIQUE` ( `genres` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
- 统计结果
共有20个电影分类,每个电影分类的平均评分为:
| genres | avgRating |
|---|---|
| Film-Noir | 3.93 |
| War | 3.79 |
| Documentary | 3.71 |
| Crime | 3.69 |
| Drama | 3.68 |
| Mystery | 3.67 |
| Animation | 3.61 |
| IMAX | 3.6 |
| Western | 3.59 |
| Musical | 3.55 |
| Romance | 3.54 |
| Adventure | 3.52 |
| Thriller | 3.52 |
| Fantasy | 3.51 |
| Sci-Fi | 3.48 |
| Action | 3.47 |
| Children | 3.43 |
| Comedy | 3.42 |
| (no genres listed) | 3.33 |
| Horror | 3.29 |
电影分类对应的中文名称为:
| 分类 | 中文名称 |
|---|---|
| Film-Noir | 黑色电影 |
| War | 战争 |
| Documentary | 纪录片 |
| Crime | 犯罪 |
| Drama | 历史剧 |
| Mystery | 推理 |
| Animation | 动画片 |
| IMAX | 巨幕电影 |
| Western | 西部电影 |
| Musical | 音乐 |
| Romance | 浪漫 |
| Adventure | 冒险 |
| Thriller | 惊悚片 |
| Fantasy | 魔幻电影 |
| Sci-Fi | 科幻 |
| Action | 动作 |
| Children | 儿童 |
| Comedy | 喜剧 |
| (no genres listed) | 未分类 |
| Horror | 恐怖 |
需求3实现
-
MostRatedFilms
需求3实现的业务逻辑封装。该类有一个run()方法,主要是封装计算逻辑。
/*** 需求3:查找被评分次数较多的前十部电影.*/
class MostRatedFilms extends Serializable {def run(moviesDataset: DataFrame, ratingsDataset: DataFrame,spark: SparkSession) = {import spark.implicits._// 将moviesDataset注册成表moviesDataset.createOrReplaceTempView("movies")// 将ratingsDataset注册成表ratingsDataset.createOrReplaceTempView("ratings")val ressql3 ="""|WITH rating_group AS (| SELECT| movieId,| count( * ) AS ratingCnt| FROM ratings| GROUP BY movieId|),|rating_filter AS (| SELECT| movieId,| ratingCnt| FROM rating_group| ORDER BY ratingCnt DESC| LIMIT 10|)|SELECT| m.movieId,| m.title,| r.ratingCnt|FROM| rating_filter r|JOIN movies m ON r.movieId = m.movieId|""".stripMarginval resultDS = spark.sql(ressql3).as[tenMostRatedFilms]// 打印数据resultDS.show(10)resultDS.printSchema()// 写入MySQLresultDS.foreachPartition(par => par.foreach(insert2Mysql(_)))}/*** 获取连接,调用写入MySQL数据的方法** @param res*/private def insert2Mysql(res: tenMostRatedFilms): Unit = {lazy val conn = JDBCUtil.getQueryRunner()conn match {case Some(connection) => {upsert(res, connection)}case None => {println("Mysql连接失败")System.exit(-1)}}}/*** 封装将结果写入MySQL的方法* 执行写入操作** @param r* @param conn*/private def upsert(r: tenMostRatedFilms, conn: QueryRunner): Unit = {try {val sql =s"""|REPLACE INTO `ten_most_rated_films`(|movieId,|title,|ratingCnt|)|VALUES|(?,?,?)""".stripMargin// 执行insert操作conn.update(sql,r.movieId,r.title,r.ratingCnt)} catch {case e: Exception => {e.printStackTrace()System.exit(-1)}}}}
需求3结果
- 结果表创建语句
CREATE TABLE ten_most_rated_films (`id` INT ( 11 ) NOT NULL AUTO_INCREMENT COMMENT '自增id',`movieId` INT ( 11 ) NOT NULL COMMENT '电影Id',`title` varchar(100) NOT NULL COMMENT '电影名称',`ratingCnt` INT(11) NOT NULL COMMENT '电影被评分的次数',`update_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY ( `id` ),
UNIQUE KEY `movie_id_UNIQUE` ( `movieId` )
) ENGINE = INNODB DEFAULT CHARSET = utf8;
- 统计结果
| movieId | title | ratingCnt |
|---|---|---|
| 356 | Forrest Gump (1994) | 81491 |
| 318 | Shawshank Redemption, The (1994) | 81482 |
| 296 | Pulp Fiction (1994) | 79672 |
| 593 | Silence of the Lambs, The (1991) | 74127 |
| 2571 | Matrix, The (1999) | 72674 |
| 260 | Star Wars: Episode IV - A New Hope (1977) | 68717 |
| 480 | Jurassic Park (1993) | 64144 |
| 527 | Schindler’s List (1993) | 60411 |
| 110 | Braveheart (1995) | 59184 |
| 2959 | Fight Club (1999) | 58773 |
评分次数较多的电影对应的中文名称为:
| 英文名称 | 中文名称 |
|---|---|
| Forrest Gump (1994) | 阿甘正传 |
| Shawshank Redemption, The (1994) | 肖申克的救赎 |
| Pulp Fiction (1994) | 低俗小说 |
| Silence of the Lambs, The (1991) | 沉默的羔羊 |
| Matrix, The (1999) | 黑客帝国 |
| Star Wars: Episode IV - A New Hope (1977) | 星球大战 |
| Jurassic Park (1993) | 侏罗纪公园 |
| Schindler’s List (1993) | 辛德勒的名单 |
| Braveheart (1995) | 勇敢的心 |
| Fight Club (1999) | 搏击俱乐部 |
总结
本文主要是基于SparkSQL对MovieLens数据集进行统计分析,完整实现了三个需求,并给对每个需求都给出了详细的代码实现和结果分析。本案例还原了企业使用SparkSQL进行实现数据统计的基本流程,通过本文,或许你对SparkSQL的应用有了更加深刻的认识,希望本文对你有所帮助。
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
