7. Windowing Functions
As part of this module let us get into Windowing Functions.
7.1. Starting Spark Context
Let us start the Spark context for this Notebook so that we can execute the code provided.
from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Windowing Functions'). \
    master('yarn'). \
    getOrCreate()

spark.conf.set('spark.sql.shuffle.partitions', '2')
7.2. Overview of Windowing Functions
Let us get an overview of Windowing Functions.
First, let us understand the relevance of these functions using the employees data set.
employeesPath = '/public/hr_db/employees'

employees = spark. \
    read. \
    format('csv'). \
    option('sep', '\t'). \
    schema('''employee_id INT,
              first_name STRING,
              last_name STRING,
              email STRING,
              phone_number STRING,
              hire_date STRING,
              job_id STRING,
              salary FLOAT,
              commission_pct STRING,
              manager_id STRING,
              department_id STRING
           '''). \
    load(employeesPath)
from pyspark.sql.functions import col

employees. \
    select('employee_id',
           col('department_id').cast('int').alias('department_id'),
           'salary'
          ). \
    orderBy('department_id', 'salary'). \
    show()
+-----------+-------------+-------+
|employee_id|department_id| salary|
+-----------+-------------+-------+
| 178| null| 7000.0|
| 200| 10| 4400.0|
| 202| 20| 6000.0|
| 201| 20|13000.0|
| 119| 30| 2500.0|
| 118| 30| 2600.0|
| 117| 30| 2800.0|
| 116| 30| 2900.0|
| 115| 30| 3100.0|
| 114| 30|11000.0|
| 203| 40| 6500.0|
| 132| 50| 2100.0|
| 136| 50| 2200.0|
| 128| 50| 2200.0|
| 127| 50| 2400.0|
| 135| 50| 2400.0|
| 140| 50| 2500.0|
| 144| 50| 2500.0|
| 191| 50| 2500.0|
| 182| 50| 2500.0|
+-----------+-------------+-------+
only showing top 20 rows
Let us say we want to compare each individual salary with the department wise salary expense.
Here is one approach, which requires a self join:
- Compute department wise expense using groupBy and agg.
- Join with employees again on department_id.
from pyspark.sql.functions import sum, col

department_expense = employees. \
    groupBy('department_id'). \
    agg(sum('salary').alias('expense'))

department_expense.show()
+-------------+--------+
|department_id| expense|
+-------------+--------+
| 80|304500.0|
| 90| 58000.0|
| 60| 28800.0|
| 20| 19000.0|
| 70| 10000.0|
| 110| 20300.0|
| 100| 51600.0|
| 30| 24900.0|
| 50|156400.0|
| 10| 4400.0|
| 40| 6500.0|
| null| 7000.0|
+-------------+--------+
employees. \
    select('employee_id', 'department_id', 'salary'). \
    join(department_expense, employees.department_id == department_expense.department_id). \
    orderBy(employees.department_id, col('salary')). \
    show()
+-----------+-------------+-------+-------------+--------+
|employee_id|department_id| salary|department_id| expense|
+-----------+-------------+-------+-------------+--------+
| 200| 10| 4400.0| 10| 4400.0|
| 113| 100| 6900.0| 100| 51600.0|
| 111| 100| 7700.0| 100| 51600.0|
| 112| 100| 7800.0| 100| 51600.0|
| 110| 100| 8200.0| 100| 51600.0|
| 109| 100| 9000.0| 100| 51600.0|
| 108| 100|12000.0| 100| 51600.0|
| 206| 110| 8300.0| 110| 20300.0|
| 205| 110|12000.0| 110| 20300.0|
| 202| 20| 6000.0| 20| 19000.0|
| 201| 20|13000.0| 20| 19000.0|
| 119| 30| 2500.0| 30| 24900.0|
| 118| 30| 2600.0| 30| 24900.0|
| 117| 30| 2800.0| 30| 24900.0|
| 116| 30| 2900.0| 30| 24900.0|
| 115| 30| 3100.0| 30| 24900.0|
| 114| 30|11000.0| 30| 24900.0|
| 203| 40| 6500.0| 40| 6500.0|
| 132| 50| 2100.0| 50|156400.0|
| 128| 50| 2200.0| 50|156400.0|
+-----------+-------------+-------+-------------+--------+
only showing top 20 rows
However, this approach is not very efficient and is also overly complicated. Windowing functions simplify the logic and also run efficiently.
Now let us get into the details related to Windowing functions.
- Main package: pyspark.sql.window
- It has classes such as Window and WindowSpec.
- Window has APIs such as partitionBy, orderBy etc.
- These APIs (such as partitionBy) return a WindowSpec object, which we can pass to over on functions such as rank(), dense_rank(), sum() etc.
- Syntax: sum().over(spec) where spec = Window.partitionBy('ColumnName')
Functions | API or Function
---|---
Aggregate Functions | sum, avg, min, max
Ranking Functions | rank, dense_rank, percent_rank, row_number
Analytic Functions | lead, lag, first, last
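To see the simplification concretely, here is a minimal sketch (reusing the employees DataFrame defined above) that attaches the department wise expense to every employee record with a single window aggregation, avoiding the self join:

from pyspark.sql.functions import col, sum
from pyspark.sql.window import Window

# Partition by department so that sum operates within each department.
spec = Window.partitionBy('department_id')

employees. \
    select('employee_id', 'department_id', 'salary'). \
    withColumn('expense', sum('salary').over(spec)). \
    orderBy('department_id', 'salary'). \
    show()

This produces the same expense values as the join above, without aggregating into a separate DataFrame and joining it back.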
7.3. Aggregate Functions
Let us see how to perform aggregations within each group while projecting the raw data that is used to perform the aggregation.
- We have functions such as sum, avg, min, max etc which can be used to aggregate the data.
- We need to create a WindowSpec object using partitionBy to get aggregations within each group.
- Typically we do not need to sort the data to perform aggregations; however, if we want to perform cumulative aggregations using rowsBetween, then we have to sort the data using the cumulative criteria.
Let us try to get total departure delay, minimum departure delay, maximum departure delay and average departure delay for each day for each airport. We will ignore all those flights which departed early or on time.
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

airlines = spark. \
    read. \
    parquet(airlines_path)

from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import min, max, sum, avg
from pyspark.sql.window import Window

airlines.printSchema()
root
|-- Year: integer (nullable = true)
|-- Month: integer (nullable = true)
|-- DayofMonth: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- DepTime: string (nullable = true)
|-- CRSDepTime: integer (nullable = true)
|-- ArrTime: string (nullable = true)
|-- CRSArrTime: integer (nullable = true)
|-- UniqueCarrier: string (nullable = true)
|-- FlightNum: integer (nullable = true)
|-- TailNum: string (nullable = true)
|-- ActualElapsedTime: string (nullable = true)
|-- CRSElapsedTime: integer (nullable = true)
|-- AirTime: string (nullable = true)
|-- ArrDelay: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Origin: string (nullable = true)
|-- Dest: string (nullable = true)
|-- Distance: string (nullable = true)
|-- TaxiIn: string (nullable = true)
|-- TaxiOut: string (nullable = true)
|-- Cancelled: integer (nullable = true)
|-- CancellationCode: string (nullable = true)
|-- Diverted: integer (nullable = true)
|-- CarrierDelay: string (nullable = true)
|-- WeatherDelay: string (nullable = true)
|-- NASDelay: string (nullable = true)
|-- SecurityDelay: string (nullable = true)
|-- LateAircraftDelay: string (nullable = true)
|-- IsArrDelayed: string (nullable = true)
|-- IsDepDelayed: string (nullable = true)
# Window partitioned by flight date and origin; no ordering is needed
# for plain (non-cumulative) aggregations.
spec = Window. \
    partitionBy("FlightDate", "Origin")

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelayMin", min("DepDelay").over(spec)). \
    withColumn("DepDelayMax", max("DepDelay").over(spec)). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    withColumn("DepDelayAvg", avg("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "DepDelay"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelayMin|DepDelayMax|DepDelaySum| DepDelayAvg|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
| 20080101| ABE| OO| 5873| 720| YES| 1| 1| 175| 487| 60.875|
| 20080101| ABE| OH| 5457| 1720| YES| 14| 1| 175| 487| 60.875|
| 20080101| ABE| XE| 2578| 1410| YES| 22| 1| 175| 487| 60.875|
| 20080101| ABE| 9E| 2936| 1615| YES| 34| 1| 175| 487| 60.875|
| 20080101| ABE| XE| 2594| 1740| YES| 34| 1| 175| 487| 60.875|
| 20080101| ABE| 9E| 2940| 1215| YES| 70| 1| 175| 487| 60.875|
| 20080101| ABE| YV| 7263| 1230| YES| 137| 1| 175| 487| 60.875|
| 20080101| ABE| YV| 7138| 1741| YES| 175| 1| 175| 487| 60.875|
| 20080101| ABI| MQ| 3214| 1735| YES| 3| 3| 3| 3| 3.0|
| 20080101| ABQ| WN| 2976| 1040| YES| 1| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 972| 1810| YES| 1| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 61| 1320| YES| 1| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 3425| 1440| YES| 2| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 88| 755| YES| 2| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 2284| 1520| YES| 3| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| XE| 2771| 1430| YES| 3| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 1493| 2020| YES| 4| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 360| 1800| YES| 5| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 644| 1725| YES| 5| 1| 218| 1580|32.916666666666664|
| 20080101| ABQ| WN| 729| 1530| YES| 7| 1| 218| 1580|32.916666666666664|
+----------+------+-------------+---------+----------+------------+--------+-----------+-----------+-----------+------------------+
only showing top 20 rows
7.4. Using rowsBetween and rangeBetween
We can get cumulative aggregations using rowsBetween or rangeBetween.
- We can use rowsBetween to include a particular set of rows to perform aggregations.
- We can use rangeBetween to include a particular range of values on a given column.
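Only rowsBetween is demonstrated below. As a hedged sketch of rangeBetween, the following spec (range_spec is an illustrative name) frames the window by the values of the ordering column rather than by row positions. Since CRSDepTime is an integer in HHMM format, rangeBetween(-100, 0) covers, for each flight, all delayed flights from the same day and airport scheduled within roughly the previous hour:

# Illustrative only: the frame is defined by the value of CRSDepTime (HHMM),
# not by a count of rows.
range_spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rangeBetween(-100, 0)

Plugging range_spec into the pipeline shown below in place of spec would sum the delays over the trailing hour instead of over a trailing set of rows.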
# Frame from the first row of the partition up to the current row,
# which gives a running total by scheduled departure time.
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(Window.unboundedPreceding, 0)

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
| 20080101| ABE| OO| 5873| 720| YES| 1| 1|
| 20080101| ABE| 9E| 2940| 1215| YES| 70| 71|
| 20080101| ABE| YV| 7263| 1230| YES| 137| 208|
| 20080101| ABE| XE| 2578| 1410| YES| 22| 230|
| 20080101| ABE| 9E| 2936| 1615| YES| 34| 264|
| 20080101| ABE| OH| 5457| 1720| YES| 14| 278|
| 20080101| ABE| XE| 2594| 1740| YES| 34| 312|
| 20080101| ABE| YV| 7138| 1741| YES| 175| 487|
| 20080101| ABI| MQ| 3214| 1735| YES| 3| 3|
| 20080101| ABQ| DL| 1601| 740| YES| 65| 65|
| 20080101| ABQ| WN| 88| 755| YES| 2| 67|
| 20080101| ABQ| UA| 782| 805| YES| 48| 115|
| 20080101| ABQ| AA| 1814| 925| YES| 100| 215|
| 20080101| ABQ| WN| 2976| 1040| YES| 1| 216|
| 20080101| ABQ| WN| 1185| 1140| YES| 9| 225|
| 20080101| ABQ| WN| 1297| 1155| YES| 12| 237|
| 20080101| ABQ| WN| 2480| 1200| YES| 16| 253|
| 20080101| ABQ| WN| 137| 1220| YES| 10| 263|
| 20080101| ABQ| WN| 1328| 1245| YES| 78| 341|
| 20080101| ABQ| WN| 3245| 1250| YES| 24| 365|
+----------+------+-------------+---------+----------+------------+--------+-----------+
only showing top 20 rows
# rowsBetween(-3, 0) frames a moving window over the current row plus the
# three preceding rows (by CRSDepTime) within each partition.
spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy("CRSDepTime"). \
    rowsBetween(-3, 0)

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("DepDelaySum", sum("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|DepDelaySum|
+----------+------+-------------+---------+----------+------------+--------+-----------+
| 20080101| ABE| OO| 5873| 720| YES| 1| 1|
| 20080101| ABE| 9E| 2940| 1215| YES| 70| 71|
| 20080101| ABE| YV| 7263| 1230| YES| 137| 208|
| 20080101| ABE| XE| 2578| 1410| YES| 22| 230|
| 20080101| ABE| 9E| 2936| 1615| YES| 34| 263|
| 20080101| ABE| OH| 5457| 1720| YES| 14| 207|
| 20080101| ABE| XE| 2594| 1740| YES| 34| 104|
| 20080101| ABE| YV| 7138| 1741| YES| 175| 257|
| 20080101| ABI| MQ| 3214| 1735| YES| 3| 3|
| 20080101| ABQ| DL| 1601| 740| YES| 65| 65|
| 20080101| ABQ| WN| 88| 755| YES| 2| 67|
| 20080101| ABQ| UA| 782| 805| YES| 48| 115|
| 20080101| ABQ| AA| 1814| 925| YES| 100| 215|
| 20080101| ABQ| WN| 2976| 1040| YES| 1| 151|
| 20080101| ABQ| WN| 1185| 1140| YES| 9| 158|
| 20080101| ABQ| WN| 1297| 1155| YES| 12| 122|
| 20080101| ABQ| WN| 2480| 1200| YES| 16| 38|
| 20080101| ABQ| WN| 137| 1220| YES| 10| 47|
| 20080101| ABQ| WN| 1328| 1245| YES| 78| 116|
| 20080101| ABQ| WN| 3245| 1250| YES| 24| 128|
+----------+------+-------------+---------+----------+------------+--------+-----------+
only showing top 20 rows
7.5. Ranking Functions
We can use ranking functions to assign ranks to a particular record within a partition. A toy illustration of how they differ follows this list.
- Sparse Rank - rank
- Dense Rank - dense_rank
- Assigning Row Numbers - row_number
- Percentage Rank - percent_rank
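Here is a tiny hedged sketch (hypothetical toy data) of how these functions differ on ties: with two rows sharing the same salary, rank skips a value (1, 2, 2, 4), dense_rank does not (1, 2, 2, 3), and row_number breaks the tie arbitrarily (1, 2, 3, 4).

from pyspark.sql.functions import col, rank, dense_rank, row_number
from pyspark.sql.window import Window

toy = spark.createDataFrame(
    [(1, 100), (2, 200), (3, 200), (4, 300)],
    ['id', 'salary']
)

# No partitionBy: the whole toy DataFrame is one window. Spark will warn
# about the missing partition, which is fine for a toy example.
toy_spec = Window.orderBy(col('salary').desc())

toy. \
    withColumn('srank', rank().over(toy_spec)). \
    withColumn('drank', dense_rank().over(toy_spec)). \
    withColumn('rn', row_number().over(toy_spec)). \
    show()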
7.5.1. Tasks
Let us perform a few tasks related to ranking.
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

airlines = spark. \
    read. \
    parquet(airlines_path)

from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import rank, dense_rank
from pyspark.sql.functions import percent_rank, row_number, round
from pyspark.sql.window import Window

spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("DepDelay").desc())

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("srank", rank().over(spec)). \
    withColumn("drank", dense_rank().over(spec)). \
    withColumn("prank", round(percent_rank().over(spec), 2)). \
    withColumn("rn", row_number().over(spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|srank|drank|prank| rn|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
| 20080101| ABE| YV| 7138| 1741| YES| 175| 1| 1| 0.0| 1|
| 20080101| ABE| YV| 7263| 1230| YES| 137| 2| 2| 0.14| 2|
| 20080101| ABE| 9E| 2940| 1215| YES| 70| 3| 3| 0.29| 3|
| 20080101| ABE| 9E| 2936| 1615| YES| 34| 4| 4| 0.43| 4|
| 20080101| ABE| XE| 2594| 1740| YES| 34| 4| 4| 0.43| 5|
| 20080101| ABE| XE| 2578| 1410| YES| 22| 6| 5| 0.71| 6|
| 20080101| ABE| OH| 5457| 1720| YES| 14| 7| 6| 0.86| 7|
| 20080101| ABE| OO| 5873| 720| YES| 1| 8| 7| 1.0| 8|
| 20080101| ABI| MQ| 3214| 1735| YES| 3| 1| 1| 0.0| 1|
| 20080101| ABQ| WN| 823| 2045| YES| 218| 1| 1| 0.0| 1|
| 20080101| ABQ| WN| 357| 1525| YES| 171| 2| 2| 0.02| 2|
| 20080101| ABQ| AA| 1814| 925| YES| 100| 3| 3| 0.04| 3|
| 20080101| ABQ| WN| 1178| 1845| YES| 94| 4| 4| 0.06| 4|
| 20080101| ABQ| WN| 2497| 1825| YES| 93| 5| 5| 0.09| 5|
| 20080101| ABQ| WN| 3481| 1500| YES| 78| 6| 6| 0.11| 6|
| 20080101| ABQ| WN| 1328| 1245| YES| 78| 6| 6| 0.11| 7|
| 20080101| ABQ| XE| 221| 1850| YES| 68| 8| 7| 0.15| 8|
| 20080101| ABQ| DL| 1601| 740| YES| 65| 9| 8| 0.17| 9|
| 20080101| ABQ| WN| 45| 1435| YES| 64| 10| 9| 0.19| 10|
| 20080101| ABQ| WN| 2788| 1755| YES| 53| 11| 10| 0.21| 11|
+----------+------+-------------+---------+----------+------------+--------+-----+-----+-----+---+
only showing top 20 rows
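A common follow-up, sketched here with illustrative names, is to keep only the top N records per partition by filtering on the computed row number, for example the two most delayed flights per day and airport (this reuses the spec defined above):

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "FlightNum",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("rn", row_number().over(spec)). \
    filter("rn <= 2"). \
    orderBy("FlightDate", "Origin", "rn"). \
    show()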
7.6. Analytic Functions
We can use Analytic Functions to compare the current record with the previous record or the next record.
- lead and lag are the main functions.
- We can also compare each day of one week with the corresponding day of another week.
- lead and lag serve the same purpose. Depending upon the requirement and the sorting of the data, we can use either of them.
- Here the examples are demonstrated using lead. The same can be achieved using lag; however, while defining the spec we have to sort the data within the window in descending order to get similar results.
- Also, we can use the first and last functions to get the first or last value within each group or partition based upon the sorting criteria. They are typically used to get details from other fields (for example, we can get the employee name or id of whoever is making the highest or lowest salary within a department). A sketch of first follows this list.
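Neither first nor last is demonstrated below, so here is a hedged sketch (the names top_spec, MostDelayedFlightNum and MaxDepDelay are illustrative) that attaches, to every delayed flight, the flight number and delay of the most delayed flight from the same day and airport:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, concat, lpad, first

# Order by delay descending so that first() picks the most delayed flight;
# the explicit unbounded frame makes the result independent of the current row.
top_spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("DepDelay").desc()). \
    rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "FlightNum",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("MostDelayedFlightNum", first("FlightNum").over(top_spec)). \
    withColumn("MaxDepDelay", first("DepDelay").over(top_spec)). \
    orderBy("FlightDate", "Origin", col("DepDelay").desc()). \
    show()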
7.6.1. Using LEAD
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

airlines = spark. \
    read. \
    parquet(airlines_path)

from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import lead
from pyspark.sql.window import Window

spec = Window. \
    partitionBy("FlightDate", "Origin"). \
    orderBy(col("CRSDepTime"))

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "UniqueCarrier",
           "FlightNum",
           "CRSDepTime",
           "IsDepDelayed",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LeadUniqueCarrier", lead("UniqueCarrier").over(spec)). \
    withColumn("LeadFlightNum", lead("FlightNum").over(spec)). \
    withColumn("LeadCRSDepTime", lead("CRSDepTime").over(spec)). \
    withColumn("LeadDepDelay", lead("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
|FlightDate|Origin|UniqueCarrier|FlightNum|CRSDepTime|IsDepDelayed|DepDelay|LeadUniqueCarrier|LeadFlightNum|LeadCRSDepTime|LeadDepDelay|
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
| 20080101| ABE| OO| 5873| 720| YES| 1| 9E| 2940| 1215| 70|
| 20080101| ABE| 9E| 2940| 1215| YES| 70| YV| 7263| 1230| 137|
| 20080101| ABE| YV| 7263| 1230| YES| 137| XE| 2578| 1410| 22|
| 20080101| ABE| XE| 2578| 1410| YES| 22| 9E| 2936| 1615| 34|
| 20080101| ABE| 9E| 2936| 1615| YES| 34| OH| 5457| 1720| 14|
| 20080101| ABE| OH| 5457| 1720| YES| 14| XE| 2594| 1740| 34|
| 20080101| ABE| XE| 2594| 1740| YES| 34| YV| 7138| 1741| 175|
| 20080101| ABE| YV| 7138| 1741| YES| 175| null| null| null| null|
| 20080101| ABI| MQ| 3214| 1735| YES| 3| null| null| null| null|
| 20080101| ABQ| DL| 1601| 740| YES| 65| WN| 88| 755| 2|
| 20080101| ABQ| WN| 88| 755| YES| 2| UA| 782| 805| 48|
| 20080101| ABQ| UA| 782| 805| YES| 48| AA| 1814| 925| 100|
| 20080101| ABQ| AA| 1814| 925| YES| 100| WN| 2976| 1040| 1|
| 20080101| ABQ| WN| 2976| 1040| YES| 1| WN| 1185| 1140| 9|
| 20080101| ABQ| WN| 1185| 1140| YES| 9| WN| 1297| 1155| 12|
| 20080101| ABQ| WN| 1297| 1155| YES| 12| WN| 2480| 1200| 16|
| 20080101| ABQ| WN| 2480| 1200| YES| 16| WN| 137| 1220| 10|
| 20080101| ABQ| WN| 137| 1220| YES| 10| WN| 1328| 1245| 78|
| 20080101| ABQ| WN| 1328| 1245| YES| 78| WN| 3245| 1250| 24|
| 20080101| ABQ| WN| 3245| 1250| YES| 24| WN| 61| 1320| 1|
+----------+------+-------------+---------+----------+------------+--------+-----------------+-------------+--------------+------------+
only showing top 20 rows
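For comparison, here is a hedged sketch of the same idea using lag over the same spec; it pulls the previous flight's fields instead of the next one's (the Lag* column names are illustrative):

from pyspark.sql.functions import lag

airlines. \
    filter("IsDepDelayed = 'YES' and Cancelled = 0"). \
    select(concat("Year",
                  lpad("Month", 2, "0"),
                  lpad("DayOfMonth", 2, "0")
                 ).alias("FlightDate"),
           "Origin",
           "FlightNum",
           "CRSDepTime",
           col("DepDelay").cast("int").alias("DepDelay")
          ). \
    withColumn("LagFlightNum", lag("FlightNum").over(spec)). \
    withColumn("LagCRSDepTime", lag("CRSDepTime").over(spec)). \
    withColumn("LagDepDelay", lag("DepDelay").over(spec)). \
    orderBy("FlightDate", "Origin", "CRSDepTime"). \
    show()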
7.6.2. Using LEAD with an offset of 7
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

airlines = spark. \
    read. \
    parquet(airlines_path)

from pyspark.sql.functions import col, lit, lpad, concat
from pyspark.sql.functions import sum, lead, substring
from pyspark.sql.window import Window

# Partition by month (first 6 characters of FlightDate) and Origin. With one
# row per date per origin, lead(..., 7) picks up the value from 7 days later,
# i.e. the same weekday of the following week.
spec = Window. \
    partitionBy(substring("FlightDate", 1, 6), "Origin"). \
    orderBy("FlightDate", col("TotalDepDelay").desc())

airlines. \
    filter("""IsDepDelayed = 'YES'
              AND Cancelled = 0
              AND concat(Year,
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year",
                   lpad("Month", 2, "0"),
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"),
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD"'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
| 20080101| ORD| 49353| 20080108| ORD| 35658|
| 20080102| ORD| 41545| 20080109| ORD| 10075|
| 20080103| ORD| 15784| 20080110| ORD| 18431|
| 20080104| ORD| 13442| 20080111| ORD| 15372|
| 20080105| ORD| 23800| 20080112| ORD| 5785|
| 20080106| ORD| 31148| 20080113| ORD| 9630|
| 20080107| ORD| 47817| 20080114| ORD| 24969|
+----------+------+-------------+--------------+----------+-----------------+
airlines. \
    filter("""IsDepDelayed = 'YES'
              AND Cancelled = 0
              AND concat(Year,
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year",
                   lpad("Month", 2, "0"),
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"),
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('Origin = "ORD" AND FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
| 20080101| ORD| 49353| 20080108| ORD| 35658|
| 20080102| ORD| 41545| 20080109| ORD| 10075|
| 20080103| ORD| 15784| 20080110| ORD| 18431|
| 20080104| ORD| 13442| 20080111| ORD| 15372|
| 20080105| ORD| 23800| 20080112| ORD| 5785|
| 20080106| ORD| 31148| 20080113| ORD| 9630|
| 20080107| ORD| 47817| 20080114| ORD| 24969|
+----------+------+-------------+--------------+----------+-----------------+
airlines. \
    filter("""IsDepDelayed = 'YES'
              AND Cancelled = 0
              AND concat(Year,
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year",
                   lpad("Month", 2, "0"),
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"),
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadFlightDate", lead("FlightDate", 7).over(spec)). \
    withColumn("LeadOrigin", lead("Origin", 7).over(spec)). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec)). \
    filter('FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()
+----------+------+-------------+--------------+----------+-----------------+
|FlightDate|Origin|TotalDepDelay|LeadFlightDate|LeadOrigin|LeadTotalDepDelay|
+----------+------+-------------+--------------+----------+-----------------+
| 20080101| ORD| 49353| 20080108| ORD| 35658|
| 20080101| DFW| 13741| 20080108| DFW| 8277|
| 20080101| ATL| 11592| 20080108| ATL| 9988|
| 20080101| LAX| 10360| 20080108| LAX| 8767|
| 20080101| JFK| 6948| 20080108| JFK| 2261|
| 20080101| SFO| 5359| 20080108| SFO| 18095|
| 20080102| ORD| 41545| 20080109| ORD| 10075|
| 20080102| ATL| 25127| 20080109| ATL| 6404|
| 20080102| DFW| 12827| 20080109| DFW| 4532|
| 20080102| JFK| 9734| 20080109| JFK| 1980|
| 20080102| LAX| 9253| 20080109| LAX| 4121|
| 20080102| SFO| 6150| 20080109| SFO| 8033|
| 20080103| ATL| 22603| 20080110| ATL| 9760|
| 20080103| SFO| 21957| 20080110| SFO| 17650|
| 20080103| ORD| 15784| 20080110| ORD| 18431|
| 20080103| LAX| 11837| 20080110| LAX| 6327|
| 20080103| DFW| 9585| 20080110| DFW| 6786|
| 20080103| JFK| 8031| 20080110| JFK| 1722|
| 20080104| SFO| 40137| 20080111| SFO| 8734|
| 20080104| LAX| 26441| 20080111| LAX| 5852|
+----------+------+-------------+--------------+----------+-----------------+
only showing top 20 rows
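As a hedged extension of the queries above, capturing the per-day aggregate in a variable makes it easy to derive the week-over-week change from the led column (daily_delay and WoWChange are illustrative names; spec is reused from the cells above):

daily_delay = airlines. \
    filter("""IsDepDelayed = 'YES'
              AND Cancelled = 0
              AND concat(Year,
                         lpad(Month, 2, '0'),
                         lpad(DayOfMonth, 2, '0')
                        ) BETWEEN 20080101 AND 20080114
              AND Origin IN ('ATL', 'DFW', 'JFK', 'LAX', 'SFO', 'ORD')
           """
          ). \
    groupBy(concat("Year",
                   lpad("Month", 2, "0"),
                   lpad("DayOfMonth", 2, "0")
                  ).alias("FlightDate"),
            "Origin"
           ). \
    agg(sum(col("DepDelay").cast("int")).alias("TotalDepDelay")). \
    withColumn("LeadTotalDepDelay", lead("TotalDepDelay", 7).over(spec))

# A negative WoWChange means the total delay went down compared to the
# same weekday of the previous week.
daily_delay. \
    withColumn("WoWChange", col("LeadTotalDepDelay") - col("TotalDepDelay")). \
    filter('FlightDate BETWEEN 20080101 AND 20080107'). \
    orderBy("FlightDate", col("TotalDepDelay").desc()). \
    show()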