5. Basic Transformations¶
5.1. Overview of Basic Transformations¶
Let us define a few problem statements to learn more about Data Frame APIs. We will cover filtering, aggregations, and sorting as part of the solutions to these problem statements.
Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival.
Output should contain 3 columns - FlightCount, DepDelayedCount, ArrDelayedCount
Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day.
Output should contain 4 columns - FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount
FlightDate should be in yyyy-MM-dd format.
Data should be sorted in ascending order by FlightDate
Get all the flights which departed late but arrived early (IsArrDelayed is NO).
Output should contain - FlightCRSDepTime, UniqueCarrier, FlightNum, Origin, Dest, DepDelay, ArrDelay
FlightCRSDepTime needs to be computed using Year, Month, DayOfMonth, and CRSDepTime
FlightCRSDepTime should be displayed in yyyy-MM-dd HH:mm format.
Output should be sorted by FlightCRSDepTime and then by the difference between DepDelay and ArrDelay
Also get the count of such flights
5.2. Starting Spark Context¶
Let us start the Spark context for this Notebook so that we can execute the code provided.
from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Basic Transformations'). \
    master('yarn'). \
    getOrCreate()
5.3. Overview of Filtering¶
Let us understand a few important details related to filtering before we get into the solutions.
Filtering can be done using either filter or where. The two are synonyms.
The condition can be passed in either SQL style or Data Frame style.
Example for SQL style - airlines.filter("IsArrDelayed = 'YES'").show() or airlines.where("IsArrDelayed = 'YES'").show()
Example for Data Frame style - airlines.filter(airlines["IsArrDelayed"] == 'YES').show() or airlines.filter(airlines.IsArrDelayed == 'YES').show(). We can also use where instead of filter.
Other operators we can use to filter the data include !=, >, <, >=, <=, LIKE, and BETWEEN ... AND.
If we have to validate against multiple columns, we need to combine conditions with boolean operators such as AND and OR.
If we have to compare a column against multiple values, we can use the IN operator.
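The condition styles above map naturally onto ordinary Python boolean expressions. As a plain-Python analogy (not Spark code, using made-up sample rows), combining conditions with AND and the IN operator look like this:

```python
# Plain-Python analogy of Spark filter conditions (hypothetical sample rows).
flights = [
    {"IsDepDelayed": "YES", "IsArrDelayed": "NO", "Origin": "ORD"},
    {"IsDepDelayed": "NO", "IsArrDelayed": "YES", "Origin": "JFK"},
    {"IsDepDelayed": "YES", "IsArrDelayed": "YES", "Origin": "ATL"},
]

# Validating against multiple columns - combine conditions with boolean AND
dep_late_arr_early = [f for f in flights
                      if f["IsDepDelayed"] == "YES" and f["IsArrDelayed"] == "NO"]

# Comparing one column against multiple values - the IN operator
major_airports = [f for f in flights
                  if f["Origin"] in ("ORD", "DFW", "ATL", "LAX", "SFO")]
```

In Data Frame style the same combinations use & and | (with parentheses around each condition) and the isin method.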
5.3.1. Tasks¶
Let us perform some tasks to understand filtering in detail. Solve all the problems by passing conditions in both SQL style and API style.
Read the data for the month of 2008 January.
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airlines.printSchema()
Get the count of flights which departed late at the origin but reached the destination early or on time.
airlines. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    count()
API Style
from pyspark.sql.functions import col
airlines. \
    filter((col("IsDepDelayed") == "YES") &
           (col("IsArrDelayed") == "NO")
          ). \
    count()
airlines. \
    filter((airlines["IsDepDelayed"] == "YES") &
           (airlines.IsArrDelayed == "NO")
          ). \
    count()
Get the count of flights which departed late from the origin by more than 60 minutes.
airlines. \
    filter("DepDelay > 60"). \
    count()
API Style
from pyspark.sql.functions import col
airlines. \
    filter(col("DepDelay") > 60). \
    count()
Get the count of flights which departed early or on time but arrived late by at least 15 minutes.
airlines. \
    filter("IsDepDelayed = 'NO' AND ArrDelay >= 15"). \
    count()
API Style
from pyspark.sql.functions import col
airlines. \
    filter((col("IsDepDelayed") == "NO") & (col("ArrDelay") >= 15)). \
    count()
Get the count of flights departed from the following major airports - ORD, DFW, ATL, LAX, SFO.
airlines. \
    filter("Origin IN ('ORD', 'DFW', 'ATL', 'LAX', 'SFO')"). \
    count()
# Total count, for comparison
airlines.count()
API Style
from pyspark.sql.functions import col
c = col('x')
help(c.isin)
from pyspark.sql.functions import col
airlines. \
    filter(col("Origin").isin("ORD", "DFW", "ATL", "LAX", "SFO")). \
    count()
Add a column FlightDate using Year, Month and DayOfMonth. The format should be yyyyMMdd.
from pyspark.sql.functions import col, concat, lpad
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    show()
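The lpad calls above left-pad Month and DayOfMonth to a fixed width of 2 with '0' before concatenation. A plain-Python sketch of the same logic, using str.zfill and hypothetical sample values:

```python
# Plain-Python analogy of concat(Year, lpad(Month, 2, "0"), lpad(DayOfMonth, 2, "0")).
year, month, day_of_month = 2008, 1, 9  # hypothetical sample values

flight_date = str(year) + str(month).zfill(2) + str(day_of_month).zfill(2)
print(flight_date)  # 20080109

# Zero-padding matters: fixed-width strings compare in the same order as dates,
# which is what makes LIKE and BETWEEN on FlightDate behave correctly.
assert "20080109" < "20080110"
```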
Get the count of flights departed late between 2008 January 1st and January 9th using FlightDate.
from pyspark.sql.functions import col, concat, lpad
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("IsDepDelayed = 'YES' AND FlightDate LIKE '2008010%'"). \
    count()
from pyspark.sql.functions import col, concat, lpad
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("""
           IsDepDelayed = 'YES' AND
           FlightDate BETWEEN 20080101 AND 20080109
           """). \
    count()
API Style
from pyspark.sql.functions import col
c = col('x')
help(c.like)
from pyspark.sql.functions import col, concat, lpad
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") &
           (col("FlightDate").like("2008010%"))
          ). \
    count()
from pyspark.sql.functions import col
c = col('x')
help(c.between)
from pyspark.sql.functions import col, concat, lpad
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") &
           (col("FlightDate").between("20080101", "20080109"))
          ). \
    count()
Get number of flights departed late on Sundays.
l = [('X',)]
df = spark.createDataFrame(l, "dummy STRING")
from pyspark.sql.functions import current_date
df.select(current_date()).show()
from pyspark.sql.functions import date_format
df.select(current_date(), date_format(current_date(), 'EE')).show()
from pyspark.sql.functions import col, concat, lpad
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("""
           IsDepDelayed = 'YES' AND
           date_format(to_date(FlightDate, 'yyyyMMdd'), 'EEEE') = 'Sunday'
           """). \
    count()
API Style
from pyspark.sql.functions import col, concat, lpad, date_format, to_date
airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") &
           (date_format(
                to_date("FlightDate", "yyyyMMdd"), "EEEE"
            ) == "Sunday")
          ). \
    count()
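date_format with the 'EEEE' pattern returns the full weekday name. The plain-Python equivalent of the to_date plus date_format combination uses datetime.strptime and strftime:

```python
from datetime import datetime

# Plain-Python analogy of date_format(to_date(FlightDate, 'yyyyMMdd'), 'EEEE').
flight_date = "20080106"  # hypothetical sample value; 2008-01-06 fell on a Sunday

weekday = datetime.strptime(flight_date, "%Y%m%d").strftime("%A")
print(weekday)  # Sunday
```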
5.4. Overview of Aggregations¶
Let us go through the details related to aggregations using Spark.
We can perform total aggregations directly on a Data Frame, or we can perform aggregations after grouping the data by one or more keys.
Here are the APIs which we typically use to group the data using a key.
groupBy
rollup
cube
Here are the functions which we typically use to perform aggregations.
count
sum, avg
min, max
If we want to provide aliases for the aggregated fields, we have to use agg after groupBy.
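The CASE WHEN ... THEN 1 ELSE 0 pattern used in the solutions below turns a conditional count into a sum. A plain-Python sketch of the idea, with made-up rows:

```python
# Plain-Python analogy of agg(count(lit(1)), sum(CASE WHEN ... THEN 1 ELSE 0 END)).
flights = [
    {"IsDepDelayed": "YES"},
    {"IsDepDelayed": "NO"},
    {"IsDepDelayed": "YES"},
]

flight_count = len(flights)  # count(lit(1)) counts every row
dep_delayed_count = sum(1 if f["IsDepDelayed"] == "YES" else 0 for f in flights)
```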
5.5. Overview of Sorting¶
Let us understand how to sort the data in a Data Frame.
We can use orderBy or sort to sort the data.
We can perform composite sorting by passing multiple columns or expressions.
By default, data is sorted in ascending order; we can change it to descending order by applying the desc() function to the column or expression.
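As a plain-Python analogy of composite sorting with a descending key (made-up rows), sorted with a tuple key behaves like orderBy with multiple expressions:

```python
# Plain-Python analogy of orderBy("FlightDate", col("FlightCount").desc()).
rows = [
    {"FlightDate": "2008-01-02", "FlightCount": 5},
    {"FlightDate": "2008-01-01", "FlightCount": 9},
    {"FlightDate": "2008-01-01", "FlightCount": 3},
]

# Ascending FlightDate, then descending FlightCount (negate the numeric key)
ordered = sorted(rows, key=lambda r: (r["FlightDate"], -r["FlightCount"]))
```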
5.6. Solutions - Problem 1¶
Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival.
Output should contain 3 columns - FlightCount, DepDelayedCount, ArrDelayedCount
5.6.1. Reading airlines data¶
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airlines.printSchema()
5.6.2. Get flights with delayed arrival¶
# SQL Style
airlines.filter("IsArrDelayed = 'YES'").show()
# Data Frame Style
airlines.filter(airlines["IsArrDelayed"] == 'YES').show()
airlines.filter(airlines.IsArrDelayed == 'YES').show()
5.6.3. Get delayed counts¶
## Departure Delayed Count
airlines. \
    filter(airlines.IsDepDelayed == "YES"). \
    count()
## Arrival Delayed Count
airlines. \
    filter(airlines.IsArrDelayed == "YES"). \
    count()
airlines. \
    filter("IsDepDelayed = 'YES' OR IsArrDelayed = 'YES'"). \
    select('Year', 'Month', 'DayOfMonth',
           'FlightNum', 'IsDepDelayed', 'IsArrDelayed'
          ). \
    show()
## Total, Departure Delayed and Arrival Delayed Counts
from pyspark.sql.functions import lit, count, sum, expr
airlines. \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    show()
5.7. Solutions - Problem 2¶
Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day.
Output should contain 4 columns - FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount
FlightDate should be in yyyy-MM-dd format.
Data should be sorted in ascending order by FlightDate
5.7.1. Grouping Data by Flight Date¶
from pyspark.sql.functions import lit, concat, lpad
airlines. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate"))
5.7.2. Getting Counts by FlightDate¶
from pyspark.sql.functions import lit, concat, lpad, count
airlines. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount")). \
    show(31)
# Alternative to get the count without using agg
# We will not be able to provide aliases for the aggregated fields
from pyspark.sql.functions import lit, concat, lpad
airlines. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    count(). \
    show()
5.7.3. Getting total as well as delayed counts for each day¶
from pyspark.sql.functions import lit, concat, lpad, count, sum, expr
airlines. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    show()
5.7.4. Sorting Data By FlightDate¶
help(airlines.sort)
help(airlines.orderBy)
from pyspark.sql.functions import lit, concat, lpad, count, sum, expr
airlines. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    orderBy("FlightDate"). \
    show(31)
5.7.5. Sorting Data in descending order by count¶
from pyspark.sql.functions import lit, concat, lpad, count, sum, expr, col
airlines. \
    groupBy(concat("Year", lit("-"),
                   lpad("Month", 2, "0"), lit("-"),
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    orderBy(col("FlightCount").desc()). \
    show()
5.8. Solutions - Problem 3¶
Get all the flights which departed late but arrived early (IsArrDelayed is NO).
Output should contain - FlightCRSDepTime, UniqueCarrier, FlightNum, Origin, Dest, DepDelay, ArrDelay
FlightCRSDepTime needs to be computed using Year, Month, DayOfMonth, and CRSDepTime
FlightCRSDepTime should be displayed in yyyy-MM-dd HH:mm format.
Output should be sorted by FlightCRSDepTime and then by the difference between DepDelay and ArrDelay
Also get the count of such flights
airlines.select('Year', 'Month', 'DayOfMonth', 'CRSDepTime').show()
l = [(2008, 1, 23, 700),
     (2008, 1, 10, 1855)
    ]
df = spark.createDataFrame(l, "Year INT, Month INT, DayOfMonth INT, DepTime INT")
df.show()
from pyspark.sql.functions import col, substring
df.select(substring(col('DepTime'), -2, 2)). \
    show()
df.select("DepTime", date_format(lpad('DepTime', 4, "0"), 'HH:mm')).show()
help(substring)
from pyspark.sql.functions import length
df.select(substring(col('DepTime'), 1, length(col('DepTime').cast('string')))). \
    show()
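The exploration above pads DepTime to 4 digits so it can be rendered as HH:mm. A plain-Python sketch of that padding, with a hypothetical sample value:

```python
# Plain-Python analogy of date_format(lpad('DepTime', 4, "0"), 'HH:mm').
dep_time = 700  # hypothetical sample value

padded = str(dep_time).zfill(4)        # "0700"
hh_mm = padded[:2] + ":" + padded[2:]  # "07:00"
print(hh_mm)  # 07:00
```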
# show() returns None, so we do not assign its result to a variable here
from pyspark.sql.functions import lit, col, concat, lpad
airlines. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    select(concat("Year", lit("-"),
                  lpad("Month", 2, "0"), lit("-"),
                  lpad("DayOfMonth", 2, "0"), lit(" "),
                  lpad("CRSDepTime", 4, "0")
                 ).alias("FlightCRSDepTime"),
           "UniqueCarrier", "FlightNum", "Origin",
           "Dest", "DepDelay", "ArrDelay"
          ). \
    orderBy("FlightCRSDepTime", col("DepDelay") - col("ArrDelay")). \
    show()
5.8.1. Getting Count¶
from pyspark.sql.functions import lit, concat, lpad
flightsFiltered = airlines. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    select(concat("Year", lit("-"),
                  lpad("Month", 2, "0"), lit("-"),
                  lpad("DayOfMonth", 2, "0"), lit(" "),
                  lpad("CRSDepTime", 4, "0")
                 ).alias("FlightCRSDepTime"),
           "UniqueCarrier", "FlightNum", "Origin",
           "Dest", "DepDelay", "ArrDelay"
          ). \
    count()
flightsFiltered