6. Joining Data Sets¶
Let us understand how to join multiple Data Sets using Spark based APIs.
Prepare Datasets for Joins
Starting Spark Context
Analyze Datasets for Joins
Problem Statements
Overview of Joins
Solutions - Problem 1
Solutions - Problem 2
Solutions - Problem 3
Solutions - Problem 4
Solutions - Problem 5
Solutions - Problem 6
6.1. Prepare Datasets for Joins¶
Let us prepare datasets to join.
Make sure airport-codes is in HDFS.
We will also use airlines data for the month of January 2008. We have used that data set in the past as well.
import sys.process._
"hdfs dfs -ls /public/airlines_all"!
import sys.process._
"hdfs dfs -ls /public/airlines_all/airport-codes"!
import sys.process._
"hdfs dfs -ls /public/airlines_all/airlines-part/flightmonth=200801"!
6.2. Starting Spark Context¶
Let us start spark context for this Notebook so that we can execute the code provided.
If you want to use terminal for the practice, here is the command to use.
spark2-shell \
--master yarn \
--name "Joining Data Sets" \
--conf spark.ui.port=0
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
config("spark.ui.port", "0").
appName("Joining Data Sets").
master("yarn").
getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "2")
import spark.implicits._
6.3. Analyze Datasets for Joins¶
Let us analyze data sets that are going to be used for joins.
We will use January 2008 airlines data which have all relevant flight details.
Let us read and review the airlines data quickly
val airlines = spark.
read.
parquet("/public/airlines_all/airlines-part/flightmonth=200801")
airlines.printSchema
airlines.show
We will be using another data set to get details about airports. Details include information such as State, City etc for a given airport code.
Let us analyze the Dataset to confirm if there is header and also how the data is structured.
val airportCodesPath = "/public/airlines_all/airport-codes"
spark.
read.
text(airportCodesPath).
show(false)
Data is tab separated.
There is header for the data set.
Dataset have 4 fields - Country, State, City, IATA
Create DataFrame airport_codes applying appropriate Schema.
val airportCodesPath = "/public/airlines_all/airport-codes"
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath)
Preview and Understand the data.
airportCodes.show
Get schema of airport_codes.
airportCodes.printSchema
Get the count of records
airportCodes.count
Get the count of unique records and see if it is the same as total count.
airportCodes.
select("IATA").
distinct.
count
If they are not equal, analyze the data and identify IATA codes which are repeated more than once.
import org.apache.spark.sql.functions.{lit, count}
val duplicateIATACount = airportCodes.
groupBy("IATA").
agg(count(lit(1)).alias("iata_count")).
filter("iata_count > 1")
duplicateIATACount.show
Filter out the duplicates using the most appropriate one and discard others.
airportCodes.
filter("IATA = 'Big'").
show
airportCodes.
filter("!(State = 'Hawaii' AND IATA = 'Big')").
show
airportCodes.
filter("!(State = 'Hawaii' AND IATA = 'Big')").
count
Get number of airports (IATA Codes) for each state in the US. Sort the data in descending order by count.
val airportCodesPath = "/public/airlines_all/airport-codes"
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes.count
import org.apache.spark.sql.functions.{count, col, lit}
val airportCountByState = airportCodes.
groupBy("Country", "State").
agg(count(lit(1)).alias("IATACount")).
orderBy(col("IATACount").desc)
airportCountByState.show(51)
6.4. Problem Statements¶
Let us understand how to join Data Frames by using some problem statements. We will use 2008 January airlines data along with Airport Codes.
Get number of flights departed from each of the US airport.
Get number of flights departed from each of the state.
Get the list of airports in the US from which flights are not departed.
Check if there are any origins in airlines data which do not have record in airport-codes.
Get the total number of flights from the airports that do not contain entries in airport-codes.
Get the total number of flights per airport that do not contain entries in airport-codes.
6.5. Overview of Joins¶
Let us get an overview of joining Data Frames.
Our data cannot be stored in one table. It will be stored in multiple tables and the tables might be related.
When it comes to transactional systems, we typically define tables based on Normalization Principles.
When it comes to data warehousing applications, we typically define tables using Dimensional Modeling.
Either of the approach data is scattered into multiple tables and relationships are defined.
Typically tables are related with one to one, one to many, many to many relationships.
When we have 2 Data Sets that are related based on a common key we typically perform join.
There are different types of joins.
INNER JOIN
OUTER JOIN (LEFT or RIGHT)
FULL OUTER JOIN (a LEFT OUTER JOIN b UNION a RIGHT OUTER JOIN b)
6.6. Solutions - Problem 1¶
Get number of flights departed from each of the US airport in the month of 2008 January.
We have to use airport codes to determine US airport.
We need to use airlines data to get departure details.
To solve this problem we have to perform inner join.
val airlinesPath = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlinesPath)
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show
airlines.count
val airportCodesPath = "/public/airlines_all/airport-codes"
def getValidAirportCodes(airportCodesPath: String) = {
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes
}
val airportCodes = getValidAirportCodes(airportCodesPath)
airportCodes.count
import org.apache.spark.sql.functions.{col, lit, count}
airlines.
join(airportCodes, airportCodes("IATA") === airlines("Origin")).
select(col("Year"), col("Month"), col("DayOfMonth"), airportCodes("*"), col("CRSDepTime")).
show
airlines.
join(airportCodes, airportCodes("IATA") === airlines("Origin")).
groupBy("Origin").
agg(count(lit(1)).alias("FlightCount")).
orderBy(col("FlightCount").desc).
show
6.7. Solutions - Problem 2¶
Get number of flights departed from each of the US state in the month of 2008 January.
We have to use airport codes to determine state of each of the US airport.
We need to use airlines data to get departure details.
To solve this problem we have to perform inner join.
val airlinesPath = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlinesPath)
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show
airlines.count
val airportCodesPath = "/public/airlines_all/airport-codes"
def getValidAirportCodes(airportCodesPath: String) = {
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes
}
val airportCodes = getValidAirportCodes(airportCodesPath)
airportCodes.count
import org.apache.spark.sql.functions.{col, lit, count}
airlines.
join(airportCodes, col("IATA") === col("Origin"), "inner").
groupBy("State").
agg(count(lit(1)).alias("FlightCount")).
orderBy(col("FlightCount").desc).
show
airportCodes.filter("State IS NULL").show
airportCodes.filter(col("State") isNull).show
6.8. Solutions - Problem 3¶
Get the list of airports in the US from which flights are not departed in the month of 2008 January.
This is an example for outer join.
We need to get those airports which are in airport codes but not in 2008 January airlines data set.
Based on the side of the airport codes data set, we can say left or right. We will be using airport codes as the driving data set and hence we will use left outer join.
val airlinesPath = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlinesPath)
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show
airlines.count
val airportCodesPath = "/public/airlines_all/airport-codes"
def getValidAirportCodes(airportCodesPath: String) = {
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes
}
val airportCodes = getValidAirportCodes(airportCodesPath)
airportCodes.count
import org.apache.spark.sql.functions.col
airportCodes.
join(airlines, col("IATA") === col("Origin"), "left").
filter("Origin IS NULL").
select(airportCodes("*"), col("Origin")).
show
6.9. Solutions - Problem 4¶
Check if there are any origins in airlines data which do not have correpsonding records in airport-codes.
This is an example for outer join.
We need to get those airports which are in Origin field in January 2008 airlines data set but not in airport-codes.
Based on the side of the airlines data set, we can say left or right. We will be using airlines as the driving data set and hence we will use left outer join.
We will also apply distinct on Origin before performing left outer join.
val airlinesPath = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlinesPath)
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show
airlines.count
val airportCodesPath = "/public/airlines_all/airport-codes"
def getValidAirportCodes(airportCodesPath: String) = {
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes
}
val airportCodes = getValidAirportCodes(airportCodesPath)
airportCodes.show
airportCodes.count
airlines.
select("Origin").
distinct.
show
airlines.
select("Origin").
distinct.
join(airportCodes, airlines("Origin") === airportCodes("IATA"), "left").
show
airlines.
select("Origin").
distinct.
join(airportCodes, airlines("Origin") === airportCodes("IATA"), "left").
filter("IATA IS NULL").
show
6.10. Solutions - Problem 5¶
Get the total number of flights departed from the airports in January 2008 that do not contain entries in airport-codes.
This is an example for outer join.
We need to get number of flights from the 2008 January airlines data which do not have entries in airport-codes.
Based on the side of the airlines data set, we can say left or right. We will be using airlines as the driving data set and hence we will use left outer join.
We will be peforming join first and then we will aggregate to get number of flights from the concerned airports.
In this case will get total number of flights.
val airlinesPath = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlinesPath)
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show
airlines.count
val airportCodesPath = "/public/airlines_all/airport-codes"
def getValidAirportCodes(airportCodesPath: String) = {
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes
}
val airportCodes = getValidAirportCodes(airportCodesPath)
airportCodes.show
airportCodes.count
airlines.
join(airportCodes, airlines("Origin") === airportCodes("IATA"), "left").
filter("IATA IS NULL").
select(airlines("Year"), airlines("Month"), airlines("DayOfMonth"),
airlines("Origin"), airlines("Dest"), airlines("CRSDepTime"),
airportCodes("*")
).
show
airlines.
join(airportCodes, airlines("Origin") === airportCodes("IATA"), "left").
filter("IATA IS NULL").
count
6.11. Solutions - Problem 6¶
Get the total number of flights per airport that do not contain entries in airport-codes.
This is an example for outer join.
We need to get number of flights from the 2008 January airlines data which do not have entries in airport-codes.
Based on the side of the airlines data set, we can say left or right. We will be using airlines as the driving data set and hence we will use left outer join.
We will be peforming join first and then we will aggregate to get number of flights from the concerned airports per airport.
In this case will get total number of flights per airport.
val airlinesPath = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlinesPath)
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show
airlines.count
val airportCodesPath = "/public/airlines_all/airport-codes"
def getValidAirportCodes(airportCodesPath: String) = {
val airportCodes = spark.
read.
option("sep", "\t").
option("header", true).
option("inferSchema", true).
csv(airportCodesPath).
filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country = 'USA'")
airportCodes
}
val airportCodes = getValidAirportCodes(airportCodesPath)
airportCodes.show
airportCodes.count
airlines.
join(airportCodes, airlines("Origin") === airportCodes("IATA"), "left").
filter("IATA IS NULL").
select(airlines("Year"), airlines("Month"), airlines("DayOfMonth"),
airlines("Origin"), airlines("Dest"), airlines("CRSDepTime"),
airportCodes("*")
).
show
import org.apache.spark.sql.functions.{lit, count}
airlines.
join(airportCodes, airlines("Origin") === airportCodes("IATA"), "left").
filter("IATA IS NULL").
groupBy("Origin").
agg(count(lit(1)).alias("FlightCount")).
orderBy($"FlightCount".desc).
show