PySpark Window Functions: Notes by Arun Nautiyal
There are three types of window functions; all of them are applied the same way, through a Window spec and .over() (a quick sketch follows this list):
1. Ranking functions
2. Analytic functions
3. Aggregate functions
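All three families share one pattern: define a Window specification with partitionBy/orderBy and attach a function to it with .over(). A minimal sketch of that shared pattern, assuming the df DataFrame built in the cells below:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number
# One frame per state, highest salary first
win = Window.partitionBy('State').orderBy(col('Salary').desc())
# Any window function can take the place of row_number()
df.withColumn('rn', row_number().over(win)).show()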
In [4]: import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
In [6]: spark=SparkSession.builder.appName('Arun_test').getOrCreate()
In [34]: simpleData = [('Arun','UK',50000),
('Arjun','MP',30000),
('James','USA',20000),
('Kumar',"UK",35000),
('Ajay','UK',50000)]
columns = ['Name','State','Salary']
df = spark.createDataFrame(data=simpleData,schema=columns)
In [35]: df.show(truncate=False)
+-----+-----+------+
|Name |State|Salary|
+-----+-----+------+
|Arun |UK |50000 |
|Arjun|MP |30000 |
|James|USA |20000 |
|Kumar|UK |35000 |
|Ajay |UK |50000 |
+-----+-----+------+
In [15]: df.printSchema()
root
|-- Name: string (nullable = true)
|-- State: string (nullable = true)
|-- Salary: long (nullable = true)
Ranking Functions
row_number()
rank()
dense_rank()
ntile()
percent_rank()
In [36]: from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql.functions import col
In [37]: WindFunc = Window.partitionBy('State').orderBy(col('Salary').desc())
df.withColumn('RN',row_number().over(WindFunc)).show()
+-----+-----+------+---+
| Name|State|Salary| RN|
+-----+-----+------+---+
|James| USA| 20000| 1|
| Arun| UK| 50000| 1|
| Ajay| UK| 50000| 2|
|Kumar| UK| 35000| 3|
|Arjun| MP| 30000| 1|
+-----+-----+------+---+
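A common use of row_number() is to keep only the top row per group, for example the highest-paid person in each state (the Arun/Ajay tie at 50000 is broken arbitrarily). A minimal sketch reusing WindFunc:
# Keep only the first row of each state partition
df.withColumn('RN', row_number().over(WindFunc)) \
  .where(col('RN') == 1) \
  .drop('RN') \
  .show()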
In [38]: from pyspark.sql.functions import rank
df.withColumn('Person_Rank',rank().over(WindFunc)).show(truncate=False)
+-----+-----+------+-----------+
|Name |State|Salary|Person_Rank|
+-----+-----+------+-----------+
|James|USA |20000 |1 |
|Arun |UK |50000 |1 |
|Ajay |UK |50000 |1 |
|Kumar|UK |35000 |3 |
|Arjun|MP |30000 |1 |
+-----+-----+------+-----------+
In [39]: from pyspark.sql.functions import dense_rank
df.withColumn('Person_Dense_Rank',dense_rank().over(WindFunc)).show(truncate=False)
+-----+-----+------+-----------------+
|Name |State|Salary|Person_Dense_Rank|
+-----+-----+------+-----------------+
|James|USA |20000 |1 |
|Arun |UK |50000 |1 |
|Ajay |UK |50000 |1 |
|Kumar|UK |35000 |2 |
|Arjun|MP |30000 |1 |
+-----+-----+------+-----------------+
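The three ranking functions differ only in how they handle the tie (Arun and Ajay both earn 50000 in UK): row_number() numbers them 1, 2, 3; rank() gives 1, 1, 3 (a gap after the tie); dense_rank() gives 1, 1, 2 (no gap). A side-by-side sketch over the same WindFunc:
df.withColumn('row_number', row_number().over(WindFunc)) \
  .withColumn('rank', rank().over(WindFunc)) \
  .withColumn('dense_rank', dense_rank().over(WindFunc)) \
  .show()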
In [40]: from pyspark.sql.functions import percent_rank
df.withColumn('percent_rank',percent_rank().over(WindFunc)).show()
+-----+-----+------+------------+
| Name|State|Salary|percent_rank|
+-----+-----+------+------------+
|James| USA| 20000| 0.0|
| Arun| UK| 50000| 0.0|
| Ajay| UK| 50000| 0.0|
|Kumar| UK| 35000| 1.0|
|Arjun| MP| 30000| 0.0|
+-----+-----+------+------------+
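percent_rank() is (rank - 1) / (rows in the partition - 1): the top of every partition is 0.0 (the single-row USA and MP partitions also come out as 0.0), while Kumar, rank 3 of 3 in UK, gets (3 - 1) / (3 - 1) = 1.0. A sketch showing rank and percent_rank side by side:
df.withColumn('rank', rank().over(WindFunc)) \
  .withColumn('percent_rank', percent_rank().over(WindFunc)) \
  .show()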
In [43]: from pyspark.sql.functions import ntile
df.withColumn("ntile_values",ntile(3).over(WindFunc)).show()
+-----+-----+------+------------+
| Name|State|Salary|ntile_values|
+-----+-----+------+------------+
|James| USA| 20000| 1|
| Arun| UK| 50000| 1|
| Ajay| UK| 50000| 2|
|Kumar| UK| 35000| 3|
|Arjun| MP| 30000| 1|
+-----+-----+------+------------+
In [44]: df.withColumn("ntile_values",ntile(2).over(WindFunc)).show()
+-----+-----+------+------------+
| Name|State|Salary|ntile_values|
+-----+-----+------+------------+
|James| USA| 20000| 1|
| Arun| UK| 50000| 1|
| Ajay| UK| 50000| 1|
|Kumar| UK| 35000| 2|
|Arjun| MP| 30000| 1|
+-----+-----+------+------------+
In [45]: df.withColumn("ntile_values",ntile(1).over(WindFunc)).show()
+-----+-----+------+------------+
| Name|State|Salary|ntile_values|
+-----+-----+------+------------+
|James| USA| 20000| 1|
| Arun| UK| 50000| 1|
| Ajay| UK| 50000| 1|
|Kumar| UK| 35000| 1|
|Arjun| MP| 30000| 1|
+-----+-----+------+------------+
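ntile(n) splits each partition's ordered rows into n roughly equal buckets; when the rows do not divide evenly, the earlier buckets receive the extra rows (that is why Arun and Ajay share bucket 1 in the ntile(2) example above). A sketch over all five rows with no partition, where bucket 1 would get 3 rows and bucket 2 would get 2:
# No partitionBy: a single frame over the whole DataFrame
whole = Window.orderBy(col('Salary').desc())
df.withColumn('half', ntile(2).over(whole)).show()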
PySpark Window Analytic Functions
cume_dist()
lag()
lead()
In [47]: from pyspark.sql.functions import cume_dist
df.withColumn("cume_dist_value",cume_dist().over(WindFunc)).show(truncate= False)
+-----+-----+------+------------------+
|Name |State|Salary|cume_dist_value |
+-----+-----+------+------------------+
|James|USA |20000 |1.0 |
|Arun |UK |50000 |0.6666666666666666|
|Ajay |UK |50000 |0.6666666666666666|
|Kumar|UK |35000 |1.0 |
|Arjun|MP |30000 |1.0 |
+-----+-----+------+------------------+
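cume_dist() returns the fraction of rows in the partition that rank at or before the current row under the ordering: in UK (salary descending) the two 50000 rows each cover 2 of the 3 rows (0.666...), Kumar's 35000 covers all 3 (1.0), and single-row partitions are always 1.0.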
In [48]: from pyspark.sql.functions import lag
df.withColumn("lag_value",lag('salary',1).over(WindFunc)).show()
+-----+-----+------+---------+
| Name|State|Salary|lag_value|
+-----+-----+------+---------+
|James| USA| 20000| null|
| Arun| UK| 50000| null|
| Ajay| UK| 50000| 50000|
|Kumar| UK| 35000| 50000|
|Arjun| MP| 30000| null|
+-----+-----+------+---------+
In [50]: # created a new window spec without a partition
WindFunc_new = Window.orderBy(col('Salary').desc())
df.withColumn('lag_val',lag('salary',1).over(WindFunc_new)).show()
+-----+-----+------+-------+
| Name|State|Salary|lag_val|
+-----+-----+------+-------+
| Arun| UK| 50000| null|
| Ajay| UK| 50000| 50000|
|Kumar| UK| 35000| 50000|
|Arjun| MP| 30000| 35000|
|James| USA| 20000| 30000|
+-----+-----+------+-------+
In [51]: from pyspark.sql.functions import lead
df.withColumn("load_value",lead("salary",1).over(WindFunc)).show(truncate=False)
+-----+-----+------+----------+
|Name |State|Salary|load_value|
+-----+-----+------+----------+
|James|USA |20000 |null |
|Arun |UK |50000 |50000 |
|Ajay |UK |50000 |35000 |
|Kumar|UK |35000 |null |
|Arjun|MP |30000 |null |
+-----+-----+------+----------+
In [53]: df.withColumn('leadValue_without_partition',lead('salary',1).over(WindFunc_new)).show(truncate=False)
df.withColumn('leadValue_without_partition',lead('salary',2).over(WindFunc_new)).show(truncate=False)
+-----+-----+------+---------------------------+
|Name |State|Salary|leadValue_without_partition|
+-----+-----+------+---------------------------+
|Arun |UK |50000 |50000 |
|Ajay |UK |50000 |35000 |
|Kumar|UK |35000 |30000 |
|Arjun|MP |30000 |20000 |
|James|USA |20000 |null |
+-----+-----+------+---------------------------+
+-----+-----+------+---------------------------+
|Name |State|Salary|leadValue_without_partition|
+-----+-----+------+---------------------------+
|Arun |UK |50000 |35000 |
|Ajay |UK |50000 |30000 |
|Kumar|UK |35000 |20000 |
|Arjun|MP |30000 |null |
|James|USA |20000 |null |
+-----+-----+------+---------------------------+
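Both lag() and lead() accept an optional third argument, a default value to return where the offset runs past the edge of the partition instead of null. A minimal sketch:
# 0 instead of null at the partition edges
df.withColumn('prev_salary', lag('Salary', 1, 0).over(WindFunc)) \
  .withColumn('next_salary', lead('Salary', 1, 0).over(WindFunc)) \
  .show()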
PySpark Aggregate Functions
min()
max()
sum()
avg()
Note: The code below applies the aggregate functions. row_number() still uses WindFunc (an orderBy is mandatory for ranking functions), while the aggregate functions use windAgg, a window partitioned by state with no orderBy.
In [96]: windAgg = Window.partitionBy("state")
from pyspark.sql.functions import min, max, avg, sum, col, row_number
df.withColumn("row",row_number().over(WindFunc)) \
.withColumn("min",min(col('salary')).over(windAgg)) \
.withColumn('max',max(col('salary')).over(windAgg)) \
.withColumn('avg',avg('salary').over(windAgg)) \
.withColumn('sum',sum('salary').over(windAgg))\
.where(col('row') ==1) \
.select('state','min','max','avg','sum').show()
+-----+-----+-----+-------+------+
|state| min| max| avg| sum|
+-----+-----+-----+-------+------+
| USA|20000|20000|20000.0| 20000|
| UK|35000|50000|45000.0|135000|
| MP|30000|30000|30000.0| 30000|
+-----+-----+-----+-------+------+
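For reference, the same per-state figures can be produced with a plain groupBy, which returns one row per state; the window version above is the one to reach for when the aggregates need to sit alongside the original row-level columns. A sketch of the groupBy equivalent:
df.groupBy('State') \
  .agg(min('Salary').alias('min'),
       max('Salary').alias('max'),
       avg('Salary').alias('avg'),
       sum('Salary').alias('sum')) \
  .show()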