Chapter 17: Window Operations

Chapter Learning Objectives

  • Perform various window operations on a DataFrame.

Chapter Outline

import pyspark
from pyspark.sql import functions as func
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
from IPython.display import display_html
import pandas as pd 
import numpy as np
def display_side_by_side(*args):
    html_str=''
    for df in args:
        html_str+=df.to_html(index=False)
        html_str+= "\xa0\xa0\xa0"*10
    display_html(html_str.replace('table','table style="display:inline"'),raw=True)
space = "\xa0" * 10
import panel as pn

css = """
div.special_table + table, th, td {
  border: 3px solid orange;
}
"""
pn.extension(raw_css=[css])

1. Window Operations

What are Window Functions?

A window function performs a calculation across a set of table rows that are somehow related to the current row. This is comparable to the type of calculation that can be done with an aggregate function. But unlike regular aggregate functions, use of a window function does not cause rows to become grouped into a single output row — the rows retain their separate identities. Behind the scenes, the window function is able to access more than just the current row of the query result.
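The difference between an aggregate and a window function can be sketched in plain Python. This is only an illustration of the idea, not how Spark executes it; the four sample rows are hypothetical.

```python
from collections import defaultdict

# (dept_id, salary) pairs; hypothetical sample data for illustration only.
rows = [("sales", 6000), ("hr", 3000), ("sales", 7000), ("hr", 2000)]

# Total salary per department.
totals = defaultdict(int)
for dept, salary in rows:
    totals[dept] += salary

# Aggregate style: one output row per group.
aggregated = sorted(totals.items())

# Window style: every input row is kept and annotated with its group's total.
windowed = [(dept, salary, totals[dept]) for dept, salary in rows]

print(aggregated)
print(windowed)
```

The aggregated result has one row per department, while the windowed result still has four rows, one per input row.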

The DataFrame API provides utility functions to define a window specification. In Python, for example, users can specify partitioning expressions and ordering expressions as follows.

from pyspark.sql.window import Window
windowSpec = \
    Window \
    .partitionBy(…) \
    .orderBy(…)

In addition to the ordering and partitioning, users need to define the start boundary of the frame, the end boundary of the frame, and the type of the frame, which are three components of a frame specification.

There are five types of boundaries: unboundedPreceding, unboundedFollowing, currentRow, <value> Preceding, and <value> Following.

unboundedPreceding and unboundedFollowing represent the first row of the partition and the last row of the partition, respectively.

The other three types of boundaries specify an offset from the position of the current input row; their exact meaning depends on the type of the frame.
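The two unbounded boundaries can be pictured with a small plain-Python sketch. In PySpark they correspond to the constants Window.unboundedPreceding, Window.unboundedFollowing, and Window.currentRow; the code below only mimics the resulting sums for a single, already ordered partition and is not Spark's implementation.

```python
from itertools import accumulate

salaries = [7000, 6000, 5000, 6000]  # one partition, already in window order

# Frame from unboundedPreceding to currentRow: a running total.
running_total = list(accumulate(salaries))

# Frame from unboundedPreceding to unboundedFollowing:
# every row sees the whole partition.
partition_total = [sum(salaries)] * len(salaries)

print(running_total)
print(partition_total)
```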

There are two types of frames: ROW frames and RANGE frames.

ROW frame

ROW frames are based on physical offsets from the position of the current input row, which means that currentRow, Preceding, and Following specify a physical offset.

If currentRow is used as a boundary, it represents the current input row. Preceding and Following describe the number of rows that appear before and after the current input row, respectively.

For example, a ROW frame with 1 PRECEDING as the start boundary and 1 FOLLOWING as the end boundary (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING in the SQL syntax) covers the previous row, the current row, and the next row.
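To see which rows such a frame contains, here is a minimal plain-Python sketch. The helper row_frames is a hypothetical name introduced for illustration, and the sketch assumes a single partition whose rows are already ordered.

```python
def row_frames(values, start, end):
    """Return the rows in each ROW frame [i + start, i + end], clipped to the partition."""
    last = len(values) - 1
    return [values[max(0, i + start): min(last, i + end) + 1]
            for i in range(len(values))]

# 1 PRECEDING .. 1 FOLLOWING over four rows.
frames = row_frames(["a", "b", "c", "d"], -1, 1)
for row, frame in zip("abcd", frames):
    print(row, frame)
```

The first and last rows get shorter frames because there is no row before the first one or after the last one.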

df = spark.createDataFrame([
    ("sales",10,6000),("hr",7,3000),("it",5,5000),("sales",2,6000),
    ("hr",3,2000),("hr",4,6000),("it",8,8000),("sales",9,5000),
    ("sales",1,7000),("it",6,6000)],
    ["dept_id","emp_id","salary"])
d1 = df.toPandas()
print(d1.to_string(index=False))
dept_id  emp_id  salary
  sales      10    6000
     hr       7    3000
     it       5    5000
  sales       2    6000
     hr       3    2000
     hr       4    6000
     it       8    8000
  sales       9    5000
  sales       1    7000
     it       6    6000

1a. How to calculate a new column for each group whose row value is equal to the sum of the current row and the previous 2 rows?

from pyspark.sql import functions as func
from pyspark.sql import Window
# Frame: the current row and the two rows before it, per department, ordered by emp_id.
window = Window.partitionBy("dept_id").orderBy("emp_id").rowsBetween(-2, 0)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary   sum
  sales       1    7000  7000
  sales       2    6000 13000
  sales       9    5000 18000
  sales      10    6000 17000
     hr       3    2000  2000
     hr       4    6000  8000
     hr       7    3000 11000
     it       5    5000  5000
     it       6    6000 11000
     it       8    8000 19000
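The table above can be reproduced by hand. The following plain-Python sketch mimics partitionBy, orderBy, and rowsBetween(-2, 0) on the same sample data; it is an illustration of the semantics, not of Spark's execution.

```python
from collections import defaultdict

rows = [("sales", 10, 6000), ("hr", 7, 3000), ("it", 5, 5000), ("sales", 2, 6000),
        ("hr", 3, 2000), ("hr", 4, 6000), ("it", 8, 8000), ("sales", 9, 5000),
        ("sales", 1, 7000), ("it", 6, 6000)]

# partitionBy("dept_id"): bucket the rows by department.
partitions = defaultdict(list)
for dept, emp, salary in rows:
    partitions[dept].append((emp, salary))

result = {}
for dept, members in partitions.items():
    members.sort()  # orderBy("emp_id")
    salaries = [s for _, s in members]
    # rowsBetween(-2, 0): the current row plus up to two rows before it.
    result[dept] = [sum(salaries[max(0, i - 2): i + 1])
                    for i in range(len(salaries))]

print(result)
```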

1b. How to calculate a new column for each group whose row value is equal to the sum of the current row and the 2 following rows?

# Frame: the current row and the two rows after it, per department, ordered by emp_id.
window = Window.partitionBy("dept_id").orderBy("emp_id").rowsBetween(0, 2)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary   sum
  sales       1    7000 18000
  sales       2    6000 17000
  sales       9    5000 11000
  sales      10    6000  6000
     hr       3    2000 11000
     hr       4    6000  9000
     hr       7    3000  3000
     it       5    5000 19000
     it       6    6000 14000
     it       8    8000  8000
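As a quick check of the sales rows above, a forward-looking frame can be sketched with list slicing. This is plain Python for a single partition, shown only to make the arithmetic visible.

```python
# Sales partition ordered by emp_id (1, 2, 9, 10).
salaries = [7000, 6000, 5000, 6000]

# rowsBetween(0, 2): slicing past the end of the list simply yields a shorter frame.
forward_sum = [sum(salaries[i:i + 3]) for i in range(len(salaries))]

print(forward_sum)
```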

1c. How to calculate a new column whose row value is equal to the sum of the current row and the previous 2 rows, without grouping?

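# Frame: the current row and the two rows before it, over the whole DataFrame (no partitioning or ordering).
window = Window.rowsBetween(-2, 0)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))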
dept_id  emp_id  salary   sum
  sales      10    6000  6000
     hr       7    3000  9000
     it       5    5000 14000
  sales       2    6000 14000
     hr       3    2000 13000
     hr       4    6000 14000
     it       8    8000 16000
  sales       9    5000 19000
  sales       1    7000 20000
     it       6    6000 18000
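Because this window has no orderBy, the sums depend on the order in which the rows happen to arrive, which Spark does not guarantee in general. The plain-Python sketch below makes that dependence visible; trailing_sum is a hypothetical helper introduced for illustration.

```python
def trailing_sum(values, back=2):
    """Sum each value with up to `back` values before it, like rowsBetween(-back, 0)."""
    return [sum(values[max(0, i - back): i + 1]) for i in range(len(values))]

# Salaries in the order the rows appear in the sample DataFrame.
as_listed = [6000, 3000, 5000, 6000, 2000, 6000, 8000, 5000, 7000, 6000]
print(trailing_sum(as_listed))

# The same salaries in a different arrival order (here simply reversed)
# produce different sums.
print(trailing_sum(as_listed[::-1]))
```

When a stable result matters, adding an explicit orderBy to the window specification removes this dependence.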

1d. How to calculate a new column whose row value is equal to the sum of the current row and the following row, without grouping?

# Frame: the current row and the row after it, over the whole DataFrame (no partitioning or ordering).
window = Window.rowsBetween(0, 1)
print(df.withColumn("sum", func.sum("salary").over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary   sum
  sales      10    6000  9000
     hr       7    3000  8000
     it       5    5000 11000
  sales       2    6000  8000
     hr       3    2000  8000
     hr       4    6000 14000
     it       8    8000 13000
  sales       9    5000 12000
  sales       1    7000 13000
     it       6    6000  6000

1e. How to calculate the dense rank?

# Rank salaries within each department, highest first; ties share a rank and no rank is skipped.
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("rank", func.dense_rank().over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  rank
  sales       1    7000     1
  sales      10    6000     2
  sales       2    6000     2
  sales       9    5000     3
     hr       4    6000     1
     hr       7    3000     2
     hr       3    2000     3
     it       8    8000     1
     it       6    6000     2
     it       5    5000     3
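The dense rank of a row is its position among the distinct ordering values. A minimal plain-Python sketch for one partition, assuming descending order as in the example above:

```python
def dense_rank(values):
    """1-based rank among the distinct values, largest first, with no gaps."""
    distinct = sorted(set(values), reverse=True)
    position = {v: i + 1 for i, v in enumerate(distinct)}
    return [position[v] for v in values]

sales = [7000, 6000, 6000, 5000]
print(dense_rank(sales))
```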

1f. How to calculate the rank?

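# Rank salaries within each department, highest first; ties share a rank and the following rank is skipped.
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("rank", func.rank().over(window)).toPandas().to_string(index=False))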
dept_id  emp_id  salary  rank
  sales       1    7000     1
  sales      10    6000     2
  sales       2    6000     2
  sales       9    5000     4
     hr       4    6000     1
     hr       7    3000     2
     hr       3    2000     3
     it       8    8000     1
     it       6    6000     2
     it       5    5000     3
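With descending order, the rank of a row is one plus the number of rows with a strictly larger value, which is why the salary 5000 in sales gets rank 4 rather than 3. A plain-Python sketch for one partition:

```python
def rank(values):
    """1-based rank, largest first; tied values share a rank and leave a gap after it."""
    return [1 + sum(other > v for other in values) for v in values]

sales = [7000, 6000, 6000, 5000]
print(rank(sales))
```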

1g. How to calculate the ntile?

# Split each department's rows, ordered by salary descending, into at most 4 buckets.
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("salary_bucket", func.ntile(4).over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  salary_bucket
  sales       1    7000              1
  sales      10    6000              2
  sales       2    6000              3
  sales       9    5000              4
     hr       4    6000              1
     hr       7    3000              2
     hr       3    2000              3
     it       8    8000              1
     it       6    6000              2
     it       5    5000              3
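ntile(n) splits the ordered rows of a partition into n buckets whose sizes differ by at most one, with the larger buckets first. That is why hr and it, with only three rows each, use buckets 1 to 3 above. A plain-Python sketch of the bucket assignment (the function name and signature are hypothetical):

```python
def ntile(n_rows, buckets):
    """Bucket number (1-based) for each of n_rows ordered rows."""
    base, extra = divmod(n_rows, buckets)
    out = []
    for b in range(1, buckets + 1):
        # The first `extra` buckets receive one additional row.
        size = base + (1 if b <= extra else 0)
        out.extend([b] * size)
    return out

print(ntile(4, 4))
print(ntile(3, 4))
print(ntile(10, 4))
```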

1h. How to calculate the lag?

# Fetch the salary of the previous row in each department (null for the first row).
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("previousrow_salary", func.lag("salary", 1).over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  previousrow_salary
  sales       1    7000                 NaN
  sales      10    6000              7000.0
  sales       2    6000              6000.0
  sales       9    5000              6000.0
     hr       4    6000                 NaN
     hr       7    3000              6000.0
     hr       3    2000              3000.0
     it       8    8000                 NaN
     it       6    6000              8000.0
     it       5    5000              6000.0
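lag looks back a fixed number of rows within the partition and yields a default (null unless specified) when no such row exists. The NaN entries and the .0 suffixes in the table above most likely come from pandas, which represents missing values in a numeric column as floating-point NaN after toPandas(). A plain-Python sketch for one partition:

```python
def lag(values, offset=1, default=None):
    """Value `offset` rows before each row, or `default` when there is none."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]

sales = [7000, 6000, 6000, 5000]  # ordered by salary descending
print(lag(sales))
print(lag(sales, offset=2, default=0))
```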

1i. How to calculate the lead?

# Fetch the salary of the next row in each department (null for the last row).
window = Window.partitionBy("dept_id").orderBy(func.col("salary").desc())
print(df.withColumn("nextrow_salary", func.lead("salary", 1).over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  nextrow_salary
  sales       1    7000          6000.0
  sales      10    6000          6000.0
  sales       2    6000          5000.0
  sales       9    5000             NaN
     hr       4    6000          3000.0
     hr       7    3000          2000.0
     hr       3    2000             NaN
     it       8    8000          6000.0
     it       6    6000          5000.0
     it       5    5000             NaN
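lead is the mirror image of lag: it looks ahead instead of back. A common use is computing the gap between a row and the next one. The sketch below is plain Python for one partition, with hypothetical helper names.

```python
def lead(values, offset=1, default=None):
    """Value `offset` rows after each row, or `default` when there is none."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default for i in range(n)]

sales = [7000, 6000, 6000, 5000]  # ordered by salary descending
next_salary = lead(sales)

# Gap between each salary and the next lower one; None for the last row.
gaps = [cur - nxt if nxt is not None else None
        for cur, nxt in zip(sales, next_salary)]

print(next_salary)
print(gaps)
```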

1j. How to calculate the percent rank?

# Relative rank within each department: (rank - 1) / (rows in partition - 1).
window = Window.partitionBy("dept_id").orderBy(func.col("salary"))
print(df.withColumn("percentile", func.percent_rank().over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  percentile
  sales       9    5000    0.000000
  sales      10    6000    0.333333
  sales       2    6000    0.333333
  sales       1    7000    1.000000
     hr       3    2000    0.000000
     hr       7    3000    0.500000
     hr       4    6000    1.000000
     it       5    5000    0.000000
     it       6    6000    0.500000
     it       8    8000    1.000000
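The values above follow the formula (rank - 1) / (rows in partition - 1), where rank - 1 is the number of rows with a strictly smaller ordering value. A plain-Python sketch for one partition in ascending order:

```python
def percent_rank(values):
    """(rank - 1) / (n - 1) for each value, ascending; 0.0 for a single-row partition."""
    n = len(values)
    if n == 1:
        return [0.0]
    return [sum(other < v for other in values) / (n - 1) for v in values]

sales = [5000, 6000, 6000, 7000]
print([round(p, 6) for p in percent_rank(sales)])
```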

1k. How to calculate the row number?

# Number the rows of each department sequentially by ascending salary.
window = Window.partitionBy("dept_id").orderBy(func.col("salary"))
print(df.withColumn("row_no", func.row_number().over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  row_no
  sales       9    5000       1
  sales      10    6000       2
  sales       2    6000       3
  sales       1    7000       4
     hr       3    2000       1
     hr       7    3000       2
     hr       4    6000       3
     it       5    5000       1
     it       6    6000       2
     it       8    8000       3
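Note that the two sales rows with salary 6000 are tied, so which of them gets row number 2 and which gets 3 is arbitrary. Adding a tiebreaker column to the ordering, for example orderBy("salary", "emp_id") in PySpark, makes the numbering deterministic. The plain-Python sketch below shows the idea for the sales partition; with emp_id as the tiebreaker, employee 2 comes before employee 10, unlike in the table above.

```python
# (emp_id, salary) rows of the sales partition.
rows = [(10, 6000), (2, 6000), (9, 5000), (1, 7000)]

# Order by salary, then by emp_id to break ties deterministically.
ordered = sorted(rows, key=lambda r: (r[1], r[0]))

# row_number(): consecutive numbers starting at 1.
numbered = [(emp, salary, i) for i, (emp, salary) in enumerate(ordered, start=1)]

print(numbered)
```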

1l. How to calculate the cume dist?

# Fraction of rows in each department whose salary is less than or equal to the current row's salary.
window = Window.partitionBy("dept_id").orderBy(func.col("salary"))
print(df.withColumn("cume_dist", func.cume_dist().over(window)).toPandas().to_string(index=False))
dept_id  emp_id  salary  cume_dist
  sales       9    5000   0.250000
  sales      10    6000   0.750000
  sales       2    6000   0.750000
  sales       1    7000   1.000000
     hr       3    2000   0.333333
     hr       7    3000   0.666667
     hr       4    6000   1.000000
     it       5    5000   0.333333
     it       6    6000   0.666667
     it       8    8000   1.000000
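The cumulative distribution of a row is the fraction of rows in its partition whose ordering value is less than or equal to its own. A plain-Python sketch for one partition in ascending order:

```python
def cume_dist(values):
    """Fraction of values less than or equal to each value."""
    n = len(values)
    return [sum(other <= v for other in values) / n for v in values]

sales = [5000, 6000, 6000, 7000]
print(cume_dist(sales))
```

Unlike percent_rank, tied rows count toward each other here, so both 6000 rows in sales get 0.75 and the result is never 0.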