7. Windowing Functions

As part of this module, let us get into Windowing Functions.

7.1. Starting Spark Context

Let us start the Spark context for this notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    appName("Windowing Functions").
    master("yarn").
    getOrCreate
spark.conf.set("spark.sql.shuffle.partitions", "2")
import spark.implicits._ // needed for the $"column" syntax used throughout

7.2. Overview of Windowing Functions

Let us get an overview of Windowing Functions.

  • First, let us understand the relevance of these functions using the employees data set.

val employeesPath = "/public/hr_db/employees"
val employees = spark.
    read.
    format("csv").
    option("sep", "\t").
    schema("""employee_id INT, 
              first_name STRING, 
              last_name STRING, 
              email STRING,
              phone_number STRING, 
              hire_date STRING, 
              job_id STRING, 
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING, 
              department_id STRING
            """).
    load(employeesPath)
import org.apache.spark.sql.functions.col
employees.
    select($"employee_id", 
           $"department_id".cast("int").alias("department_id"), 
           $"salary"
          ).
    orderBy("department_id", "salary").
    show
  • Let us say we want to compare each individual's salary with the department-wise salary expense.

  • Here is one approach, which requires a self join.

    • Compute the department-wise expense using groupBy and agg.

    • Join with employees again on department_id.

import org.apache.spark.sql.functions.{sum, col}
val department_expense = employees.
    groupBy("department_id").
    agg(sum("salary").alias("expense"))
department_expense.show
employees.
    select("employee_id", "department_id", "salary").
    join(department_expense, employees("department_id") === department_expense("department_id")).
    orderBy(employees("department_id"), $"salary").
    show

However, this approach is inefficient and overly complicated. Windowing functions simplify the logic and run more efficiently.

Now let us get into the details related to Windowing functions.

  • Main package: org.apache.spark.sql.expressions

  • It has classes such as Window and WindowSpec.

  • Window has APIs such as partitionBy, orderBy, etc.

  • These APIs (such as partitionBy) return a WindowSpec object. We can then pass the WindowSpec object to over on functions such as rank(), dense_rank(), sum(), etc.

  • Syntax: sum("salary").over(spec), where spec = Window.partitionBy("department_id")
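
Here is the self-join example from above rewritten with a window. This is a minimal sketch using the employees DataFrame created earlier (expenseSpec is a name introduced here); no self join is required:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// the department-wise expense is computed alongside each employee's row,
// without a separate aggregation and join
val expenseSpec = Window.partitionBy("department_id")
employees.
    select($"employee_id",
           $"department_id".cast("int").alias("department_id"),
           $"salary"
          ).
    withColumn("expense", sum("salary").over(expenseSpec)).
    orderBy($"department_id", $"salary").
    show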

Functions           | API or Function
--------------------|---------------------------------------------------
Aggregate Functions | sum, avg, min, max
Ranking Functions   | rank, dense_rank, percent_rank, row_number, ntile
Analytic Functions  | cume_dist, first, last, lead, lag

7.3. Aggregate Functions

Let us see how to perform aggregations within each group while projecting the raw data that is used to perform the aggregation.

  • We have functions such as sum, avg, min, max, etc., which can be used to aggregate the data.

  • We need to create a WindowSpec object using partitionBy to get aggregations within each group.

  • Typically we do not need to sort the data to perform aggregations; however, if we want to perform cumulative aggregations using rowsBetween, then we have to sort the data by the cumulative criteria.

  • Let us try to get the total, minimum, maximum, and average departure delay for each day for each airport. We will ignore all flights that departed early or on time.

val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
  read.
  parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.{min, max, sum, avg, round}
import org.apache.spark.sql.expressions.Window
airlines.printSchema
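// no ordering needed here: these are plain per-group aggregations
// over each (FlightDate, Origin) partition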
val spec = Window.
    partitionBy("FlightDate", "Origin")
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year", 
                  lpad($"Month", 2, "0"), 
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"UniqueCarrier",
           $"FlightNum",
           $"CRSDepTime",
           $"IsDepDelayed",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("DepDelayMin", min("DepDelay").over(spec)).
    withColumn("DepDelayMax", max("DepDelay").over(spec)).
    withColumn("DepDelaySum", sum("DepDelay").over(spec)).
    withColumn("DepDelayAvg", round(avg("DepDelay").over(spec), 2)).
    orderBy("FlightDate", "Origin", "DepDelay").
    show

7.4. Using rowsBetween and rangeBetween

We can get cumulative aggregations using rowsBetween or rangeBetween.

  • We can use rowsBetween to include a particular set of rows in the aggregation (for example, the current row and the three preceding rows).

  • We can use rangeBetween to include a particular range of values of the ordering column (see the sketch after the examples below).

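// cumulative frame: from the first row of the partition through the current row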
val spec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy("CRSDepTime").
    rowsBetween(Window.unboundedPreceding, 0)
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year", 
                  lpad($"Month", 2, "0"), 
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"UniqueCarrier",
           $"FlightNum",
           $"CRSDepTime",
           $"IsDepDelayed",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("DepDelaySum", sum("DepDelay").over(spec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show
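// moving frame: the current row plus the three preceding rows (four rows in total)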
val spec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy("CRSDepTime").
    rowsBetween(-3, 0)
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year", 
                  lpad($"Month", 2, "0"), 
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"UniqueCarrier",
           $"FlightNum",
           $"CRSDepTime",
           $"IsDepDelayed",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("DepDelaySum", sum("DepDelay").over(spec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show
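
rangeBetween is not demonstrated above; here is a minimal sketch (rangeSpec is a name introduced here). It assumes CRSDepTime is numeric in HHMM form, so a frame of (-100, 0) roughly covers flights scheduled within the previous hour:

// the frame is defined over the values of the ordering column, not a row count
val rangeSpec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy($"CRSDepTime".cast("int")).
    rangeBetween(-100, 0)
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year",
                  lpad($"Month", 2, "0"),
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"CRSDepTime",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("DepDelaySum", sum("DepDelay").over(rangeSpec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show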

7.5. Ranking Functions

We can use ranking functions to assign a rank to each record within a partition.

  • Sparse Rank - rank (ties receive the same rank and the subsequent ranks are skipped)

  • Dense Rank - dense_rank (ties receive the same rank with no gaps)

  • Assigning Row Numbers - row_number (unique sequential numbers, even for ties)

  • Percentage Rank - percent_rank (relative rank between 0 and 1)

7.5.1. Tasks

Let us perform a few tasks related to ranking.

val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
    read.
    parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.{rank, dense_rank}
import org.apache.spark.sql.functions.{percent_rank, row_number, round}
import org.apache.spark.sql.expressions.Window
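// rank flights by departure delay, longest delay first, within each (FlightDate, Origin)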
val spec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy(col("DepDelay").desc)
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year", 
                  lpad($"Month", 2, "0"), 
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"UniqueCarrier",
           $"FlightNum",
           $"CRSDepTime",
           $"IsDepDelayed",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("srank", rank().over(spec)).
    withColumn("drank", dense_rank().over(spec)).
    withColumn("prank", round(percent_rank().over(spec), 2)).
    withColumn("rn", row_number().over(spec)).
    orderBy($"FlightDate", $"Origin", $"DepDelay".desc).
    show

7.6. Analytic Functions

We can use Analytic Functions to compare the current record with the previous or the next record.

  • lead and lag are the main functions.

  • We can also compare each day of one week with the corresponding day of another week.

  • lead and lag serve the same purpose. Depending upon the requirement and the sorting of the data, we can use either of them.

  • Here the examples are demonstrated using lead. The same can be achieved using lag; however, while defining the spec we have to sort the data within the window in descending order to get similar results (see the sketch after the lead example below).

  • We can also use the first and last functions to get the first or last value within each group or partition, based upon the sorting criteria. They are typically used to get details about other fields (for example, the id of the employee drawing the highest or lowest salary within a department), as shown in the sketch below.
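
Here is a minimal sketch of that last use case, reusing the employees DataFrame from earlier (topSpec and top_salaried_employee_id are names introduced here):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.first

// ordering by salary descending makes first() return the top earner's id;
// last() would additionally need an explicit full frame, i.e.
// rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
val topSpec = Window.
    partitionBy("department_id").
    orderBy($"salary".desc)
employees.
    select($"employee_id",
           $"department_id".cast("int").alias("department_id"),
           $"salary"
          ).
    withColumn("top_salaried_employee_id", first("employee_id").over(topSpec)).
    orderBy($"department_id", $"salary".desc).
    show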

7.6.1. Using LEAD

val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
  read.
  parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.lead
import org.apache.spark.sql.expressions.Window
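// flights sorted by scheduled departure within each (FlightDate, Origin);
// lead(..., 1) then refers to the next scheduled flight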
val spec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy(col("CRSDepTime"))
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year", 
                  lpad($"Month", 2, "0"), 
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"UniqueCarrier",
           $"FlightNum",
           $"CRSDepTime",
           $"IsDepDelayed",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("LeadUniqueCarrier", lead($"UniqueCarrier", 1).over(spec)).
    withColumn("LeadFlightNum", lead($"FlightNum", 1).over(spec)).
    withColumn("LeadCRSDepTime", lead($"CRSDepTime", 1).over(spec)).
    withColumn("LeadDepDelay", lead($"DepDelay", 1).over(spec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show
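
As noted above, the same output can be produced with lag by reversing the sort within the window. A minimal sketch (lagSpec and the Next* column names are introduced here):

import org.apache.spark.sql.functions.lag

// the descending sort makes lag(..., 1) refer to the chronologically next
// flight, mirroring lead(..., 1) over the ascending sort above
val lagSpec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy(col("CRSDepTime").desc)
airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year",
                  lpad($"Month", 2, "0"),
                  lpad($"DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           $"Origin",
           $"CRSDepTime",
           $"DepDelay".cast("int").alias("DepDelay")
          ).
    withColumn("NextCRSDepTime", lag($"CRSDepTime", 1).over(lagSpec)).
    withColumn("NextDepDelay", lag($"DepDelay", 1).over(lagSpec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show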

7.6.2. Using LEAD with an Offset of 7

val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
    read.
    parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.{sum, lead, substring}
import org.apache.spark.sql.expressions.Window
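// there is one aggregated row per day within each (yyyyMM, Origin) partition,
// so lead(..., 7) fetches the row for the same weekday of the following week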
val spec = Window.
    partitionBy(substring($"FlightDate", 1, 6), $"Origin").
    orderBy($"FlightDate", $"TotalDepDelay".desc)
airlines.
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ).
    groupBy(concat($"Year", 
                   lpad($"Month", 2, "0"), 
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            $"Origin"
           ).
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
    filter("Origin = 'ORD'").
    orderBy($"FlightDate", $"TotalDepDelay".desc).
    show
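
The last seven days of each partition have no row seven positions ahead, so the Lead columns above come back null for those dates. Filtering on the first week keeps only the rows whose lead values are populated:
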
airlines.
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ).
    groupBy(concat($"Year", 
                   lpad($"Month", 2, "0"), 
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            $"Origin"
           ).
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
    filter("Origin = 'ORD' AND FlightDate BETWEEN 20080101 AND 20080107").
    orderBy($"FlightDate", $"TotalDepDelay".desc).
    show
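
Dropping the Origin filter shows the same week-over-week comparison for all six airports:
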
airlines.
    filter("""IsDepDelayed = 'YES' 
              AND Cancelled = 0
              AND concat(Year, 
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ).
    groupBy(concat($"Year", 
                   lpad($"Month", 2, "0"), 
                   lpad($"DayOfMonth", 2, "0")
                  ).alias("FlightDate"), 
            $"Origin"
           ).
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
    filter("FlightDate BETWEEN 20080101 AND 20080107").
    orderBy($"FlightDate", $"TotalDepDelay".desc).
    show