【 RDD持久化】

文章目录

  • RDD持久化
    • 操作:
    • 总结:
    • cache vs persist 区别:
    • 存储级别选择:
    • 移除rdd持久化 :
    • 2.血缘关系
    • 依赖关系分类:
    • 4.shuffle 算子:
    • 案例分析:

spark_05_RDD持久化

RDD持久化

rdda => rddb =>rddc    action 
rdda => rddb  =>rdde   action

rddb 持久化操作 =》 调优的

操作:

1.persist() or cache() methods
2.触发action之后 会对rdd数据进行持久化的

总结:

1.cache()  不是action算子 是lazy 是懒加载的
  1. rdda => action job
    rdda => cache => action job => rdd持久化 生效
    rdda => action job rdda的数据从 rdd持久化的地方加载数据

    rdda => rddb => rddc
    rdda => rddb => rdde
    rdda => rddb => rddf

    rddb.cache 之后 rddb之后数据就不用从头开机计算 提升计算效率

    补充:
    对rdd做持久化 就是对rdd里面的分区做持久化

    好处:
    1.much faster 【计算效率】
    2. reuse 复用

cache vs persist 区别:

1.cache底层就是调用 persist算子 
2.spark-core 持久化 默认存储级别:StorageLevel.MEMORY_ONLY

StorageLevel:
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1

val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

存储级别选择:

1.MEMORY_ONLY  首选 
2.MEMORY_ONLY_SER 次选 1.Java serialization: By default,2.Kryo: 1.注册  class

移除rdd持久化 :

1.lru 
2.手动 : RDD.unpersist(true)   立即执行的 eager 

2.血缘关系

lineage:rdda => rddb => rddc  

一个rdd是如何从父rdd计算得来的

textFile(path) => map => fiter => … => collect
每一个转换都会形成一个rdd

好处:
容错(性能 + 容错)
容错:
1.假如说RDDB 分区 6 8 元素 在计算的时候 挂了

一个链路 200个转换 算到 第199个转换 数据坏了 ,如果 从头计算 也是挺麻烦的一件事情 :
1.spark-core 提供cache
而且 持久化的数据集 也支持 容错

3.依赖关系:
rdda => rddb
不同的依赖 会导致 生成rdd分区数发生变化的

依赖关系分类:

1.宽依赖:1.一个父rdd的parition会被子rdd的parition使用多次 2.会产生shuffle 会有新的stage产生
2.窄依赖:1.一个父rdd的parition至多被子rdd的partition使用一次2.不会产生shuffle,都是在一个stage里面完成的shuffle:数据重新洗牌

和算子:转换算子
1.窄依赖算子
2.宽依赖算子

补充:
通过宽窄依赖可以知道 spark-core里面 转换算子 哪些算子 可能引起shuffle

宽依赖:
xxxbykey shuffle
其他: join reduce
窄依赖:
map filter xxx
spark: stage是如何划分? ****
spark-core 产生 宽依赖 就会划分stage
算子:引起shuffle 就会划分stage
一个shuffle算子 会划分2个stage

两个shuffle算子 会产生几个stage???
3

4.shuffle 算子:

“生产上能使用窄依赖算子 就不使用宽依赖算子”:
1.不准确
1.生产上大部分 需求 必须使用宽依赖的

2.容错的角度 : 1.如果经过宽依赖之后的rdd的某一个分区数据挂掉 需要去父RDD分区重新计算 会把父rdd里面的所有分区都会算一下才行 


引起shuffle的算子:
​ 1.xxxbykey =》
​ 2. repartition and coalesce【不准确】
​ 3. join:
​ map join 不会引发shuffle
​ reduce join /common join =》 引起shuffle

生产上调整 计算的并行度?
coalesce: 一般用于 减少rdd的分区数 =》 窄依赖 =》 不会引起shuffle
repartition:增加rdd的分区数
coalesce(num,shuffle=true)

思考:
可不可以使用coalesce 增加rdd分区数 ? 可以的
repartition 减少rdd的分区数? 不能
思考:
coalesce 增加rdd分区数 ? 走不走shuffle? 必然走shuffle

注意:
生产上用于调整 计算的并行度

思考:
rdd =》 hdfs 200个小文件
rdd.coalesce(10) 10个文件

application :
driver:
executor:
yarn container

rdd: => executor
code => executor 判断code 操作对象是不是rdd里面的元素
=> driver

案例分析:

网站访问量排名:
domain uid flow
www.baidu.com,uid01,1
www.baidu.com,uid01,10
www.baidu.com,uid02,3
www.baidu.com,uid02,5
www.github.com,uid01,11
www.github.com,uid01,10
www.github.com,uid02,30
www.github.com,uid02,50
www.bibili.com,uid01,110
www.bibili.com,uid01,10
www.bibili.com,uid02,2
www.bibili.com,uid02,3

需求:
每个域名每个用户的访问量的top3

sql=> 分组+聚合+ 开窗
code =》 分组+聚合 + topn

((www.bibili.com,uid02),5)
((www.github.com,uid02),80)
((www.baidu.com,uid02),8)
((www.github.com,uid01),21)
((www.baidu.com,uid01),11)
((www.bibili.com,uid01),120)

top2:
uid02 www.github.com 80
uid02 www.bibili.com 5

存在安全隐患:
x.toList.

sparkcore 进行数据分析:
rdd进行操作 不要使用scala里面的结合进行存储

思想:
分而治之 类似 mr 分组

sparkcore: 没讲的
1.广播变量 =》 sss
2.累加器 =》 项目

需求:
spark-core =》 wc
input:hdfs
todo:wc
output:hdfs

idea:
spark code =》 jar =》 服务器上

部署spark作业:
1.jar
2.spark-submit 提交作业

// val in = “hdfs://bigdata32:9000/input/”
// val out = “hdfs://bigdata32:9000/output/”

提交spark作业:
spark-submit
–class com.dl2262.sparkcore.day02.WCApp
–master local[2]
–name wordcount
/home/hadoop/project/spark/spark-2262-1.0.jar
hdfs://bigdata32:9000/input/1 hdfs://bigdata32:9000/output/

作业:
1.yarn 提交?

2.–conf 任意Spark配置属性

–properties-file 要从中加载额外文件的文件路径

spark-submit \
--class com.dl2262.sparkcore.day02.WCApp \
--master  yarn \
--deploy-mode client \
--name wordcount \
--driver-memory 1G
/home/hadoop/project/spark/spark-2262-1.0.jar \
hdfs://bigdata32:9000/input/1 hdfs://bigdata32:9000/output/

spark-submit
–class com.dl2262.sparkcore.day02.WCApp
–master local[2]
–name wordcount
–conf spark.input.path=hdfs://bigdata32:9000/input/1
–conf spark.output.path=hdfs://bigdata32:9000/output1/
/home/hadoop/project/spark/spark-2262-1.0.jar


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部