Integrating Spark with Elasticsearch

Add the Maven dependencies

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-mllib_2.11</artifactId>
      <version>2.4.3</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.58</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark-20_2.11</artifactId>
      <version>7.2.0</version>
    </dependency>

Reading data from Elasticsearch with Spark

See the ES official documentation for details.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ESToSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hello world").setMaster("local[*]")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes", "127.0.0.1")
    conf.set("es.port", "9200")
    val sc = new SparkContext(conf)

    val query: String =
      """{
        |  "query": {
        |    "term": {
        |      "name": {
        |        "value": "鲁仲连"
        |      }
        |    }
        |  }
        |}""".stripMargin

    val rdd = sc.esRDD("phonebills", query)
    rdd.collect().foreach(println)
    println(rdd.count() + " -----------")
    sc.stop()
  }
}
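`esRDD` yields the hits as pairs of (document id, field map). A minimal sketch of unpacking one such pair, using hypothetical sample data in place of a live ES hit:

```scala
object UnpackHit {
  // Shape of one element returned by esRDD.
  type Hit = (String, Map[String, AnyRef])

  // Pull a single field out of a hit, if present.
  def field(hit: Hit, name: String): Option[AnyRef] = hit._2.get(name)

  def main(args: Array[String]): Unit = {
    // Hypothetical record standing in for one hit from the "phonebills" index.
    val hit: Hit = ("doc-1", Map("name" -> "鲁仲连", "phone" -> "13800000000"))
    println(s"id=${hit._1}, name=${field(hit, "name").getOrElse("?")}")
  }
}
```

In the real job, the same extraction runs inside `rdd.map { case (id, fields) => ... }` on the executors.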

Writing data from Spark into Elasticsearch

1. First create the index in ES and specify its field mappings
Spark type      ES type
String          text/keyword
Long            long
Integer         number
String/Long     date
Double          number
Int             number

When writing a date field to ES, you must create the index first and set the mapping's date format; otherwise the field will be treated as text.

PUT xxx
{
  "mappings": {
    "properties": {
      "a": { "type": "keyword" },
      "b": { "type": "keyword" },
      "c": { "type": "long" },
      "time": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" },
      "d": { "type": "text" },
      "e": { "type": "keyword" },
      "f": { "type": "text" },
      "g": { "type": "keyword" }
    }
  }
}
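Any date string written from Spark must match the format declared in the mapping exactly, or indexing fails. A small sketch of producing such a string with `java.time` (the pattern below assumes the mapping uses the single format `yyyy-MM-dd HH:mm:ss`):

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

object EsDateFormat {
  // Same pattern as declared in the index mapping (assumption: a single format).
  val EsPattern: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Render a timestamp in the exact shape the mapping expects.
  def toEsDate(t: LocalDateTime): String = t.format(EsPattern)

  def main(args: Array[String]): Unit = {
    println(toEsDate(LocalDateTime.of(2019, 7, 1, 12, 30, 0)))  // 2019-07-01 12:30:00
  }
}
```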
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql._  // adds saveToEs to DataFrame

object SparkToES {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[8]")
      .config("es.index.auto.create", "true")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .appName("log")
      .getOrCreate()
    val sc = spark.sparkContext

    // Parse the raw log into a case class first, e.g.:
    // val rdd = sc.textFile(path)
    // case class xx()
    // val log = rdd.map(x => xx())

    val rlog = spark.createDataFrame(log)
    rlog.saveToEs("dblog")
    spark.stop()
  }
}
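The commented-out parsing step above can be sketched in plain Scala. The record type and field names below are hypothetical stand-ins for the `case class xx()` the post elides; the parsed records are what `createDataFrame` would then turn into the DataFrame written with `saveToEs`:

```scala
// Hypothetical record type; names and types are assumptions, not the post's schema.
case class CallRecord(caller: String, callee: String, durationSec: Long, time: String)

object BuildRecords {
  // Parse one raw log line of "caller,callee,duration,timestamp" into a record.
  def parse(line: String): CallRecord = {
    val Array(caller, callee, dur, ts) = line.split(",", 4)
    CallRecord(caller, callee, dur.trim.toLong, ts.trim)
  }

  def main(args: Array[String]): Unit = {
    val rec = parse("alice,bob,42,2019-07-01 12:30:00")
    println(rec)
    // In the Spark job:
    //   val rlog = spark.createDataFrame(rdd.map(parse))
    //   rlog.saveToEs("dblog")
  }
}
```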

