Spark Core Writing to MySQL: Reading and Writing MySQL with Spark

First, as usual, the pom file:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.45</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

Code for reading from MySQL:

import java.sql.DriverManager

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

object MysqlRDD {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)

    // The query must contain exactly two ? placeholders; Spark substitutes the
    // lower and upper bounds (150 and 151 here) into them to split the query
    // across the requested number of partitions.
    val jdbcRdd: JdbcRDD[String] = new JdbcRDD(
      sparkContext,
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop01:3306/transaction", "root", "root")
      },
      "select * from orders where realTotalMoney >= ? and realTotalMoney <= ?",
      150,  // lower bound
      151,  // upper bound
      1,    // number of partitions
      r => r.getString(1) + "," +
           r.getString(2) + "," +
           r.getString(3) + "," +
           r.getString(4) + "," +
           r.getString(5)
    )

    jdbcRdd.foreach(println)
    print(jdbcRdd.count())

    sparkContext.stop()
  }
}
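Since spark-sql is already on the classpath, the same table can also be read through the DataFrame JDBC API instead of the low-level JdbcRDD. A minimal sketch, assuming the same hadoop01 host, transaction database, and root/root credentials as above:

import java.util.Properties

import org.apache.spark.sql.SparkSession

object MysqlDF {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("readMysqlDF").master("local[*]").getOrCreate()

    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")
    props.put("driver", "com.mysql.jdbc.Driver")

    // spark.read.jdbc infers the schema from the table and returns a DataFrame
    val orders = spark.read.jdbc("jdbc:mysql://hadoop01:3306/transaction", "orders", props)
    orders.filter("realTotalMoney >= 150 and realTotalMoney <= 151").show()

    spark.stop()
  }
}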

Writing to MySQL. There is an efficiency issue to watch out for here:

Inefficient version (one connection per record):

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysql {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // Inefficient: a brand-new JDBC connection is opened and torn down for every record
    rdd.foreach { case (a, b, c) =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val sql = "insert into student(id,name,age) values(?,?,?)"
      val preparedStatement = connection.prepareStatement(sql)
      preparedStatement.setInt(1, a)
      preparedStatement.setString(2, b)
      preparedStatement.setInt(3, c)
      preparedStatement.executeUpdate()
      preparedStatement.close()
      connection.close()  // close the connection so it is not leaked
    }

    sparkContext.stop()
  }
}
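A natural question: why not open one connection on the driver and reuse it inside foreach? Because the foreach closure is serialized and shipped to the executors, and a java.sql.Connection is not serializable, so the job fails before it even runs. A sketch of that anti-pattern:

// Anti-pattern sketch: capturing a driver-side connection in the closure
// fails with org.apache.spark.SparkException: Task not serializable
val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
rdd.foreach { case (a, b, c) =>
  val ps = connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
  // ... never reached
}

That is why the connection has to be created inside the closure, and why the per-partition version below is the right compromise.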

Improved version (one connection per partition):

import java.sql.DriverManager

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RddToMysql {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)

    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)

    // Improved: one connection and one prepared statement per partition,
    // reused for every record in that partition
    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      val sql = "insert into student(id,name,age) values(?,?,?)"
      val preparedStatement = connection.prepareStatement(sql)
      it.foreach { case (a, b, c) =>
        preparedStatement.setInt(1, a)
        preparedStatement.setString(2, b)
        preparedStatement.setInt(3, c)
        preparedStatement.executeUpdate()
      }
      preparedStatement.close()
      connection.close()
    }

    sparkContext.stop()
  }
}
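One more step up, offered here as a sketch rather than part of the original code: JDBC batching with addBatch/executeBatch reduces the per-row round trips to MySQL to a single batch per partition, wrapped in one transaction so the batch commits atomically:

rdd.foreachPartition { it =>
  Class.forName("com.mysql.jdbc.Driver")
  val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
  connection.setAutoCommit(false)  // accumulate the batch inside one transaction
  val preparedStatement = connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
  it.foreach { case (a, b, c) =>
    preparedStatement.setInt(1, a)
    preparedStatement.setString(2, b)
    preparedStatement.setInt(3, c)
    preparedStatement.addBatch()  // queue the row instead of executing immediately
  }
  preparedStatement.executeBatch()  // one round trip for the whole partition
  connection.commit()
  preparedStatement.close()
  connection.close()
}

With spark-sql on the classpath, converting the RDD to a DataFrame and calling write.jdbc is the other common route; it handles batching and connection management internally.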

