Integrating Spark with Elasticsearch

Add the dependencies to Maven:
```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.4.3</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.58</version>
  </dependency>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.2.0</version>
  </dependency>
</dependencies>
```
Reading data from Elasticsearch into Spark

See the official Elasticsearch (elasticsearch-hadoop) documentation for details.
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ESToSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("hello world")
      .setMaster("local[*]")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "127.0.0.1")
    conf.set("es.port", "9200")
    val sc = new SparkContext(conf)

    val query: String =
      s"""{"query": {"term": {"name": {"value": "鲁仲连"}}}}"""

    val rdd = sc.esRDD("phonebills", query)
    rdd.collect().foreach(println)
    println(rdd.count() + " -----------")
    sc.stop()
  }
}
```
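The query passed to `esRDD` is just a Query-DSL JSON string, so it can be assembled with plain Scala string interpolation. A minimal sketch — the `term` helper below is hypothetical, not part of the connector, and performs no JSON escaping:

```scala
object TermQuery {
  // Builds a Query-DSL term query as a JSON string.
  // Note: field and value are interpolated as-is, with no JSON escaping,
  // so this is only safe for trusted inputs.
  def term(field: String, value: String): String =
    s"""{"query": {"term": {"$field": {"value": "$value"}}}}"""

  def main(args: Array[String]): Unit =
    println(term("name", "鲁仲连"))
}
```

Calling `TermQuery.term("name", "鲁仲连")` produces exactly the query string used in the example above.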
Writing data from Spark to Elasticsearch

1. First create the index in Elasticsearch and define its fields.
| Spark type | ES type |
|---|---|
| String | text/keyword |
| Long | long |
| Integer/Int | integer |
| Double | double |
| String/Long | date |
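As a concrete instance of the table above, a record written from Spark might look like the following — the `CallRecord` class and its field names are hypothetical, chosen only to illustrate the type correspondence:

```scala
// Hypothetical record whose fields follow the Spark-to-ES type table above.
case class CallRecord(
  name: String,       // -> text/keyword
  durationMs: Long,   // -> long
  callCount: Int,     // -> integer
  cost: Double,       // -> double
  time: String        // -> date, as a string in the mapping's declared format
)

object CallRecordExample {
  def main(args: Array[String]): Unit = {
    val r = CallRecord("鲁仲连", 60000L, 3, 1.5, "2019-07-02 12:30:00")
    println(r)
  }
}
```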
When writing a date field to ES, create the index first and declare the `format` in the mapping; otherwise the field will be treated as text.
```
PUT xxx
{
  "mappings": {
    "properties": {
      "a":    { "type": "keyword" },
      "b":    { "type": "keyword" },
      "c":    { "type": "long" },
      "time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
      "d":    { "type": "text" },
      "e":    { "type": "keyword" },
      "f":    { "type": "text" },
      "g":    { "type": "keyword" }
    }
  }
}
```
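Because the mapping fixes the date `format`, the value written from Spark should be a string in exactly that pattern. A minimal sketch using `java.time` — the `EsDateFormat` object is an illustration, not part of the connector:

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object EsDateFormat {
  // Must match the "format" declared in the ES mapping: "yyyy-MM-dd HH:mm:ss"
  val fmt: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Renders a timestamp into the exact string shape the mapping expects.
  def format(dt: LocalDateTime): String = dt.format(fmt)

  def main(args: Array[String]): Unit =
    println(format(LocalDateTime.of(2019, 7, 2, 12, 30, 0))) // 2019-07-02 12:30:00
}
```

Writing any other string shape into the `time` field would fail the mapping's format check.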
```scala
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._  // provides saveToEs on DataFrames

val spark = SparkSession.builder()
  .master("local[8]")
  .config("es.index.auto.create", "true")
  .config("es.nodes", "127.0.0.1")
  .config("es.port", "9200")
  .appName("log")
  .getOrCreate()
val sc = spark.sparkContext
// val rdd = sc.textFile(path)
// case class xx()
// val log = rdd.map(x => xx())
val rlog = spark.createDataFrame(log)
rlog.saveToEs("dblog")
```
