7. Windowing Functions

As part of this module, let us get into Windowing Functions.

7.1. Starting Spark Context

Let us start the Spark context for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Windowing Functions'). \
    master('yarn'). \
    getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '2')

7.2. Overview of Windowing Functions

Let us get an overview of Windowing Functions.

  • First, let us understand the relevance of these functions using the employees data set.

employeesPath = '/public/hr_db/employees'
employees = spark. \
    read. \
    format('csv'). \
    option('sep', '\t'). \
    schema('''employee_id INT, 
              first_name STRING, 
              last_name STRING, 
              email STRING,
              phone_number STRING, 
              hire_date STRING, 
              job_id STRING, 
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING, 
              department_id STRING
            '''). \
    load(employeesPath)
from pyspark.sql.functions import col
employees. \
    select('employee_id', 
           col('department_id').cast('int').alias('department_id'), 
           'salary'
          ). \
    orderBy('department_id', 'salary'). \
    show()
+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
|        178|         null| 7000.0|
|        200|           10| 4400.0|
|        202|           20| 6000.0|
|        201|           20|13000.0|
|        119|           30| 2500.0|
|        118|           30| 2600.0|
|        117|           30| 2800.0|
|        116|           30| 2900.0|
|        115|           30| 3100.0|
|        114|           30|11000.0|
|        203|           40| 6500.0|
|        132|           50| 2100.0|
|        136|           50| 2200.0|
|        128|           50| 2200.0|
|        127|           50| 2400.0|
|        135|           50| 2400.0|
|        140|           50| 2500.0|
|        144|           50| 2500.0|
|        191|           50| 2500.0|
|        182|           50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows
  • Let us say we want to compare each individual's salary with the department-wise salary expense.

  • Here is one approach, which requires a self join.

    • Compute the department-wise expense using groupBy and agg.

    • Join the result back to employees on department_id.

from pyspark.sql.functions import sum, col
department_expense = employees. \
    groupBy('department_id'). \
    agg(sum('salary').alias('expense'))
department_expense.show()
+-------------+--------+
|department_id| expense|
+-------------+--------+
|           80|304500.0|
|           90| 58000.0|
|           60| 28800.0|
|           20| 19000.0|
|           70| 10000.0|
|          110| 20300.0|
|          100| 51600.0|
|           30| 24900.0|
|           50|156400.0|
|           10|  4400.0|
|           40|  6500.0|
|         null|  7000.0|
+-------------+--------+
employees. \
    select('employee_id', 'department_id', 'salary'). \
    join(department_expense, employees.department_id == department_expense.department_id). \
    orderBy(employees.department_id, col('salary')). \
    show()
+-----------+-------------+-------+-------------+--------+
|employee_id|department_id| salary|department_id| expense|
+-----------+-------------+-------+-------------+--------+
|        200|           10| 4400.0|           10|  4400.0|
|        113|          100| 6900.0|          100| 51600.0|
|        111|          100| 7700.0|          100| 51600.0|
|        112|          100| 7800.0|          100| 51600.0|
|        110|          100| 8200.0|          100| 51600.0|
|        109|          100| 9000.0|          100| 51600.0|
|        108|          100|12000.0|          100| 51600.0|
|        206|          110| 8300.0|          110| 20300.0|
|        205|          110|12000.0|          110| 20300.0|
|        202|           20| 6000.0|           20| 19000.0|
|        201|           20|13000.0|           20| 19000.0|
|        119|           30| 2500.0|           30| 24900.0|
|        118|           30| 2600.0|           30| 24900.0|
|        117|           30| 2800.0|           30| 24900.0|
|        116|           30| 2900.0|           30| 24900.0|
|        115|           30| 3100.0|           30| 24900.0|
|        114|           30|11000.0|           30| 24900.0|
|        203|           40| 6500.0|           40|  6500.0|
|        132|           50| 2100.0|           50|156400.0|
|        128|           50| 2200.0|           50|156400.0|
+-----------+-------------+-------+-------------+--------+
only showing top 20 rows

However, this approach is not very efficient and is overly complicated. Windowing functions simplify the logic and run efficiently.

Now let us get into the details related to Windowing functions.

  • The main package is pyspark.sql.window.

  • It has classes such as Window and WindowSpec.

  • Window has APIs such as partitionBy, orderBy, etc.

  • These APIs (such as partitionBy) return a WindowSpec object. We can pass the WindowSpec object to over on functions such as rank(), dense_rank(), sum(), etc.

  • Syntax: sum().over(spec), where spec = Window.partitionBy('ColumnName'). See the sketch below this list.
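
For example, the department expense computed earlier with groupBy and a self join can be derived in a single pass. Here is a minimal sketch, reusing the employees DataFrame loaded above:

from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window

# every employee row keeps its raw salary and also carries
# the total expense of its department, with no join
spec = Window.partitionBy('department_id')
employees. \
    select('employee_id',
           col('department_id').cast('int').alias('department_id'),
           'salary'
          ). \
    withColumn('expense', sum('salary').over(spec)). \
    orderBy('department_id', 'salary'). \
    show()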

Here are the functions which can be used over a window, grouped by category.

Aggregate Functions

  • sum
  • avg
  • min
  • max

Ranking Functions

  • rank
  • dense_rank
  • percent_rank
  • row_number
  • ntile

Analytic Functions

  • cume_dist
  • first
  • last
  • lead
  • lag

7.3. Aggregate Functions

Let us see how to perform aggregations within each group while projecting the raw data that is used to perform the aggregation.

  • We have functions such as sum, avg, min, max, etc., which can be used to aggregate the data.

  • We need to create a WindowSpec object using partitionBy to get aggregations within each group.

  • Typically we do not need to sort the data to perform aggregations. However, if we want to perform cumulative aggregations using rowsBetween, then we have to sort the data within the window by the cumulative criteria.

  • Let us try to get the total, minimum, maximum, and average departure delay for each day for each airport. We will ignore all flights which departed early or on time.

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
  read. \
  parquet(airlines_path)
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import min, max, sum, avg
from pyspark.sql.window import Window
airlines.printSchema()
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)
 |-- IsArrDelayed: string (nullable = true)
 |-- IsDepDelayed: string (nullable = true)
spec = Window. \
    partitionBy("FlightDate", "Origin")
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelayMin", min("DepDelay").over(spec)). \
    withColumn("DepDelayMax", max("DepDelay").over(spec)). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    withColumn("DepDelayAvg", avg("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "DepDelay"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelayMin|DepDelayMax|DepDelaySum|       DepDelayAvg|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|        175|        487|            60.875|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|          1|        175|        487|            60.875|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|          1|        175|        487|            60.875|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|          1|        175|        487|            60.875|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|          1|        175|        487|            60.875|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|          1|        175|        487|            60.875|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|          1|        175|        487|            60.875|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|          1|        175|        487|            60.875|
|  20080101|   ABI|           MQ|     3214|      1735|         YES|       3|          3|          3|          3|               3.0|
|  20080101|   ABQ|           WN|     2976|      1040|         YES|       1|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      972|      1810|         YES|       1|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|       61|      1320|         YES|       1|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|     3425|      1440|         YES|       2|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|       88|       755|         YES|       2|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|     2284|      1520|         YES|       3|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           XE|     2771|      1430|         YES|       3|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|     1493|      2020|         YES|       4|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      360|      1800|         YES|       5|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      644|      1725|         YES|       5|          1|        218|       1580|32.916666666666664|
|  20080101|   ABQ|           WN|      729|      1530|         YES|       7|          1|        218|       1580|32.916666666666664|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
only showing top 20 rows

7.4. Using rowsBetween and rangeBetween

We can get cumulative aggregations using rowsBetween or rangeBetween.

  • We can use rowsBetween to include a particular set of rows, relative to the current row, in the aggregation.

  • We can use rangeBetween to include a particular range of values of the ordering column (see the sketch at the end of this section).

spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(Window.unboundedPreceding, 0)
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        264|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        278|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        312|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        487|
|  20080101|   ABI|           MQ|     3214|      1735|         YES|       3|          3|
|  20080101|   ABQ|           DL|     1601|       740|         YES|      65|         65|
|  20080101|   ABQ|           WN|       88|       755|         YES|       2|         67|
|  20080101|   ABQ|           UA|      782|       805|         YES|      48|        115|
|  20080101|   ABQ|           AA|     1814|       925|         YES|     100|        215|
|  20080101|   ABQ|           WN|     2976|      1040|         YES|       1|        216|
|  20080101|   ABQ|           WN|     1185|      1140|         YES|       9|        225|
|  20080101|   ABQ|           WN|     1297|      1155|         YES|      12|        237|
|  20080101|   ABQ|           WN|     2480|      1200|         YES|      16|        253|
|  20080101|   ABQ|           WN|      137|      1220|         YES|      10|        263|
|  20080101|   ABQ|           WN|     1328|      1245|         YES|      78|        341|
|  20080101|   ABQ|           WN|     3245|      1250|         YES|      24|        365|
+----------+------+-------------+---------+----------+------------+--------+-----------+
only showing top 20 rows
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(-3, 0)
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|          1|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|         71|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|        208|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|        230|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|        263|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|        207|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|        104|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|        257|
|  20080101|   ABI|           MQ|     3214|      1735|         YES|       3|          3|
|  20080101|   ABQ|           DL|     1601|       740|         YES|      65|         65|
|  20080101|   ABQ|           WN|       88|       755|         YES|       2|         67|
|  20080101|   ABQ|           UA|      782|       805|         YES|      48|        115|
|  20080101|   ABQ|           AA|     1814|       925|         YES|     100|        215|
|  20080101|   ABQ|           WN|     2976|      1040|         YES|       1|        151|
|  20080101|   ABQ|           WN|     1185|      1140|         YES|       9|        158|
|  20080101|   ABQ|           WN|     1297|      1155|         YES|      12|        122|
|  20080101|   ABQ|           WN|     2480|      1200|         YES|      16|         38|
|  20080101|   ABQ|           WN|      137|      1220|         YES|      10|         47|
|  20080101|   ABQ|           WN|     1328|      1245|         YES|      78|        116|
|  20080101|   ABQ|           WN|     3245|      1250|         YES|      24|        128|
+----------+------+-------------+---------+----------+------------+--------+-----------+
only showing top 20 rows
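
rangeBetween is not demonstrated above, so here is a sketch under the same filter and projection. The window must be ordered by a single numeric column; the frame then covers all rows whose value of that column falls within the given range of the current row's value (here, delays up to 60 minutes below the current delay):

spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("DepDelay"). \
    rangeBetween(-60, 0)
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "CRSDepTime",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "DepDelay"). \
    show()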

7.5. Ranking Functions

We can use ranking functions to assign a rank to each record within a partition.

  • Sparse Rank - rank

  • Dense Rank - dense_rank

  • Assigning Row Numbers - row_number

  • Percentage Rank - percent_rank

7.5.1. Tasks

Let us perform a few tasks related to ranking.

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("DepDelay").desc())
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|srank|drank|prank| rn|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|    1|    1|  0.0|  1|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|    2|    2| 0.14|  2|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|    3|    3| 0.29|  3|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|    4|    4| 0.43|  4|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|    4|    4| 0.43|  5|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|    6|    5| 0.71|  6|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|    7|    6| 0.86|  7|
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|    8|    7|  1.0|  8|
|  20080101|   ABI|           MQ|     3214|      1735|         YES|       3|    1|    1|  0.0|  1|
|  20080101|   ABQ|           WN|      823|      2045|         YES|     218|    1|    1|  0.0|  1|
|  20080101|   ABQ|           WN|      357|      1525|         YES|     171|    2|    2| 0.02|  2|
|  20080101|   ABQ|           AA|     1814|       925|         YES|     100|    3|    3| 0.04|  3|
|  20080101|   ABQ|           WN|     1178|      1845|         YES|      94|    4|    4| 0.06|  4|
|  20080101|   ABQ|           WN|     2497|      1825|         YES|      93|    5|    5| 0.09|  5|
|  20080101|   ABQ|           WN|     3481|      1500|         YES|      78|    6|    6| 0.11|  6|
|  20080101|   ABQ|           WN|     1328|      1245|         YES|      78|    6|    6| 0.11|  7|
|  20080101|   ABQ|           XE|      221|      1850|         YES|      68|    8|    7| 0.15|  8|
|  20080101|   ABQ|           DL|     1601|       740|         YES|      65|    9|    8| 0.17|  9|
|  20080101|   ABQ|           WN|       45|      1435|         YES|      64|   10|    9| 0.19| 10|
|  20080101|   ABQ|           WN|     2788|      1755|         YES|      53|   11|   10| 0.21| 11|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
only showing top 20 rows
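
ntile, listed under Ranking Functions in the overview, is not covered by the task above. As a sketch, it divides each partition into a requested number of buckets based on the sort order; here each delayed flight gets a delay quartile per FlightDate and Origin, reusing the spec defined above:

from pyspark.sql.functions import ntile
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("quartile", ntile(4).over(spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()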

7.6. Analytic Functions

We can use Analytic Functions to compare the current record with previous or subsequent records.

  • lead and lag are the main functions.

  • We can also compare each day of one week with the corresponding day of another week.

  • lead and lag serve the same purpose. Depending upon the requirement and the sorting of the data, we can use either of them.

  • The examples here are demonstrated using lead. The same can be achieved using lag; however, while defining the spec we have to sort the data within the window in descending order to get similar results.

  • We can also use the first and last functions to get the first or last value within each group or partition based upon the sorting criteria. They are typically used to get details about other fields (for example, the name or id of the employee making the highest or lowest salary within a department), as in the sketch below.
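
As a sketch of first, using the employees DataFrame from the overview (assuming it is still in scope), we can tag every employee with the id of the highest paid employee in their department:

from pyspark.sql.functions import col, first
from pyspark.sql.window import Window

# order each department's rows by salary descending,
# so first() picks the top earner's employee_id
spec = Window. \
    partitionBy('department_id'). \
    orderBy(col('salary').desc())
employees. \
    select('employee_id', 'department_id', 'salary'). \
    withColumn('top_earner_id', first('employee_id').over(spec)). \
    orderBy('department_id', col('salary').desc()). \
    show()

Note that with last the default frame ends at the current row, so it would simply return the current row's value; to get the true last value per partition, extend the frame with rowsBetween(Window.currentRow, Window.unboundedFollowing).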

7.6.1. Using LEAD

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
  read. \
  parquet(airlines_path)
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import lead
from pyspark.sql.window import Window
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("CRSDepTime"))
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LeadUniqueCarrier", lead("UniqueCarrier").over(spec)). \
    withColumn("LeadFlightNum", lead("FlightNum").over(spec)). \
    withColumn("LeadCRSDepTime", lead("CRSDepTime").over(spec)). \
    withColumn("LeadDepDelay", lead("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|LeadUniqueCarrier|LeadFlightNum|LeadCRSDepTime|LeadDepDelay|
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|  20080101|   ABE|           OO|     5873|       720|         YES|       1|               9E|         2940|          1215|          70|
|  20080101|   ABE|           9E|     2940|      1215|         YES|      70|               YV|         7263|          1230|         137|
|  20080101|   ABE|           YV|     7263|      1230|         YES|     137|               XE|         2578|          1410|          22|
|  20080101|   ABE|           XE|     2578|      1410|         YES|      22|               9E|         2936|          1615|          34|
|  20080101|   ABE|           9E|     2936|      1615|         YES|      34|               OH|         5457|          1720|          14|
|  20080101|   ABE|           OH|     5457|      1720|         YES|      14|               XE|         2594|          1740|          34|
|  20080101|   ABE|           XE|     2594|      1740|         YES|      34|               YV|         7138|          1741|         175|
|  20080101|   ABE|           YV|     7138|      1741|         YES|     175|             null|         null|          null|        null|
|  20080101|   ABI|           MQ|     3214|      1735|         YES|       3|             null|         null|          null|        null|
|  20080101|   ABQ|           DL|     1601|       740|         YES|      65|               WN|           88|           755|           2|
|  20080101|   ABQ|           WN|       88|       755|         YES|       2|               UA|          782|           805|          48|
|  20080101|   ABQ|           UA|      782|       805|         YES|      48|               AA|         1814|           925|         100|
|  20080101|   ABQ|           AA|     1814|       925|         YES|     100|               WN|         2976|          1040|           1|
|  20080101|   ABQ|           WN|     2976|      1040|         YES|       1|               WN|         1185|          1140|           9|
|  20080101|   ABQ|           WN|     1185|      1140|         YES|       9|               WN|         1297|          1155|          12|
|  20080101|   ABQ|           WN|     1297|      1155|         YES|      12|               WN|         2480|          1200|          16|
|  20080101|   ABQ|           WN|     2480|      1200|         YES|      16|               WN|          137|          1220|          10|
|  20080101|   ABQ|           WN|      137|      1220|         YES|      10|               WN|         1328|          1245|          78|
|  20080101|   ABQ|           WN|     1328|      1245|         YES|      78|               WN|         3245|          1250|          24|
|  20080101|   ABQ|           WN|     3245|      1250|         YES|      24|               WN|           61|          1320|           1|
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
only showing top 20 rows
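
As noted earlier, lag can produce the same result if the spec sorts the window in descending order. A sketch for one of the columns above:

from pyspark.sql.functions import lag

# with CRSDepTime descending, lag(1) reads the row with the next
# higher CRSDepTime, matching lead(1) over the ascending order
spec_desc = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("CRSDepTime").desc())
airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year", 
                  lpad("Month", 2, "0"), 
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "CRSDepTime",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LeadDepDelay", lag("DepDelay").over(spec_desc)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()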

7.6.2. Using LEAD with 7

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import sum, lead, substring
from pyspark.sql.window import Window
spec = Window. \
    partitionBy(substring("FlightDate", 1, 6), "Origin"). \
    orderBy("FlightDate", col("TotalDepDelay").desc())
airlines. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD"'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
|  20080101|   ORD|        49353|      20080108|       ORD|            35658|
|  20080102|   ORD|        41545|      20080109|       ORD|            10075|
|  20080103|   ORD|        15784|      20080110|       ORD|            18431|
|  20080104|   ORD|        13442|      20080111|       ORD|            15372|
|  20080105|   ORD|        23800|      20080112|       ORD|             5785|
|  20080106|   ORD|        31148|      20080113|       ORD|             9630|
|  20080107|   ORD|        47817|      20080114|       ORD|            24969|
+----------+------+-------------+--------------+----------+-----------------+
airlines. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD" AND FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
|  20080101|   ORD|        49353|      20080108|       ORD|            35658|
|  20080102|   ORD|        41545|      20080109|       ORD|            10075|
|  20080103|   ORD|        15784|      20080110|       ORD|            18431|
|  20080104|   ORD|        13442|      20080111|       ORD|            15372|
|  20080105|   ORD|        23800|      20080112|       ORD|             5785|
|  20080106|   ORD|        31148|      20080113|       ORD|             9630|
|  20080107|   ORD|        47817|      20080114|       ORD|            24969|
+----------+------+-------------+--------------+----------+-----------------+
airlines. \
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
|  20080101|   ORD|        49353|      20080108|       ORD|            35658|
|  20080101|   DFW|        13741|      20080108|       DFW|             8277|
|  20080101|   ATL|        11592|      20080108|       ATL|             9988|
|  20080101|   LAX|        10360|      20080108|       LAX|             8767|
|  20080101|   JFK|         6948|      20080108|       JFK|             2261|
|  20080101|   SFO|         5359|      20080108|       SFO|            18095|
|  20080102|   ORD|        41545|      20080109|       ORD|            10075|
|  20080102|   ATL|        25127|      20080109|       ATL|             6404|
|  20080102|   DFW|        12827|      20080109|       DFW|             4532|
|  20080102|   JFK|         9734|      20080109|       JFK|             1980|
|  20080102|   LAX|         9253|      20080109|       LAX|             4121|
|  20080102|   SFO|         6150|      20080109|       SFO|             8033|
|  20080103|   ATL|        22603|      20080110|       ATL|             9760|
|  20080103|   SFO|        21957|      20080110|       SFO|            17650|
|  20080103|   ORD|        15784|      20080110|       ORD|            18431|
|  20080103|   LAX|        11837|      20080110|       LAX|             6327|
|  20080103|   DFW|         9585|      20080110|       DFW|             6786|
|  20080103|   JFK|         8031|      20080110|       JFK|             1722|
|  20080104|   SFO|        40137|      20080111|       SFO|             8734|
|  20080104|   LAX|        26441|      20080111|       LAX|             5852|
+----------+------+-------------+--------------+----------+-----------------+
only showing top 20 rows
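
lead returns null whenever the offset runs past the end of the partition, which is why the last seven days of the range above have no lead values. lead also accepts a third argument that substitutes a default value instead of null. A sketch, reusing the spec defined above (daily_totals is just an illustrative name):

daily_totals = airlines. \
    filter("IsDepDelayed = 'YES' AND Cancelled = 0"). \
    groupBy(concat("Year", 
                   lpad("Month", 2, "0"), 
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay"))
daily_totals. \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7, 0).over(spec)). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()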