7. Windowing Functions¶
As part of this module let us get into Windowing Functions.
7.1. Starting Spark Context¶
Let us start the Spark context for this notebook so that we can execute the code provided.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
config("spark.ui.port", "0").
appName("Windowing Functions").
master("yarn").
getOrCreate
spark.conf.set("spark.sql.shuffle.partitions", "2")
7.2. Overview of Windowing Functions¶
Let us get an overview of Windowing Functions.
First, let us understand the relevance of these functions using the `employees` data set.
val employeesPath = "/public/hr_db/employees"
val employees = spark.
read.
format("csv").
option("sep", "\t").
schema("""employee_id INT,
first_name STRING,
last_name STRING,
email STRING,
phone_number STRING,
hire_date STRING,
job_id STRING,
salary FLOAT,
commission_pct STRING,
manager_id STRING,
department_id STRING
""").
load(employeesPath)
import org.apache.spark.sql.functions.col
employees.
select($"employee_id",
$"department_id".cast("int").alias("department_id"),
$"salary"
).
orderBy("department_id", "salary").
show
Let us say we want to compare each individual salary with the department wise salary expense.
Here is one approach, which requires a self join.
- Compute the department wise expense using `groupBy` and `agg`.
- Join with `employees` again on `department_id`.
import org.apache.spark.sql.functions.{sum, col}
val department_expense = employees.
groupBy("department_id").
agg(sum("salary").alias("expense"))
department_expense.show
employees.
select("employee_id", "department_id", "salary").
join(department_expense, employees("department_id") === department_expense("department_id")).
orderBy(employees("department_id"), $"salary").
show
However, this approach is not very efficient and is also overly complicated. Windowing functions simplify the logic and also run more efficiently.
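For comparison, here is a minimal sketch of the same result computed with a windowing function (the syntax is covered in the rest of this section). It assumes the `employees` DataFrame created above; `expenseSpec` is just an illustrative name.
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.expressions.Window

// One window per department -- each employee row gets its department's
// total salary expense without a self join.
val expenseSpec = Window.partitionBy("department_id")

employees.
    select($"employee_id",
        $"department_id".cast("int").alias("department_id"),
        $"salary"
    ).
    withColumn("expense", sum("salary").over(expenseSpec)).
    orderBy("department_id", "salary").
    show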
Now let us get into the details related to Windowing functions.
- Main package: `org.apache.spark.sql.expressions`.
- It has classes such as `Window` and `WindowSpec`.
- `Window` has APIs such as `partitionBy`, `orderBy`, etc.
- These APIs (such as `partitionBy`) return a `WindowSpec` object. We can pass the `WindowSpec` object to `over` on functions such as `rank()`, `dense_rank()`, `sum()`, etc.
- Syntax: `sum().over(spec)`, where `spec = Window.partitionBy("ColumnName")`.
| Functions | API or Function |
|---|---|
| Aggregate Functions | `sum`, `avg`, `min`, `max` |
| Ranking Functions | `rank`, `dense_rank`, `percent_rank`, `row_number` |
| Analytic Functions | `lead`, `lag`, `first`, `last` |
7.3. Aggregate Functions¶
Let us see how to perform aggregations within each group while projecting the raw data that is used to perform the aggregation.
- We have functions such as `sum`, `avg`, `min`, `max`, etc. which can be used to aggregate the data.
- We need to create a `WindowSpec` object using `partitionBy` to get aggregations within each group.
- Typically we do not need to sort the data to perform aggregations; however, if we want to perform cumulative aggregations using `rowsBetween`, then we have to sort the data using the cumulative criteria.
Let us try to get the total, minimum, maximum, and average departure delay for each day for each airport. We will ignore all those flights which departed early or on time.
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.{min, max, sum, avg, round}
import org.apache.spark.sql.expressions.Window
airlines.printSchema
val spec = Window.
partitionBy("FlightDate", "Origin")
airlines.
filter("IsDepDelayed = 'YES' and Cancelled = 0").
select(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin",
$"UniqueCarrier",
$"FlightNum",
$"CRSDepTime",
$"IsDepDelayed",
$"DepDelay".cast("int").alias("DepDelay")
).
withColumn("DepDelayMin", min("DepDelay").over(spec)).
withColumn("DepDelayMax", max("DepDelay").over(spec)).
withColumn("DepDelaySum", sum("DepDelay").over(spec)).
withColumn("DepDelayAvg", round(avg("DepDelay").over(spec), 2)).
orderBy("FlightDate", "Origin", "DepDelay").
show
7.4. Using rowsBetween and rangeBetween¶
We can get cumulative aggregations using `rowsBetween` or `rangeBetween`.
- We can use `rowsBetween` to include a particular set of rows to perform aggregations.
- We can use `rangeBetween` to include a particular range of values on a given column.
val spec = Window.
partitionBy("FlightDate", "Origin").
orderBy("CRSDepTime").
rowsBetween(Window.unboundedPreceding, 0)
airlines.
filter("IsDepDelayed = 'YES' and Cancelled = 0").
select(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin",
$"UniqueCarrier",
$"FlightNum",
$"CRSDepTime",
$"IsDepDelayed",
$"DepDelay".cast("int").alias("DepDelay")
).
withColumn("DepDelaySum", sum("DepDelay").over(spec)).
orderBy("FlightDate", "Origin", "CRSDepTime").
show
val spec = Window.
partitionBy("FlightDate", "Origin").
orderBy("CRSDepTime").
rowsBetween(-3, 0)
airlines.
filter("IsDepDelayed = 'YES' and Cancelled = 0").
select(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin",
$"UniqueCarrier",
$"FlightNum",
$"CRSDepTime",
$"IsDepDelayed",
$"DepDelay".cast("int").alias("DepDelay")
).
withColumn("DepDelaySum", sum("DepDelay").over(spec)).
orderBy("FlightDate", "Origin", "CRSDepTime").
show
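The examples above use `rowsBetween`; here is a minimal sketch of `rangeBetween`, assuming the same `airlines` DataFrame and that `CRSDepTime` can be cast to an integer. With a range frame the boundaries apply to the values of the ordering column, so `-100` to `0` roughly means "flights scheduled within the previous hour of HHMM clock time" rather than the previous N rows.
import org.apache.spark.sql.functions.{col, concat, lpad, sum}
import org.apache.spark.sql.expressions.Window

// Range frames are defined on the values of the ordering column, which
// must be a single numeric column for bounded frames.
val rangeSpec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy("CRSDepTime").
    rangeBetween(-100, 0)

airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year",
        lpad($"Month", 2, "0"),
        lpad($"DayOfMonth", 2, "0")
        ).alias("FlightDate"),
        $"Origin",
        $"CRSDepTime".cast("int").alias("CRSDepTime"),
        $"DepDelay".cast("int").alias("DepDelay")
    ).
    withColumn("DepDelaySum", sum("DepDelay").over(rangeSpec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show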
7.5. Ranking Functions¶
We can use ranking functions to assign a rank to each record within a partition.
- Sparse Rank - `rank`
- Dense Rank - `dense_rank`
- Assigning Row Numbers - `row_number`
- Percentage Rank - `percent_rank`
7.5.1. Tasks¶
Let us perform a few tasks related to ranking.
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.{rank, dense_rank}
import org.apache.spark.sql.functions.{percent_rank, row_number, round}
import org.apache.spark.sql.expressions.Window
val spec = Window.
partitionBy("FlightDate", "Origin").
orderBy(col("DepDelay").desc)
airlines.
filter("IsDepDelayed = 'YES' and Cancelled = 0").
select(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin",
$"UniqueCarrier",
$"FlightNum",
$"CRSDepTime",
$"IsDepDelayed",
$"DepDelay".cast("int").alias("DepDelay")
).
withColumn("srank", rank().over(spec)).
withColumn("drank", dense_rank().over(spec)).
withColumn("prank", round(percent_rank().over(spec), 2)).
withColumn("rn", row_number().over(spec)).
orderBy($"FlightDate", $"Origin", $"DepDelay".desc).
show
7.6. Analytic Functions¶
We can use Analytic Functions to compare the current record with the previous record or the next record.
- `lead` and `lag` are the main functions. We can also compare each day of one week with the corresponding day of another week.
- `lead` and `lag` serve the same purpose. Depending upon the requirement and the sorting of the data, we can use either of them.
- Here the examples are demonstrated using `lead`. The same can be achieved using `lag`; however, while defining the spec we have to sort the data within the window in descending order to get similar results.
- We can also use the `first` and `last` functions to get the first or last value within each group or partition based upon the sorting criteria. They are typically used to get details about other fields (for example, we can get the name or id of the employee who is making the highest or lowest salary within a department). A minimal sketch follows this list.
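Here is a minimal sketch of `first` and `last`, assuming the `employees` DataFrame created earlier in this chapter; `salarySpec` is just an illustrative name. Note that `last` needs an explicit frame from unbounded preceding to unbounded following; with the default frame it would only see rows up to the current row.
import org.apache.spark.sql.functions.{first, last}
import org.apache.spark.sql.expressions.Window

// Sort each department by salary descending so that the first value is the
// highest paid employee and the last value is the lowest paid one.
val salarySpec = Window.
    partitionBy("department_id").
    orderBy($"salary".desc).
    rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

employees.
    select($"employee_id",
        $"department_id".cast("int").alias("department_id"),
        $"salary"
    ).
    withColumn("highest_paid", first("employee_id").over(salarySpec)).
    withColumn("lowest_paid", last("employee_id").over(salarySpec)).
    orderBy("department_id", "salary").
    show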
7.6.1. Using LEAD¶
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.lead
import org.apache.spark.sql.expressions.Window
val spec = Window.
partitionBy("FlightDate", "Origin").
orderBy(col("CRSDepTime"))
airlines.
filter("IsDepDelayed = 'YES' and Cancelled = 0").
select(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin",
$"UniqueCarrier",
$"FlightNum",
$"CRSDepTime",
$"IsDepDelayed",
$"DepDelay".cast("int").alias("DepDelay")
).
withColumn("LeadUniqueCarrier", lead($"UniqueCarrier", 1).over(spec)).
withColumn("LeadFlightNum", lead($"FlightNum", 1).over(spec)).
withColumn("LeadCRSDepTime", lead($"CRSDepTime", 1).over(spec)).
withColumn("LeadDepDelay", lead($"DepDelay", 1).over(spec)).
orderBy("FlightDate", "Origin", "CRSDepTime").
show
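As mentioned above, similar results can be obtained with `lag` by sorting the window in descending order. Here is a minimal sketch assuming the same `airlines` DataFrame; with `CRSDepTime` sorted in descending order, `lag(..., 1)` points at the next scheduled flight.
import org.apache.spark.sql.functions.{col, concat, lpad, lag}
import org.apache.spark.sql.expressions.Window

// Descending order reverses the meaning of "previous row", so lag picks
// the flight scheduled immediately after the current one.
val lagSpec = Window.
    partitionBy("FlightDate", "Origin").
    orderBy(col("CRSDepTime").desc)

airlines.
    filter("IsDepDelayed = 'YES' and Cancelled = 0").
    select(concat($"Year",
        lpad($"Month", 2, "0"),
        lpad($"DayOfMonth", 2, "0")
        ).alias("FlightDate"),
        $"Origin",
        $"UniqueCarrier",
        $"FlightNum",
        $"CRSDepTime",
        $"DepDelay".cast("int").alias("DepDelay")
    ).
    withColumn("LagUniqueCarrier", lag($"UniqueCarrier", 1).over(lagSpec)).
    withColumn("LagCRSDepTime", lag($"CRSDepTime", 1).over(lagSpec)).
    withColumn("LagDepDelay", lag($"DepDelay", 1).over(lagSpec)).
    orderBy("FlightDate", "Origin", "CRSDepTime").
    show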
7.6.2. Using LEAD with 7¶
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
import org.apache.spark.sql.functions.{col, lit, lpad, concat}
import org.apache.spark.sql.functions.{sum, lead, substring}
import org.apache.spark.sql.expressions.Window
val spec = Window.
partitionBy(substring($"FlightDate", 1, 6), $"Origin").
orderBy($"FlightDate", $"TotalDepDelay".desc)
airlines.
filter("""IsDepDelayed = 'YES'
AND Cancelled = 0
AND concat(Year,
lpad(Month, 2, '0'),
lpad(DayOfMonth, 2, '0')
) BETWEEN 20080101 AND 20080114
AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
"""
).
groupBy(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin"
).
agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
filter("Origin = 'ORD'").
orderBy($"FlightDate", $"TotalDepDelay".desc).
show
airlines.
filter("""IsDepDelayed = 'YES'
AND Cancelled = 0
AND concat(Year,
lpad(Month, 2, '0'),
lpad(DayOfMonth, 2, '0')
) BETWEEN 20080101 AND 20080114
AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
"""
).
groupBy(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin"
).
agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
filter("Origin = 'ORD' AND FlightDate BETWEEN 20080101 AND 20080107").
orderBy($"FlightDate", $"TotalDepDelay".desc).
show
airlines.
filter("""IsDepDelayed = 'YES'
AND Cancelled = 0
AND concat(Year,
lpad(Month, 2, '0'),
lpad(DayOfMonth, 2, '0')
) BETWEEN 20080101 AND 20080114
AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
"""
).
groupBy(concat($"Year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate"),
$"Origin"
).
agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")).
withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)).
withColumn("LeadOrigin", lead("Origin", 7).over(spec)).
withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)).
filter("FlightDate BETWEEN 20080101 AND 20080107").
orderBy($"FlightDate", $"TotalDepDelay".desc).
show