Spark SQL Programming Summary
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
The SQL parser can be switched with the spark.sql.dialect option, which can be set either through the setConf method on a SQLContext or with the SET key=value syntax in SQL. A plain SQLContext supports only the simple "sql" parser. HiveContext defaults to "hiveql" and also supports "sql"; "hiveql" is generally recommended because its grammar is more complete.
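A minimal sketch of both ways of setting the dialect, assuming an existing HiveContext bound to sqlContext:

sqlContext.setConf("spark.sql.dialect", "sql")   // programmatic: switch to the simple SQL parser
sqlContext.sql("SET spark.sql.dialect=hiveql")   // SQL key=value syntax: switch back to HiveQL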
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Create the DataFrame
DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json");

// Show the content of the DataFrame
df.show();
// age  name
// null Michael
// 30   Andy
// 19   Justin

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// name
// Michael
// Andy
// Justin

// Select everybody, but increment the age by 1
df.select(df.col("name"), df.col("age").plus(1)).show();
// name    (age + 1)
// Michael null
// Andy    31
// Justin  20

// Select people older than 21
df.filter(df.col("age").gt(21)).show();
// age name
// 30  Andy

// Count people by age
df.groupBy("age").count().show();
// age  count
// null 1
// 19   1
// 30   1
SQLContext sqlContext = ...; // An existing SQLContext
DataFrame df = sqlContext.sql("SELECT * FROM table");
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]
There are two ways to turn an existing RDD into a DataFrame. The first infers the schema by reflection over a case class, as shown in the next example. The second interacts with the RDD through a programmatic interface: the schema is built explicitly and applied to the RDD, so the columns and their types are decided at runtime.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Import Row.
import org.apache.spark.sql.Row

// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// Generate the schema based on the string of schema
val schema = StructType(
  schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
By default, saveAsTable creates a "managed table": the table's data location is controlled by the metastore, and a managed table's data is deleted when the table is dropped.
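A minimal sketch of how a managed table behaves, assuming a HiveContext-backed sqlContext and the df loaded above (the table name people_parquet is hypothetical):

df.write.format("parquet").saveAsTable("people_parquet") // managed table: files live under the metastore-managed warehouse directory
sqlContext.sql("DROP TABLE people_parquet")              // dropping a managed table also deletes its data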
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")

// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key: int (nullable = true)
When the Hive metastore schema is reconciled with the Parquet schema, fields with the same name in both schemas must have the same data type regardless of nullability; the reconciled field takes the data type of the Parquet side, so that nullability is respected.
The reconciled schema contains exactly the fields defined in the Hive metastore schema (see the sketch after this list):
- Any field that appears only in the Parquet schema is dropped from the reconciled schema.
- Any field that appears only in the Hive metastore schema is added to the reconciled schema as a nullable field.
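A hypothetical illustration of these reconciliation rules, written with Spark SQL's type API; the field names key, value and extra are made up for this example:

import org.apache.spark.sql.types._

// Schema as declared in the Hive metastore (hypothetical).
val metastoreSchema = StructType(Seq(
  StructField("key", IntegerType, nullable = true),
  StructField("value", StringType, nullable = true)))

// Schema found in the Parquet files (hypothetical).
val parquetSchema = StructType(Seq(
  StructField("key", IntegerType, nullable = false),  // the Parquet side carries the real nullability
  StructField("extra", StringType, nullable = true))) // appears only in the Parquet files

// Reconciled schema, following the rules above:
//   key:   IntegerType, nullable = false  -- Parquet-side type wins, so nullability is respected
//   value: StringType,  nullable = true   -- metastore-only field, added as nullable
//   ("extra" is dropped because it appears only in the Parquet schema)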
// sqlContext is an existing HiveContext
sqlContext.refreshTable("my_table")
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)

// Register this DataFrame as a table.
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
To work with Hive from Spark SQL, create a HiveContext, which adds support for the Hive metastore and for HiveQL. A HiveContext can be created even when no Hive deployment is available. If nothing is configured in hive-site.xml, a metastore_db is created automatically in the current directory and the warehouse directory defaults to /user/hive/warehouse, so the user running Hive needs write permission on /user/hive/warehouse.
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
./sbin/start-thriftserver.sh
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
  --master <master-uri> \
  ...
./sbin/start-thriftserver.sh \
  --hiveconf hive.server2.thrift.port=<listening-port> \
  --hiveconf hive.server2.thrift.bind.host=<listening-host> \
  --master <master-uri> \
  ...
./bin/beeline
beeline> !connect jdbc:hive2://localhost:10000
- hive.server2.transport.mode - Set this to value: http
- hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
- hive.server2.http.endpoint - HTTP endpoint; default is cliservice
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
./bin/spark-sql
- ByteType: 1-byte signed integer
- ShortType: 2-byte signed integer
- IntegerType: 4-byte signed integer
- LongType: 8-byte signed integer
- FloatType: 4-byte single-precision floating-point number
- DoubleType: 8-byte double-precision floating-point number
- DecimalType: arbitrary-precision signed decimal number
- TimestampType: timestamp type
- DateType: date type, containing only year, month and day
- ArrayType: array type
- MapType: map type
- StructType: structure made up of a sequence of StructFields
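A minimal sketch showing how several of these types compose into a programmatic schema; all field names here are made up for illustration:

import org.apache.spark.sql.types._

// Nested structure used for the address field below.
val addressType = StructType(Seq(
  StructField("city", StringType),
  StructField("zip", StringType)))

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("price", DecimalType(10, 2)),                // arbitrary-precision signed decimal
  StructField("created", TimestampType),
  StructField("tags", ArrayType(StringType)),              // array of strings
  StructField("attrs", MapType(StringType, IntegerType)),  // string -> int map
  StructField("address", addressType)))                    // nested struct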
