# Windowing Functions

As part of this module let us get into Windowing Functions.

## Starting Spark Context

Let us start spark context for this Notebook so that we can execute the code provided.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

In [2]:
spark.conf.set('spark.sql.shuffle.partitions', '2')

## Overview of Windowing Functions

Let us get an overview of Windowing Functions.

 * First let us understand relevance of these functions using `employees` data set.

In [3]:
employeesPath = '/public/hr_db/employees'

In [4]:
employees = spark. \
    read. \
    format('csv'). \
    option('sep', '\t'). \
    schema('''employee_id INT, 
              first_name STRING, 
              last_name STRING, 
              email STRING,
              phone_number STRING, 
              hire_date STRING, 
              job_id STRING, 
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING, 
              department_id STRING
            '''). \
    load(employeesPath)

In [7]:
from pyspark.sql.functions import col
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    orderBy('department_id', 'salary'). \
    show()

+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
|        178|         null| 7000.0|
|        200|           10| 4400.0|
|        202|           20| 6000.0|
|        201|           20|13000.0|
|        119|           30| 2500.0|
|        118|           30| 2600.0|
|        117|           30| 2800.0|
|        116|           30| 2900.0|
|        115|           30| 3100.0|
|        114|           30|11000.0|
|        203|           40| 6500.0|
|        132|           50| 2100.0|
|        136|           50| 2200.0|
|        128|           50| 2200.0|
|        127|           50| 2400.0|
|        135|           50| 2400.0|
|        140|           50| 2500.0|
|        144|           50| 2500.0|
|        191|           50| 2500.0|
|        182|           50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows



* Let us say we want to compare individual salary with department wise salary expense.
* Here is one of the approach which require self join.
  * Compute department wise expense usig `groupBy` and `agg`.
  * Join with **employees** again on department_id.

In [9]:
from pyspark.sql.functions import sum, col

In [10]:
department_expense = employees. \
    groupBy('department_id'). \
    agg(sum('salary').alias('expense'))

In [11]:
department_expense.show()

+-------------+--------+
|department_id| expense|
+-------------+--------+
|           80|304500.0|
|           90| 58000.0|
|           60| 28800.0|
|           20| 19000.0|
|           70| 10000.0|
|          110| 20300.0|
|          100| 51600.0|
|           30| 24900.0|
|           50|156400.0|
|           10|  4400.0|
|           40|  6500.0|
|         null|  7000.0|
+-------------+--------+



In [12]:
employees. \
    select('employee_id', 'department_id', 'salary'). \
    join(department_expense, employees.department_id == department_expense.department_id). \
    orderBy(employees.department_id, col('salary')). \
    show()

+-----------+-------------+-------+-------------+--------+
|employee_id|department_id| salary|department_id| expense|
+-----------+-------------+-------+-------------+--------+
|        200|           10| 4400.0|           10|  4400.0|
|        113|          100| 6900.0|          100| 51600.0|
|        111|          100| 7700.0|          100| 51600.0|
|        112|          100| 7800.0|          100| 51600.0|
|        110|          100| 8200.0|          100| 51600.0|
|        109|          100| 9000.0|          100| 51600.0|
|        108|          100|12000.0|          100| 51600.0|
|        206|          110| 8300.0|          110| 20300.0|
|        205|          110|12000.0|          110| 20300.0|
|        202|           20| 6000.0|           20| 19000.0|
|        201|           20|13000.0|           20| 19000.0|
|        119|           30| 2500.0|           30| 24900.0|
|        118|           30| 2600.0|           30| 24900.0|
|        117|           30| 2800.0|           30| 24900.

 **However, using this approach is not very efficient and also overly complicated. Windowing functions actually simplify the logic and also runs efficiently**
 
Now let us get into the details related to Windowing functions.
 * Main package `pyspark.sql.window`
 * It has classes such as `Window` and `WindowSpec`
 * `Window` have APIs such as `partitionBy`, `orderBy` etc
 * These APIs (such as `partitionBy`) return `WindowSpec` object. We can pass `WindowSpec` object to over on functions such as `rank()`, `dense_rank()`, `sum()` etc
 * Syntax: `sum().over(spec)` where `spec = Window.partitionBy('ColumnName')`

| Functions        | API or Function      |
| ------------- |:-------------:|
| Aggregate Functions      | <ul><li>sum</li><li>avg</li><li>min</li><li>max</li></ul> |
| Ranking Functions      | <ul><li>rank</li><li>dense_rank</li></ul><ul><li>percent_rank</li><li>row_number</li> <li>ntile</li></ul> |
| Analytic Functions      | <ul><li>cume_dist</li><li>first</li><li>last</li><li>lead</li> <li>lag</li></ul> |

## Aggregate Functions

Let us see how to perform aggregations within each group while projecting the raw data that is used to perform the aggregation.

 * We have functions such as `sum`, `avg`, `min`, `max` etc which can be used to aggregate the data.
 * We need to create `WindowSpec` object using `partitionBy` to get aggregations within each group.
 * Typically we donâ€™t need to sort the data to perform aggregations, however if we want to perform cumulative aggregations using rowsBetween, then we have to sort the data using cumulative criteria.
 * Let us try to get total departure delay, minimum departure delay, maximum departure delay and average departure delay for each day for each airport. We will ignore all those flights which are departured early or ontime.

In [13]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [14]:
airlines = spark. \
  read. \
  parquet(airlines_path)

In [15]:
from pyspark.sql.functions import col, lit, lpad, concat

In [16]:
from pyspark.sql.functions import min, max, sum, avg

In [17]:
from pyspark.sql.window import Window

In [18]:
airlines.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- Car

In [19]:
spec = Window. \
    partitionBy("FlightDate", "Origin")

In [20]:
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelayMin", min("DepDelay").over(spec)). \
    withColumn("DepDelayMax", max("DepDelay").over(spec)). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    withColumn("DepDelayAvg", avg("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "DepDelay"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelayMin|DepDelayMax|DepDelaySum|       DepDelayAvg|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|        175|        487|            60.875|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|          1|        175|        487|            60.875|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|          1|        175|        487|            60.875|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|          1|        175|        487|            60.875|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|

## Using rowsBetween and rangeBetween

We can get cumulative aggregations using `rowsBetween` or `rangeBetween`.

* We can use `rowsBetween` to include particular set of rows to perform aggregations.
* We can use `rangeBetween` to include particular range of values on a given column.

In [21]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(Window.unboundedPreceding, 0)

In [22]:
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        264|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        278|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        312|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        487|
|  20080101|   ABI|  

In [23]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(-3, 0)

In [24]:
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        263|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        207|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        104|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        257|
|  20080101|   ABI|  

## Ranking Functions

We can use ranking functions to assign ranks to a particular record within a partition.

* Sparse Rank - rank
* Dense Rank - dense_rank
* Assigning Row Numbers - row_number
* Percentage Rank - percent_rank

### Tasks

Let us perform few tasks related to ranking.

In [28]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [29]:
airlines = spark. \
    read. \
    parquet(airlines_path)

In [30]:
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window

In [31]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("DepDelay").desc())

In [32]:
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|srank|drank|prank| rn|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|    1|    1|  0.0|  1|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|    2|    2| 0.14|  2|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|    3|    3| 0.29|  3|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|    4|    4| 0.43|  4|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|    4|    4| 0.43|  5|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|    6|    5| 0.71|  6|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|    7|    6| 0.86|  7|
|  2008010

## Analytic Functions

We can use Analytic Functions to compare current record with previous record or next record.
* `lead` and `lag` are the main functions.
* We can also compare each of the day of one week with corresponding day of another week.
* `lead` and `lag` serve the same purpose. Depending up on the requirement and sorting of the data we can use either of them.
* Here the examples are demonstrated using `lead`. Same can be achieved using `lag` however while defining the spec we have sort the data with in window in descending order to get similar results.
* Also we can use `first` and `last` functions to get first or last value with in each group or partition based up on sorting criteria. They are typically used to get the details about other fields (for example, we can get employee name or id who is making highest or lowest salary with in a department).

### Using LEAD

In [33]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [34]:
airlines = spark. \
  read. \
  parquet(airlines_path)

In [35]:
from pyspark.sql.functions import col, lit, lpad, concat

In [36]:
from pyspark.sql.functions import lead

In [37]:
from pyspark.sql.window import Window

In [38]:
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("CRSDepTime"))

In [39]:
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LeadUniqueCarrier", lead("UniqueCarrier").over(spec)). \
    withColumn("LeadFlightNum", lead("FlightNum").over(spec)). \
    withColumn("LeadCRSDepTime", lead("CRSDepTime").over(spec)). \
    withColumn("LeadDepDelay", lead("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()

+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|LeadUniqueCarrier|LeadFlightNum|LeadCRSDepTime|LeadDepDelay|
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|               9E|         2940|          1215|          70|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|               YV|         7263|          1230|         137|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|               XE|         2578|          1410|          22|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|               9E|         2936|          1615|          34|
|  20080101|   ABE|           9E|     293

### Using LEAD with 7

In [42]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [43]:
airlines = spark. \
    read. \
    parquet(airlines_path)

In [44]:
from pyspark.sql.functions import col, lit, lpad, concat

In [45]:
from pyspark.sql.functions import sum, lead, substring

In [46]:
from pyspark.sql.window import Window

In [48]:
spec = Window. \
    partitionBy(substring("FlightDate", 1, 6), "Origin"). \
    orderBy("FlightDate", col("TotalDepDelay").desc())

In [53]:
airlines. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD"'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()

+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
|  20080101|   ORD|        49353|      20080108|       ORD|            35658|
|  20080102|   ORD|        41545|      20080109|       ORD|            10075|
|  20080103|   ORD|        15784|      20080110|       ORD|            18431|
|  20080104|   ORD|        13442|      20080111|       ORD|            15372|
|  20080105|   ORD|        23800|      20080112|       ORD|             5785|
|  20080106|   ORD|        31148|      20080113|       ORD|             9630|
|  20080107|   ORD|        47817|      20080114|       ORD|            24969|
+----------+------+-------------+--------------+----------+-----------------+



In [54]:
airlines. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD" AND FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()

+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
|  20080101|   ORD|        49353|      20080108|       ORD|            35658|
|  20080102|   ORD|        41545|      20080109|       ORD|            10075|
|  20080103|   ORD|        15784|      20080110|       ORD|            18431|
|  20080104|   ORD|        13442|      20080111|       ORD|            15372|
|  20080105|   ORD|        23800|      20080112|       ORD|             5785|
|  20080106|   ORD|        31148|      20080113|       ORD|             9630|
|  20080107|   ORD|        47817|      20080114|       ORD|            24969|
+----------+------+-------------+--------------+----------+-----------------+



In [55]:
airlines. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()

+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
|  20080101|   ORD|        49353|      20080108|       ORD|            35658|
|  20080101|   DFW|        13741|      20080108|       DFW|             8277|
|  20080101|   ATL|        11592|      20080108|       ATL|             9988|
|  20080101|   LAX|        10360|      20080108|       LAX|             8767|
|  20080101|   JFK|         6948|      20080108|       JFK|             2261|
|  20080101|   SFO|         5359|      20080108|       SFO|            18095|
|  20080102|   ORD|        41545|      20080109|       ORD|            10075|
|  20080102|   ATL|        25127|      20080109|       ATL|             6404|
|  20080102|   DFW|        12827|      20080109|       DFW|             4532|
|  20080102|   JFK|         9734|      20080109|       JFK|     