pyspark--Window Functions

文章目录

  • 1 Ranking functions
    • 1.1 row_number()
    • 1.2 rank()
    • 1.3 dense_rank()
    • 1.4 percent_rank()
    • 1.5 ntile()
  • 2 Analytic functions
    • 2.1 cume_dist()
    • 2.2 lag()
    • 2.3 lead()
  • 3 Aggregate Functions


参考链接:pyspark-window-functions

Window函数在实际业务场景中非常实用,用的好的话能避免很多浪费时间的计算。刚好最近看到一篇对Window讲解比较全的文章,在此稍微做个笔记,原文参考上述链接。

文中对Window函数分类为三种:ranking functions,analytic functions,aggregate functions
ranking functions包括row_number(),rank(),dense_rank(),percent_rank(),ntile();
analytic functions包括cume_dist(),lag(), lead();
aggregate functions包括sum(),first(),last(),max(),min(),mean(),stddev()等。
下面依次详解上述三类函数。

1 Ranking functions

首先,假设我们的数据是如下形式:

# spark = SparkSession.builder.appName('Window functions').getOrCreate()
employee_salary = [("Ali", "Sales", 8000),("Bob", "Sales", 7000),("Cindy", "Sales", 7500),("Davd", "Finance", 10000),("Elena", "Sales", 8000),("Fancy", "Finance", 12000),("George", "Finance", 11000),("Haffman", "Marketing", 7000),("Ilaja", "Marketing", 8000),("Joey", "Sales", 9000)]columns= ["name", "department", "salary"]
df = spark.createDataFrame(data = employee_salary, schema = columns)
df.show(truncate=False)

笔者在spark交互式环境中执行,因此不需新开一个sparkSession,大家如果在脚本中写然后用spark-submit提交的形式,需要开启sparkSession,结果如下:

+-------+----------+------+
|name   |department|salary|
+-------+----------+------+
|Ali    |Sales     |8000  |
|Bob    |Sales     |7000  |
|Cindy  |Sales     |7500  |
|Davd   |Finance   |10000 |
|Elena  |Sales     |8000  |
|Fancy  |Finance   |12000 |
|George |Finance   |11000 |
|Haffman|Marketing |7000  |
|Ilaja  |Marketing |8000  |
|Joey   |Sales     |9000  |
+-------+----------+------+

1.1 row_number()

row_number() 可以用来给按照指定列排序的分组窗增加一个行序号,这个列从1开始依次递增,序数是依据分组窗的指定排序列依次从小到大变化。我们来看如下代码:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("row_number", F.row_number().over(windowSpec)).show(truncate=False)

按照部门对数据进行分组,然后按照薪水由高到低进行排序,结果如下:

+-------+----------+------+----------+
|name   |department|salary|row_number|
+-------+----------+------+----------+
|Joey   |Sales     |9000  |1         |
|Ali    |Sales     |8000  |2         |
|Elena  |Sales     |8000  |3         |
|Cindy  |Sales     |7500  |4         |
|Bob    |Sales     |7000  |5         |
|Fancy  |Finance   |12000 |1         |
|George |Finance   |11000 |2         |
|Davd   |Finance   |10000 |3         |
|Ilaja  |Marketing |8000  |1         |
|Haffman|Marketing |7000  |2         |
+-------+----------+------+----------+

观察上面的数据,你会发现,同样的薪水会有不同的行号,这是因为row_number() 是按照行来给定序号,其不关注实际数值的大小。由此我们可以引申出另一个用于给出排序数的函数rank。

1.2 rank()

rank() 用来给按照指定列排序的分组窗增加一个排序的序号,如果有相同数值,则排序数相同,下一个序数顺延一位。来看如下代码:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("rank",F.rank().over(windowSpec)).show(truncate=False)

按照部门进行分组,组内对薪水按照从高到低进行排序,结果如下:

+-------+----------+------+----+
|name   |department|salary|rank|
+-------+----------+------+----+
|Joey   |Sales     |9000  |1   |
|Ali    |Sales     |8000  |2   |
|Elena  |Sales     |8000  |2   |
|Cindy  |Sales     |7500  |4   |
|Bob    |Sales     |7000  |5   |
|Fancy  |Finance   |12000 |1   |
|George |Finance   |11000 |2   |
|Davd   |Finance   |10000 |3   |
|Ilaja  |Marketing |8000  |1   |
|Haffman|Marketing |7000  |2   |
+-------+----------+------+----+

上面的结果我们观察到,两个相同的8000排序都是2,而下一档排序数自然顺延至4了。说到这,不得不提另一个排序数函数dense_rank()。

1.3 dense_rank()

dense_rank() 函数也是对分组窗进行排序,分组窗需指定排序列,排序时不考虑顺延,同样的值序号一致,后续数值排序不受影响。我们来看如下代码:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("dense_rank",F.dense_rank().over(windowSpec)).show()

按照部门进行分组,对组内的数据按照薪水进行从高到低进行排序,结果如下:

+-------+----------+------+----------+
|   name|department|salary|dense_rank|
+-------+----------+------+----------+
|   Joey|     Sales|  9000|         1|
|    Ali|     Sales|  8000|         2|
|  Elena|     Sales|  8000|         2|
|  Cindy|     Sales|  7500|         3|
|    Bob|     Sales|  7000|         4|
|  Fancy|   Finance| 12000|         1|
| George|   Finance| 11000|         2|
|   Davd|   Finance| 10000|         3|
|  Ilaja| Marketing|  8000|         1|
|Haffman| Marketing|  7000|         2|
+-------+----------+------+----------+

1.4 percent_rank()

一些业务场景下,我们需要计算不同数值的百分比排序数据,先来看一个例子吧。

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("percent_rank",F.percent_rank().over(windowSpec)).show()

按照部门进行分组,然后在组内对每个人的薪水进行排序,使用percent_rank() 增加排序列,结果如下:

+-------+----------+------+------------+
|   name|department|salary|percent_rank|
+-------+----------+------+------------+
|   Joey|     Sales|  9000|         0.0|
|    Ali|     Sales|  8000|        0.25|
|  Elena|     Sales|  8000|        0.25|
|  Cindy|     Sales|  7500|        0.75|
|    Bob|     Sales|  7000|         1.0|
|  Fancy|   Finance| 12000|         0.0|
| George|   Finance| 11000|         0.5|
|   Davd|   Finance| 10000|         1.0|
|  Ilaja| Marketing|  8000|         0.0|
|Haffman| Marketing|  7000|         1.0|
+-------+----------+------+------------+

上述结果可以理解为将dense_rank() 的结果进行归一化,即可得到0-1以内的百分数。percent_rank() 与SQL中的 PERCENT_RANK 函数效果一致。

1.5 ntile()

ntile()可将分组的数据按照指定数值n切分为n个部分,每一部分按照行的先后给定相同的序数。例如n指定为2,则将组内数据分为两个部分,第一部分序号为1,第二部分序号为2。理论上两部分数据行数是均等的,但当数据为奇数行时,中间的那一行归到前一部分。我们来看如下代码:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("ntile",F.ntile(2).over(windowSpec)).show()

按照部门对数据进行分组,然后在组内按照薪水高低进行排序,再使用ntile() 将组内数据切分为两个部分。结果如下:

+-------+----------+------+-----+
|   name|department|salary|ntile|
+-------+----------+------+-----+
|   Joey|     Sales|  9000|    1|
|    Ali|     Sales|  8000|    1|
|  Elena|     Sales|  8000|    1|
|  Cindy|     Sales|  7500|    2|
|    Bob|     Sales|  7000|    2|
|  Fancy|   Finance| 12000|    1|
| George|   Finance| 11000|    1|
|   Davd|   Finance| 10000|    2|
|  Ilaja| Marketing|  8000|    1|
|Haffman| Marketing|  7000|    2|
+-------+----------+------+-----+

2 Analytic functions

2.1 cume_dist()

cume_dist()函数用来获取数值的累进分布值,看如下例子:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("cume_dist",F.cume_dist().over(windowSpec)).show()

按照部门进行分组,对薪水进行排序,然后cume_dist()获取累进分布值,结果如下:

+-------+----------+------+------------------+
|   name|department|salary|         cume_dist|
+-------+----------+------+------------------+
|   Joey|     Sales|  9000|               0.2|
|    Ali|     Sales|  8000|               0.6|
|  Elena|     Sales|  8000|               0.6|
|  Cindy|     Sales|  7500|               0.8|
|    Bob|     Sales|  7000|               1.0|
|  Fancy|   Finance| 12000|0.3333333333333333|
| George|   Finance| 11000|0.6666666666666666|
|   Davd|   Finance| 10000|               1.0|
|  Ilaja| Marketing|  8000|               0.5|
|Haffman| Marketing|  7000|               1.0|
+-------+----------+------+------------------+

结果好像和前面的percent_rank()很类似对不对,于是我们联想到这个其实也是一种归一化结果,其按照rank() 的结果进行归一化处理。回想一下前面讲过的rank() 函数,并列排序会影响后续排序,于是序号中间可能存在隔断。这样Sales组的排序数就是1、2、2、4、5,归一化以后就得到了0.2、0.6、0.6、0.8、1。这个统计结果按照实际业务来理解就是:9000及以上的人占了20%,8000及以上的人占了60%,7500以上的人数占了80%,7000以上的人数占了100%,这样是不是就好理解多了。

2.2 lag()

lag() 函数用于寻找按照指定列排好序的分组内每个数值的上一个数值,通俗的说,就是数值排好序以后,寻找排在每个数值的上一个数值。代码如下:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lag",F.lag("salary",1).over(windowSpec)).show()

按照部门进行分类,并按照薪水在组内进行排序,然后获取每一个薪水的上一个数值,结果如下:

+-------+----------+------+-----+
|   name|department|salary|  lag|
+-------+----------+------+-----+
|   Joey|     Sales|  9000| null|
|    Ali|     Sales|  8000| 9000|
|  Elena|     Sales|  8000| 8000|
|  Cindy|     Sales|  7500| 8000|
|    Bob|     Sales|  7000| 7500|
|  Fancy|   Finance| 12000| null|
| George|   Finance| 11000|12000|
|   Davd|   Finance| 10000|11000|
|  Ilaja| Marketing|  8000| null|
|Haffman| Marketing|  7000| 8000|
+-------+----------+------+-----+

与lag() 相对应的获取下一个数值的函数是lead() 。

2.3 lead()

lead() 用于获取排序后的数值的下一个,代码如下:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lead",F.lead("salary",1).over(windowSpec)).show()

按照部门进行分组,并在组内进行薪水排序,然后用lead获取每个薪水值的下一个数值,结果如下:

+-------+----------+------+-----+
|   name|department|salary| lead|
+-------+----------+------+-----+
|   Joey|     Sales|  9000| 8000|
|    Ali|     Sales|  8000| 8000|
|  Elena|     Sales|  8000| 7500|
|  Cindy|     Sales|  7500| 7000|
|    Bob|     Sales|  7000| null|
|  Fancy|   Finance| 12000|11000|
| George|   Finance| 11000|10000|
|   Davd|   Finance| 10000| null|
|  Ilaja| Marketing|  8000| 7000|
|Haffman| Marketing|  7000| null|
+-------+----------+------+-----+

实际业务场景中,假设我们获取了每个月的销售数据,我们可能想要知道,某月份与上一个月或下一个月数据相比怎么样,于是就可以使用lag和lead来进行数据分析了。

3 Aggregate Functions

常见的聚合函数有avg, sum, min, max, count, approx_count_distinct()等,我们用如下代码来同时使用这些函数:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg  = Window.partitionBy("department")df.withColumn("row", F.row_number().over(windowSpec)) \.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \.withColumn("min", F.min("salary").over(windowSpecAgg)) \.withColumn("max", F.max("salary").over(windowSpecAgg)) \.withColumn("count", F.count("salary").over(windowSpecAgg)) \.withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \.show()

结果如下:

+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|   name|department|salary|row|    avg|  sum|  min|  max|count|distinct_count|
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|   Joey|     Sales|  9000|  1| 7900.0|39500| 7000| 9000|    5|             4|
|    Ali|     Sales|  8000|  2| 7900.0|39500| 7000| 9000|    5|             4|
|  Elena|     Sales|  8000|  3| 7900.0|39500| 7000| 9000|    5|             4|
|  Cindy|     Sales|  7500|  4| 7900.0|39500| 7000| 9000|    5|             4|
|    Bob|     Sales|  7000|  5| 7900.0|39500| 7000| 9000|    5|             4|
|  Fancy|   Finance| 12000|  1|11000.0|33000|10000|12000|    3|             3|
| George|   Finance| 11000|  2|11000.0|33000|10000|12000|    3|             3|
|   Davd|   Finance| 10000|  3|11000.0|33000|10000|12000|    3|             3|
|  Ilaja| Marketing|  8000|  1| 7500.0|15000| 7000| 8000|    2|             2|
|Haffman| Marketing|  7000|  2| 7500.0|15000| 7000| 8000|    2|             2|
+-------+----------+------+---+-------+-----+-----+-----+-----+--------------+

需要注意的是 approx_count_distinct() 函数适用于窗函数的统计,而在groupby中通常用countDistinct()来代替该函数,用来求组内不重复的数值的条数。approx_count_distinct()取的是近似的数值,不太准确,使用需注意。从结果来看,统计值基本上是按照部门分组,统计组内的salary情况。如果我们只想要保留部门的统计结果,而将每个人的实际情况去掉,可以采用如下代码:

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg  = Window.partitionBy("department")df.withColumn("row", F.row_number().over(windowSpec)) \.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \.withColumn("min", F.min("salary").over(windowSpecAgg)) \.withColumn("max", F.max("salary").over(windowSpecAgg)) \.withColumn("count", F.count("salary").over(windowSpecAgg)) \.withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \.where(F.col("row")==1).select("department","avg","sum","min","max","count","distinct_count") \.show()
+----------+-------+-----+-----+-----+-----+--------------+
|department|    avg|  sum|  min|  max|count|distinct_count|
+----------+-------+-----+-----+-----+-----+--------------+
|     Sales| 7900.0|39500| 7000| 9000|    5|             4|
|   Finance|11000.0|33000|10000|12000|    3|             3|
| Marketing| 7500.0|15000| 7000| 8000|    2|             2|
+----------+-------+-----+-----+-----+-----+--------------+

分组窗在实际中用处还是很大的,部分关于Window的知识可移步 Window不同分组窗的使用


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部