5. Basic Transformations
5.1. Overview of Basic Transformations
Let us define problem statements to learn more about Data Frame APIs. We will try to cover filtering, aggregations and sorting as part of solutions for these problem statements.
Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival.
Output should contain 3 columns - FlightCount, DepDelayedCount, ArrDelayedCount
Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day.
Output should contain 4 columns - FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount
FlightDate should be in yyyy-MM-dd format.
Data should be sorted in ascending order by FlightDate.
Get all the flights which are departed late but arrived early (IsArrDelayed is NO).
Output should contain - FlightCRSDepTime, UniqueCarrier, FlightNum, Origin, Dest, DepDelay, ArrDelay
FlightCRSDepTime needs to be computed using Year, Month, DayOfMonth, CRSDepTime
FlightCRSDepTime should be displayed using yyyy-MM-dd HH:mm format.
Output should be sorted by FlightCRSDepTime and then by the difference between DepDelay and ArrDelay
Also get the count of such flights
5.2. Starting Spark Context
Let us start the Spark context for this notebook so that we can execute the code provided.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
config("spark.ui.port", "0").
appName("Basic Transformations").
master("yarn").
getOrCreate
5.3. Overview of Filtering
Let us understand a few important details related to filtering before we get into the solution.
Filtering can be done by using either filter or where. They are synonyms of each other.
When it comes to the condition, we can pass it either in SQL Style or in Data Frame Style.
Example for SQL Style - airlines.filter("IsArrDelayed = 'YES'").show or airlines.where("IsArrDelayed = 'YES'").show
Example for Data Frame Style - airlines.filter(airlines("IsArrDelayed") === "YES").show or airlines.filter(col("IsArrDelayed") === "YES").show. We can also use where instead of filter.
Here are the other operations we can perform to filter the data - !=, >, <, >=, <=, LIKE, BETWEEN with AND. In Data Frame Style, equality is written as === and inequality as =!=.
If we have to validate against multiple columns then we need to use boolean operations such as AND and OR.
If we have to compare each column value with multiple values then we can use the IN operator.
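The two styles described above can be compared side by side on a tiny Data Frame. This is only a sketch: the session name `session`, the sample rows and the variable names are assumptions made for illustration, not part of the airlines data set.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a throwaway local session, used only for this illustration
val session = SparkSession.builder.
  master("local[1]").
  appName("FilterSketch").
  getOrCreate()
import session.implicits._

// Hypothetical rows that mimic a few columns of the airlines data
val flights = Seq(
  ("ORD", "YES", "NO", 25),
  ("DFW", "NO", "YES", -3),
  ("JFK", "YES", "YES", 70),
  ("ATL", "NO", "NO", 0)
).toDF("Origin", "IsDepDelayed", "IsArrDelayed", "DepDelay")

// SQL Style: the whole condition is passed as one string
val sqlStyleCount = flights.
  where("IsDepDelayed = 'YES' AND Origin IN ('ORD', 'DFW', 'ATL')").
  count()

// Data Frame Style: === for equality, and for AND, isin for IN
val apiStyleCount = flights.
  filter(col("IsDepDelayed") === "YES" and
    col("Origin").isin("ORD", "DFW", "ATL")).
  count()

// =!= is the Data Frame Style counterpart of !=
val notDelayedCount = flights.
  filter(col("IsDepDelayed") =!= "YES").
  count()
```

Both styles should return the same count for the same condition, so the choice between them is mostly a matter of readability.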
5.3.1. Tasks
Let us perform some tasks to understand filtering in detail. Solve all the problems by passing conditions using both SQL Style as well as API Style.
Read the data for the month of 2008 January.
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
airlines.printSchema
Get count of flights which are departed late at origin and reach destination early or on time.
airlines.
filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'").
count
API Style
import org.apache.spark.sql.functions.col
airlines.
filter(col("IsDepDelayed") === "YES" and
col("IsArrDelayed") === "NO"
).
count
airlines.
filter(airlines("IsDepDelayed") === "YES" and
airlines("IsArrDelayed") === "NO"
).
count
Get count of flights which are departed late from origin by more than 60 minutes.
airlines.
filter("DepDelay > 60").
count
API Style
import org.apache.spark.sql.functions.col
airlines.
filter(col("DepDelay") > 60).
count
Get count of flights which are departed early or on time but arrive late by at least 15 minutes.
airlines.
filter("IsDepDelayed = 'NO' AND ArrDelay >= 15").
count
API Style
import org.apache.spark.sql.functions.col
airlines.
filter(col("IsDepDelayed") === "NO" and col("ArrDelay") >= 15).
count()
Get count of flights departed from following major airports - ORD, DFW, ATL, LAX, SFO.
airlines.count
airlines.
filter("Origin IN ('ORD', 'DFW', 'ATL', 'LAX', 'SFO')").
count
API Style
import org.apache.spark.sql.functions.col
airlines.
filter(col("Origin").isin("ORD", "DFW", "ATL", "LAX", "SFO")).
count
Add a column FlightDate by using Year, Month and DayOfMonth. Format should be yyyyMMdd.
import org.apache.spark.sql.functions.{col, concat, lpad}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
show
Get count of flights departed late between 2008 January 1st to January 9th using FlightDate.
import org.apache.spark.sql.functions.{col, concat, lpad}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
filter("IsDepDelayed = 'YES' AND FlightDate LIKE '2008010%'").
count
import org.apache.spark.sql.functions.{col, concat, lpad}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
filter("""
IsDepDelayed = "YES" AND
FlightDate BETWEEN 20080101 AND 20080109
""").
count
API Style
import org.apache.spark.sql.functions.{col, concat, lpad}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
filter(col("IsDepDelayed") === "YES" and
(col("FlightDate") like ("2008010%"))
).
count
import org.apache.spark.sql.functions.{col, concat, lpad}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
filter(col("IsDepDelayed") === "YES" and
(col("FlightDate") between ("20080101", "20080109"))
).
count
Get number of flights departed late on Sundays.
val l = List("X")
val df = l.toDF("dummy")
import org.apache.spark.sql.functions.current_date
df.select(current_date).show
import org.apache.spark.sql.functions.date_format
df.select(current_date, date_format(current_date, "EE")).show
import org.apache.spark.sql.functions.{col, concat, lpad}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
filter("""
IsDepDelayed = "YES" AND
date_format(to_date(FlightDate, "yyyyMMdd"), "EEEE") = "Sunday"
""").
count
API Style
import spark.implicits._
import org.apache.spark.sql.functions.{col, concat, lpad, date_format, to_date}
airlines.
withColumn("FlightDate",
concat(col("Year"),
lpad(col("Month"), 2, "0"),
lpad(col("DayOfMonth"), 2, "0")
)
).
filter(col("IsDepDelayed") === "YES" and
date_format(
to_date($"FlightDate", "yyyyMMdd"), "EEEE"
) === "Sunday"
).
count
5.4. Overview of Aggregations
Let us go through the details related to aggregation using Spark.
We can perform total aggregations directly on Dataframe or we can perform aggregations after grouping by a key(s).
Here are the APIs which we typically use to group the data using a key.
groupBy
rollup
cube
Here are the functions which we typically use to perform aggregations.
count
sum, avg
min, max
If we want to provide aliases to the aggregated fields then we have to use agg after groupBy.
Let us get the count of flights for each day for the month of 200801.
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
import org.apache.spark.sql.functions.{concat, lpad, count, lit}
import spark.implicits._
airlines.
groupBy(concat($"year",
lpad($"Month", 2, "0"),
lpad($"DayOfMonth", 2, "0")
).alias("FlightDate")
).
agg(count(lit(1)).alias("FlightCount")).
show
+----------+-----------+
|FlightDate|FlightCount|
+----------+-----------+
| 20080120| 18653|
| 20080130| 19766|
| 20080115| 19503|
| 20080118| 20347|
| 20080122| 19504|
| 20080104| 20929|
| 20080125| 20313|
| 20080102| 20953|
| 20080105| 18066|
| 20080111| 20349|
| 20080109| 19820|
| 20080127| 18903|
| 20080101| 19175|
| 20080128| 20147|
| 20080119| 16249|
| 20080106| 19893|
| 20080123| 19769|
| 20080117| 20273|
| 20080116| 19764|
| 20080112| 16572|
+----------+-----------+
only showing top 20 rows
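To contrast a total aggregation with grouped aggregations, here is a minimal sketch on hypothetical data. The session name `session`, the sample rows and the result names are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, lit, max, sum}

// Assumption: a throwaway local session, used only for this illustration
val session = SparkSession.builder.
  master("local[1]").
  appName("AggregationSketch").
  getOrCreate()
import session.implicits._

// Hypothetical rows: flight date, origin and departure delay in minutes
val flights = Seq(
  ("20080101", "ORD", 10),
  ("20080101", "DFW", 40),
  ("20080102", "ORD", 5)
).toDF("FlightDate", "Origin", "DepDelay")

// Total aggregation: no grouping key, so the result has a single row
val totals = flights.agg(
  count(lit(1)).alias("FlightCount"),
  sum($"DepDelay").alias("TotalDepDelay")
)

// Grouped aggregation: one row per FlightDate, with aliases given via agg
val perDay = flights.
  groupBy($"FlightDate").
  agg(count(lit(1)).alias("FlightCount"),
    max($"DepDelay").alias("MaxDepDelay"))

// rollup adds a grand total row in which FlightDate is null
val perDayWithTotal = flights.
  rollup($"FlightDate").
  agg(count(lit(1)).alias("FlightCount"))

totals.show()
perDay.orderBy("FlightDate").show()
perDayWithTotal.orderBy("FlightDate").show()
```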
5.5. Overview of Sorting
Let us understand how to sort the data in a Data Frame.
We can use orderBy or sort to sort the data.
We can perform composite sorting by passing multiple columns or expressions.
By default data is sorted in ascending order. We can change it to descending by applying desc on the column or expression.
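As a small sketch of composite sorting, the example below sorts hypothetical rows by one column in descending order and then by an expression. The session name `session` and the sample data are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, desc}

// Assumption: a throwaway local session, used only for this illustration
val session = SparkSession.builder.
  master("local[1]").
  appName("SortingSketch").
  getOrCreate()
import session.implicits._

// Hypothetical rows: carrier, departure delay and arrival delay
val delays = Seq(
  ("AA", 10, 5),
  ("DL", 10, 20),
  ("UA", 3, 1)
).toDF("UniqueCarrier", "DepDelay", "ArrDelay")

// Composite sort: DepDelay descending, then DepDelay - ArrDelay ascending
val sorted = delays.
  orderBy(col("DepDelay").desc, col("DepDelay") - col("ArrDelay"))

// sort is a synonym of orderBy; desc is also available as a function
val sortedWithFunction = delays.sort(desc("DepDelay"))

// Carriers in the order produced by the composite sort
val carriers = sorted.collect().map(_.getString(0)).toList
sorted.show()
```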
5.6. Solutions - Problem 1
Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival.
Output should contain 3 columns - FlightCount, DepDelayedCount, ArrDelayedCount
5.6.1. Reading airlines data
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
airlines.printSchema
5.6.2. Get flights with delayed arrival
// SQL Style
airlines.filter("IsArrDelayed = 'YES'").show
// Data Frame Style
airlines.filter(airlines("IsArrDelayed") === "YES").show
import spark.implicits._
airlines.filter($"IsArrDelayed" === "YES").show
5.6.3. Get delayed counts
// Departure Delayed Count
airlines.
filter(airlines("IsDepDelayed") === "YES").
count
// Arrival Delayed Count
airlines.
filter(airlines("IsArrDelayed") === "YES").
count()
airlines.
filter("IsDepDelayed = 'YES' OR IsArrDelayed = 'YES'").
select("Year", "Month", "DayOfMonth",
"FlightNum", "IsDepDelayed", "IsArrDelayed"
).
show
// Total count along with Departure Delayed and Arrival Delayed counts
import org.apache.spark.sql.functions.{col, lit, count, sum, expr}
airlines.
agg(count(lit(1)).alias("FlightCount"),
sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
).
show
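The CASE WHEN expressions used above can also be written in Data Frame Style with when and otherwise. The sketch below shows this on hypothetical rows; the session name `session` and the sample data are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit, sum, when}

// Assumption: a throwaway local session, used only for this illustration
val session = SparkSession.builder.
  master("local[1]").
  appName("ConditionalCountSketch").
  getOrCreate()
import session.implicits._

// Hypothetical rows with the two delay flags
val flights = Seq(
  ("YES", "NO"),
  ("YES", "YES"),
  ("NO", "NO")
).toDF("IsDepDelayed", "IsArrDelayed")

// when(...).otherwise(...) plays the role of CASE WHEN ... ELSE ... END
val counts = flights.agg(
  count(lit(1)).alias("FlightCount"),
  sum(when(col("IsDepDelayed") === "YES", 1).otherwise(0)).
    alias("DepDelayedCount"),
  sum(when(col("IsArrDelayed") === "YES", 1).otherwise(0)).
    alias("ArrDelayedCount")
)

val row = counts.first()
counts.show()
```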
5.7. Solutions - Problem 2
Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day.
Output should contain 4 columns - FlightDate, FlightCount, DepDelayedCount, ArrDelayedCount
FlightDate should be in yyyy-MM-dd format.
Data should be sorted in ascending order by FlightDate.
5.7.1. Grouping Data by Flight Date
import org.apache.spark.sql.functions.{lit, concat, lpad}
import spark.implicits._
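Here is an example of groupBy. It should be followed by agg. On its own, groupBy returns a RelationalGroupedDataset rather than a Data Frame, so the code below does not produce a result that can be displayed with show.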
airlines.
groupBy(concat($"Year", lit("-"),
lpad($"Month", 2, "0"), lit("-"),
lpad($"DayOfMonth", 2, "0")).
alias("FlightDate"))
5.7.2. Getting Counts by FlightDate
import org.apache.spark.sql.functions.{lit, concat, lpad, count}
airlines.
groupBy(concat($"Year", lit("-"),
lpad($"Month", 2, "0"), lit("-"),
lpad($"DayOfMonth", 2, "0")).
alias("FlightDate")).
agg(count(lit(1)).alias("FlightCount")).
show(31)
// Alternative to get the count without using agg
// We will not be able to provide an alias for the aggregated field
import org.apache.spark.sql.functions.{lit, concat, lpad}
airlines.
groupBy(concat($"Year", lit("-"),
lpad($"Month", 2, "0"), lit("-"),
lpad($"DayOfMonth", 2, "0")).
alias("FlightDate")).
count.
show(31)
5.7.3. Getting total as well as delayed counts for each day
import org.apache.spark.sql.functions.{lit, concat, lpad, count, sum, expr}
airlines.
groupBy(concat($"Year", lit("-"),
lpad($"Month", 2, "0"), lit("-"),
lpad($"DayOfMonth", 2, "0")).
alias("FlightDate")).
agg(count(lit(1)).alias("FlightCount"),
sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
).
show
5.7.4. Sorting Data By FlightDate
import org.apache.spark.sql.functions.{lit, concat, lpad, count, sum, expr}
airlines.
groupBy(concat($"Year", lit("-"),
lpad($"Month", 2, "0"), lit("-"),
lpad($"DayOfMonth", 2, "0")).
alias("FlightDate")).
agg(count(lit(1)).alias("FlightCount"),
sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
).
orderBy("FlightDate").
show(31)
5.7.5. Sorting Data in descending order by count
import org.apache.spark.sql.functions.{lit, concat, lpad, count, sum, expr, col}
airlines.
groupBy(concat($"Year", lit("-"),
lpad($"Month", 2, "0"), lit("-"),
lpad($"DayOfMonth", 2, "0")).
alias("FlightDate")).
agg(count(lit(1)).alias("FlightCount"),
sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
).
orderBy(col("FlightCount").desc).
show(31)
5.8. Solutions - Problem 3
Get all the flights which are departed late but arrived early (IsArrDelayed is NO).
Output should contain - FlightCRSDepTime, UniqueCarrier, FlightNum, Origin, Dest, DepDelay, ArrDelay
FlightCRSDepTime needs to be computed using Year, Month, DayOfMonth, CRSDepTime
FlightCRSDepTime should be displayed using yyyy-MM-dd HH:mm format.
Output should be sorted by FlightCRSDepTime and then by the difference between DepDelay and ArrDelay
Also get the count of such flights
airlines.select("Year", "Month", "DayOfMonth", "CRSDepTime").show()
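import spark.implicits._
import org.apache.spark.sql.functions.{col, date_format, expr, lpad, substring}
// Sample rows to experiment with the time formatting
val l = List((2008, 1, 23, 700),
(2008, 1, 10, 1855)
)
val df = l.toDF("Year", "Month", "DayOfMonth", "DepTime")
df.show()
// Last two characters of DepTime (the minutes)
df.select(substring(col("DepTime"), -2, 2)).
show()
// Exploratory attempt: a padded string such as 0700 is not a timestamp,
// so date_format does not give a reliable HH:mm value here
df.select(col("DepTime"), date_format(lpad(col("DepTime"), 4, "0"), "HH:mm")).show()
// The length is computed per row, so the expression is passed in SQL Style
df.select(expr("substring(DepTime, 1, length(cast(DepTime AS string)))")).
show()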
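The padding and substring experiments above can be combined to turn an integer time such as 700 into 07:00. This is a sketch under assumptions: the session name `session`, the sample rows and the column alias are chosen for illustration, and the explicit cast to string is a defensive choice rather than something the original code uses.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit, lpad, substring}

// Assumption: a throwaway local session, used only for this illustration
val session = SparkSession.builder.
  master("local[1]").
  appName("TimeFormatSketch").
  getOrCreate()
import session.implicits._

// Hypothetical rows: year, month, day and time as an HHmm integer
val sample = Seq(
  (2008, 1, 23, 700),
  (2008, 1, 10, 1855)
).toDF("Year", "Month", "DayOfMonth", "DepTime")

// Pad to 4 characters so that 700 becomes 0700
val padded = lpad(col("DepTime").cast("string"), 4, "0")

// First two characters are the hours, the next two are the minutes
val formatted = sample.select(
  concat(substring(padded, 1, 2), lit(":"), substring(padded, 3, 2)).
    alias("DepTimeFormatted")
)

val times = formatted.collect().map(_.getString(0)).toList
formatted.show()
```

Working on the padded string avoids parsing the value as a timestamp, which keeps the example independent of date parsing settings.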
import org.apache.spark.sql.functions.{lit, col, concat, lpad, substring}
// CRSDepTime is padded to 4 characters (HHmm) and split into HH and mm
val crsDepTime = lpad(col("CRSDepTime"), 4, "0")
airlines.
filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'").
select(concat(col("Year"), lit("-"),
lpad(col("Month"), 2, "0"), lit("-"),
lpad(col("DayOfMonth"), 2, "0"), lit(" "),
substring(crsDepTime, 1, 2), lit(":"),
substring(crsDepTime, 3, 2)
).alias("FlightCRSDepTime"),
col("UniqueCarrier"), col("FlightNum"), col("Origin"),
col("Dest"), col("DepDelay"), col("ArrDelay")
).
orderBy(col("FlightCRSDepTime"), col("DepDelay") - col("ArrDelay")).
show()
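5.8.1. Getting Count
import org.apache.spark.sql.functions.{lit, col, concat, lpad, substring}
// CRSDepTime is padded to 4 characters (HHmm) and split into HH and mm
val crsDepTime = lpad(col("CRSDepTime"), 4, "0")
val flightsFiltered = airlines.
filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'").
select(concat(col("Year"), lit("-"),
lpad(col("Month"), 2, "0"), lit("-"),
lpad(col("DayOfMonth"), 2, "0"), lit(" "),
substring(crsDepTime, 1, 2), lit(":"),
substring(crsDepTime, 3, 2)
).alias("FlightCRSDepTime"),
col("UniqueCarrier"), col("FlightNum"), col("Origin"),
col("Dest"), col("DepDelay"), col("ArrDelay")
)
// Count of flights which departed late but arrived early or on time
flightsFiltered.count()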