Spark2 QuickStart上手操作 (pyspark2)

pyspark2 上手操作

1、准备数据(put hdfs)

[root@bdpcm01 spark2]# hdfs dfs -mkdir /tmp
[root@bdpcm01 spark2]# hdfs dfs -put /opt/cloudera/parcels/SPARK2/lib/spark2/README.md /tmp

2、上手练习

2.1 运行pyspark2

[root@bdpcm01 spark2]# pyspark2
......
>>> sc
<SparkContext master=yarn appName=PySparkShell>    

‘’’

2.2 使用textFile创建一个字符串的RDD

‘’’

>>> lines = sc.textFile("/tmp/README.md")
>>> lines.count()
103                                                                             
>>> lines.first()
'# Apache Spark'

‘’’

2.3 调用转化操作filter()

‘’’

>>> pythonLines = lines.filter(lambda line: "Python" in line)

‘’’

2.4 调用first()行动操作

‘’’

>>> pythonLines.first()
'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'

‘’’

2.5 把RDD 持久化到内存中

‘’’

>>> pythonLines.persist
<bound method RDD.persist of PythonRDD[16] at RDD at PythonRDD.scala:48>

‘’’

2.6 调用count()行动操作

‘’’

>>> pythonLines.count()
3   

3、验证2(Linux操作)

"""
[root@bdpcm01 spark2]# cat README.md -n | grep "Python"4  high-level APIs in Scala, Java, Python, and R, and an optimized engine that44  ## Interactive Python Shell46  Alternatively, if you prefer Python, you can use the Python shell:[root@bdpcm01 spark2]# cat README.md  | grep "Python" | cat -n1  high-level APIs in Scala, Java, Python, and R, and an optimized engine that2  ## Interactive Python Shell3  Alternatively, if you prefer Python, you can use the Python shell:
"""   

‘’’
总的来说,每个Spark 程序或shell 会话都按如下方式工作:
(1) 从外部数据创建出输入RDD。
(2) 使用诸如filter() 这样的转化操作对RDD 进行转化,以定义新的RDD。
(3) 告诉Spark 对需要被重用的中间结果RDD 执行persist() 操作。 cache() 与使用默认存储级别调用persist() 是一样的。
(4) 使用行动操作(例如count() 和first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。
‘’’

4、创建RDD

‘’’

4.1 两种创建RDD 的方式

‘’’

‘’’
Spark 提供了两种创建RDD 的方式:
1、读取外部数据集,以及在驱动器程序中对一个集合进行并行化。
创建RDD 最简单的方式就是把程序中一个已有的集合传给SparkContext 的parallelize()方法,这种方式在学习Spark 时非常有用,它让你可以在shell 中快速创建出自己的RDD,然后对这些RDD 进行操作。不过,需要注意的是,除了开发原型和测试时,这种方式用得并不多,毕竟这种方式需要把你的整个数据集先放在一台机器的内存中。
2、更常用的方式是从外部存储中读取数据来创建RDD。 将文本文件读入为一个存储字符串的RDD 的方法SparkContext.textFile(),上述可见。
‘’’

“”"

4.2 Python 中的parallelize() 方法

“”"

>>> lines = sc.parallelize(["pandas", "i like pandas"])
>>> lines.count()
2                                                                               
>>> lines.first()
'pandas'
>>> lines.collect()  '可以用来获取整个RDD 中的数据。'
['pandas', 'i like pandas']  

5、RDD操作

‘’’
RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一个新的RDD 的操作,比如map() 和filter(),
而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count() 和first()。
RDD惰性求值,在RDD动作操作时才进行数据的读取与装换。
‘’’

“”"

5.1 RDD装换操作

“”"

'''
1、用Python 实现filter() 转化操作
'''
>>> inputRDD = sc.textFile("log.txt")
>>> errorsRDD = inputRDD.filter(lambda x: "error" in x)
"""
2、用Python 进行union() 转化操作
"""
>>> errorsRDD = inputRDD.filter(lambda x: "error" in x)
>>> warningsRDD = inputRDD.filter(lambda x: "warning" in x)
>>> badLinesRDD = errorsRDD.union(warningsRDD)

“”"

5.2 RDD行动操作

“”"

'''
1、在Python 中使用行动操作对错误进行计数
'''
>>> print "Input had " + badLinesRDD.count() + " concerning lines"
>>> print "Here are 10 examples:"
>>> for line in badLinesRDD.take(10):
>>> 	print line
"""
2、用Python 进行collect() 动作操作'可以用来获取整个RDD 中的数据。(必须是小规模数据集)'
"""
>>> lines = sc.parallelize(["pandas", "i like pandas"])
>>> lines.collect() 
['pandas', 'i like pandas']  
"""
3、通常要把数据写到诸如HDFS 或Amazon S3 这样的分布式的存储系统中。可以使用saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把RDD 的数据内容以各种自带的格式保存起来。注:需要注意的是,每当我们调用一个新的行动操作时,整个RDD 都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。
"""

‘’’

5.3 向Spark传递函数

Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。
传递比较短的函数时,可以使用lambda 表达式来传递。除了lambda 表达式,我们也可以传递顶层函数或是定义的局部函数。

‘’’

'''
例:在Python 中传递函数
'''
word = rdd.filter(lambda s: "error" in s)def containsError(s):return "error" in s
word = rdd.filter(containsError)

“”"

5.4 常见的转换操作和行动操作

“”"

‘’’
1、针对各个元素的转化操作
两个最常用的转化操作是map() 和filter():
转化操作map() 接收一个函数,把这个函数用于RDD 中的每个元素,将函数的返回结果作为结果RDD 中对应元素的值。
而转化操作filter() 则接收一个函数,并将RDD 中满足该函数的元素放入新的RDD 中返回。
‘’’

#Python 版用map()计算RDD 中各值的平方
>>> nums = sc.parallelize([1, 2, 3, 4])
>>> squared = nums.map(lambda x: x * x).collect()
>>> for num in squared:print "%i " % (num)#Python 中的flatMap() 将行数据切分为单词
>>> lines = sc.parallelize(["hello world", "hi"])
>>> words = lines.flatMap(lambda line: line.split(" "))
>>> words.first()
'hello'
>>> lines.first()
'hello world'

‘’’
2、伪集合操作 这些操作都要求操作的RDD 是相同数据类型的。

  • 2.1 RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只要唯一的元素,我们可以使用RDD.distinct() 转化操作来生成一个只包含不同元素的新RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行混洗(shuffle),以确保每个元素都只有一份。
  • 2.2 最简单的集合操作是union(other),它会返回一个包含两个RDD 中所有元素的RDD。这在很多用例下都很有用,比如处理来自多个数据源的日志文件。与数学中的union() 操作不同的是,如果输入的RDD 中有重复数据,Spark 的union() 操作也会包含这些重复数据(如有必要,我们可以通过distinct() 实现相同的效果)。
  • 2.3 Spark 还提供了intersection(other) 方法,只返回两个RDD 中都有的元素。intersection()在运行时也会去掉所有重复的元素(单个RDD 内的重复元素也会一起移除)。尽管intersection() 与union() 的概念相似,intersection() 的性能却要差很多,因为它需要通过网络混洗数据来发现共有的元素。
  • 2.4 有时我们需要移除一些数据。subtract(other) 函数接收另一个RDD 作为参数,返回一个由只存在于第一个RDD 中而不存在于第二个RDD 中的所有元素组成的RDD。 和intersection() 一样,它也需要数据混洗。

‘’’

‘’’
对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作:
‘’’

操作解释运算结果
map()将函数应用于RDD 中的每个元素,将返回值构成新的RDDrdd.map(x => x + 1){2, 3, 4, 4}
flatMap()将函数应用于RDD 中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词rdd.flatMap(x => x.to(3)){1, 2, 3, 2, 3, 3, 3}
filter()返回一个由通过传给filter()的函数的元素组成的RDDrdd.filter(x => x != 1){2, 3, 3}
distinct()去重rdd.distinct(){1, 2, 3}
sample(withReplacement, fraction, [seed])对RDD 采样,以及是否替换rdd.sample(false, 0.5)非确定的

‘’’
对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作:
‘’’

操作解释运算结果
union()生成一个包含两个RDD 中所有元素的RDDrdd.union(other){1, 2, 3, 3, 4, 5}
intersection()求两个RDD 共同的元素的RDDrdd.intersection(other){3}
subtract()移除一个RDD 中的内容(例如移除训练数据) : rdd.subtract(other){1, 2}
cartesian()与另一个RDD 的笛卡儿积rdd.cartesian(other){(1, 3), (1, 4), …(3, 5)}

‘’’
3、行动操作

  • 3.1 基本RDD 上最常见的行动操作reduce()。它接收一个函数作为参数,这个函数要操作两个RDD 的元素类型的数据并返回一个同样类型的新元素。一个简单的例子就是函数+,可以用它来对我们的RDD 进行累加。使用reduce(),可以很方便地计算出RDD中所有元素的总和、元素的个数,以及其他类型的聚合操作。
  • 3.2 用aggregate() 来计算RDD 的平均值,来代替map() 后面接fold() 的方式

‘’’

#Python 中的reduce()
sum = rdd.reduce(lambda x, y: x + y)#Python 中的aggregate()
sumCount = nums.aggregate((0, 0),(lambda acc, value: (acc[0] + value, acc[1] + 1),(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))))
return sumCount[0] / float(sumCount[1])

‘’’
对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作
‘’’

操作解释运算结果
collect()返回RDD 中的所有元素rdd.collect(){1, 2, 3, 3}
count()RDD 中的元素个数rdd.count()4
countByValue()各元素在RDD 中出现的次数rdd.countByValue(){(1, 1),(2, 1),(3, 2)}
take(num)从RDD 中返回num 个元素rdd.take(2){1, 2}
top(num)从RDD 中返回最前面的num个元素rdd.top(2){3, 3}
takeOrdered(num)(ordering)从RDD 中按照提供的顺序返回最前面的num 个元素rdd.takeOrdered(2)(myOrdering): {3, 3}
takeSample(withReplacement, num, [seed])从RDD 中返回任意一些元素rdd.takeSample(false, 1)非确定的
reduce(func)并行整合RDD 中所有数据(例如sum)rdd.reduce((x, y) => x + y)9
fold(zero)(func)和reduce() 一样, 但是需要提供初始值rdd.fold(0)((x, y) => x + y)9
aggregate(zeroValue)(seqOp, combOp)和reduce() 相似, 但是通常返回不同类型的函数rdd.aggregate((0, 0))((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2))
foreach(func)对RDD 中的每个元素使用给定的函数rdd.foreach(func)

‘’’

6、持久化(缓存)

‘’’

“”"
默认情况下,每个动作操作都会重复计算RDD以及所有的依赖RDD;为了避免多次计算同一个RDD,可以让Spark 对数据进行持久化。当我们让Spark 持久化存储一个RDD 时,
计算出RDD 的节点会分别保存它们所求出的分区数据。如果一个有持久化数据的节点发生故障,Spark 会在需要用到缓存的数据时重算丢失的数据分区。如果希望节点故障的
情况不会拖累我们的执行速度,也可以把数据备份到多个节点上。出于不同的目的,我们可以为RDD 选择不同的持久化级别:
“”"

>>> files = sc.textFile("/tmp/pysparktest.txt")
>>> words = files.flatMap(lambda file: file.split(" "))
>>> pairs = words.map(lambda x: (x.split(" "),1))
>>> files.collect()
['ARRAY CF CELL SL OC', 'array cf cell sl oc sloc', 'ARRAY CELL', 'CF SL OC', 'OC CELL CF ARRAY ']
>>> words.collect()
['ARRAY', 'CF', 'CELL', 'SL', 'OC', 'array', 'cf', 'cell', 'sl', 'oc', 'sloc', 'ARRAY', 'CELL', 'CF', 'SL', 'OC', 'OC', 'CELL', 'CF', 'ARRAY', '']
>>> pairs.collect()
[(['ARRAY'], 1), (['CF'], 1), (['CELL'], 1), (['SL'], 1), (['OC'], 1), (['array'], 1), (['cf'], 1), (['cell'], 1), (['sl'], 1), (['oc'], 1), (['sloc'], 1), (['ARRAY'], 1), (['CELL'], 1), (['CF'], 1), (['SL'], 1), (['OC'], 1), (['OC'], 1), (['CELL'], 1), (['CF'], 1), (['ARRAY'], 1), ([''], 1)]>>> maps = words.map(lambda x: (x, 1))
>>> maps.collect()
[('ARRAY', 1), ('CF', 1), ('CELL', 1), ('SL', 1), ('OC', 1), ('array', 1), ('cf', 1), ('cell', 1), ('sl', 1), ('oc', 1), ('sloc', 1), ('ARRAY', 1), ('CELL', 1), ('CF', 1), ('SL', 1), ('OC', 1), ('OC', 1), ('CELL', 1), ('CF', 1), ('ARRAY', 1), ('', 1)]
>>> counts = maps.reduceByKey(lambda x, y: x + y)
>>> counts.collect()
[('', 1), ('SL', 2), ('CF', 3), ('sl', 1), ('array', 1), ('ARRAY', 3), ('CELL', 3), ('cell', 1), ('cf', 1), ('sloc', 1), ('OC', 3), ('oc', 1)]
#中以字符串顺序对整数进行自定义排序
>>> results = counts.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))
>>> results.collect()
[('', 1), ('ARRAY', 3), ('CELL', 3), ('CF', 3), ('OC', 3), ('SL', 2), ('array', 1), ('cell', 1), ('cf', 1), ('oc', 1), ('sl', 1), ('sloc', 1)]统计单词个数:
>>> files = sc.textFile("/tmp/pysparktest.txt")
>>> words = files.flatMap(lambda file: file.split(" "))
>>> maps = words.map(lambda x: (x, 1))
>>> counts = maps.reduceByKey(lambda x, y: x + y)or
>>> files = sc.textFile("/tmp/pysparktest.txt")
>>> results = files.flatMap(lambda x : x.split(" ")).countByValue()

“”"

7、数据读取与保存

“”"

'''
在Python 中读取一个文本文件
'''
>>> input = sc.textFile("file:///root/pyspark")
i>>> nput.count()
''''
在Python 中将数据保存为文本文件
'''
>>> RDD.saveAsTextFile(outputFile)

“”"

7.1 Spark 读取Hive数据

“”"

'Python 中SQL 的import 声明'
>>> from pyspark.sql import HiveContext,Row 
在Python 中创建SQL 上下文环境
>>> hiveCtx = HiveContext(sc)
在Python 中读取并查询
>>> rows = hiveCtx.sql("SELECT * FROM prod_bdw.dwd_factory_af")
>>> rows.show()
+----+-------+-----------+----+----------+-------+---------+-------+----------+-------------------+----------+------------+-------------------+
|site|factory|description|code|object_rrn|org_rrn|is_active|created|created_by|            updated|updated_by|lock_version|     interface_time|
+----+-------+-----------+----+----------+-------+---------+-------+----------+-------------------+----------+------------+-------------------+
|  P2|  ARRAY|        TFT| MES|        10|      0|        Y|   null|      null|2017-05-27 23:39:43|     admin|          28|2019-08-22 07:50:07|
|  P2|     CF|      彩色滤光片| MES|        11|      0|        Y|   null|      null|               null|      null|        null|2019-08-22 07:50:07|
+----+-------+-----------+----+----------+-------+---------+-------+----------+-------------------+----------+------------+-------------------+
>>> print (rows)
DataFrame[site: string, factory: string, description: string, code: string, object_rrn: bigint, org_rrn: bigint, is_active: string, created: timestamp, created_by: string, updated: timestamp, updated_by: string, lock_version: bigint, interface_time: timestamp]
>>> firstRow = rows.first()
>>> print (firstRow)
Row(site='P2', factory='ARRAY', description='TFT', code='MES', object_rrn=10, org_rrn=0, is_active='Y', created=None, created_by=None, updated=datetime.datetime(2017, 5, 27, 23, 39, 43), updated_by='admin', lock_version=28, interface_time=datetime.datetime(2019, 8, 22, 7, 50, 7))>>> rows.show()>>> limits = spark.sql("SELECT * FROM prod_bdw.dwd_calendar limit 3")
>>> limits.show()'缓存'
>>> hiveCtx.cacheTable("tableName")
  • CACHE TABLE tableName 缓存表
  • UNCACHE TABLE tableName 删除已有的缓存


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部