6. Joining Data Sets

Let us understand how to join multiple Data Sets using Spark-based APIs.

6.1. Prepare Datasets for Joins

Let us prepare the datasets to join so that we can get details related to airports (origin and destination).

  • Make sure airport-codes is in HDFS.

%%sh
hdfs dfs -ls /public/airlines_all/airport-codes

6.2. Starting Spark Context

Let us start the Spark context for this notebook so that we can execute the code provided.

from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Joining Data Sets'). \
    master('yarn'). \
    getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '2')
  • Analyze the dataset to confirm whether there is a header and how the data is structured.

spark.read. \
    text("/public/airlines_all/airport-codes"). \
    show(truncate=False)
  • Data is tab separated.

  • There is a header for the dataset.

  • The dataset has 4 fields - Country, State, City, IATA

Create a DataFrame named airport_codes by applying the appropriate schema.

airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       )
  • Preview and understand the data.

airport_codes.show()
  • Get schema of airport_codes.

airport_codes.printSchema()
  • Get the count of records

airport_codes.count()
  • Get the count of unique IATA codes and see if it is the same as the total count.

airport_codes. \
    select("IATA"). \
    distinct(). \
    count()
  • If they are not equal, analyze the data to identify the IATA codes that occur more than once.

from pyspark.sql.functions import lit, count
duplicate_iata_count = airport_codes. \
    groupBy("IATA"). \
    agg(count(lit(1)).alias("iata_count")). \
    filter("iata_count > 1")
duplicate_iata_count.show()
  • Filter out the duplicates, retaining the most appropriate record and discarding the others.

airport_codes. \
    filter("IATA = 'Big'"). \
    show()
airport_codes. \
    filter("!(State = 'Hawaii' AND IATA = 'Big')"). \
    show()
  • Get the number of airports (IATA codes) for each state in the US. Sort the data in descending order by count.

from pyspark.sql.functions import col, lit, count
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country='USA'")
airport_count_per_state = airport_codes. \
    groupBy("Country", "State"). \
    agg(count(lit(1)).alias("IATACount")). \
    orderBy(col("IATACount").desc())
airport_count_per_state.show()

6.3. Joining Data Frames

Let us understand how to join Data Frames using some problem statements. Use the January 2008 data.

  • Get the number of flights departed from each of the US airports.

  • Get the number of flights departed from each of the states.

  • Get the list of airports in the US from which no flights departed.

  • Check if there are any origins in the airlines data which do not have a record in airport-codes.

  • Get the total number of flights from the airports that do not have entries in airport-codes.

  • Get the total number of flights per airport for airports that do not have entries in airport-codes.

6.4. Overview of Joins

Let us get an overview of joining Data Frames.

  • Our data typically cannot be stored in one table. It will be stored in multiple tables, and the tables might be related.

  • When it comes to transactional systems, we typically define tables based on Normalization Principles.

  • When it comes to data warehousing applications, we typically define tables using Dimensional Modeling.

  • In either approach, data is scattered across multiple tables, with relationships defined between them.

  • Typically tables are related with one-to-one, one-to-many, or many-to-many relationships.

  • When we have 2 datasets that are related based on a common key, we typically perform a join.

  • There are different types of joins.

  • INNER JOIN

  • OUTER JOIN (LEFT or RIGHT)

  • FULL OUTER JOIN (a LEFT OUTER JOIN b UNION a RIGHT OUTER JOIN b)

6.5. Solutions - Problem 1

Get the number of flights departed from each of the US airports.

from pyspark.sql.functions import col, lit, count
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airlines.show()
airlines.select("Year", "Month", "DayOfMonth", "Origin", "Dest", "CRSDepTime").show()
airlines.count()
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country='USA'")
airport_codes.count()
airlines. \
    join(airport_codes, col("IATA") == col("Origin")). \
    select("Year", "Month", "DayOfMonth", airport_codes["*"], "CRSDepTime"). \
    show()
airlines. \
    join(airport_codes, airport_codes.IATA == airlines["Origin"]). \
    select("Year", "Month", "DayOfMonth", airport_codes["*"], "CRSDepTime"). \
    show()
  • Review the join API signature using Jupyter help.

airlines.join?
flight_count_per_airport = airlines. \
    join(airport_codes, airport_codes.IATA == airlines.Origin). \
    groupBy("Origin"). \
    agg(count(lit(1)).alias("FlightCount")). \
    orderBy(col("FlightCount").desc())
flight_count_per_airport.show()

6.6. Solutions - Problem 2

Get the number of flights departed from each of the states.

from pyspark.sql.functions import col, lit, count
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country='USA'")
flight_count_per_state = airlines. \
    join(airport_codes, airport_codes.IATA == airlines.Origin). \
    groupBy("State"). \
    agg(count(lit(1)).alias("FlightCount")). \
    orderBy(col("FlightCount").desc())
flight_count_per_state.show()

6.7. Solutions - Problem 3

Get the list of airports in the US from which no flights departed.

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big') AND Country='USA'")
airport_codes.printSchema()
airport_codes. \
    join(airlines, airport_codes.IATA == airlines.Origin, "left"). \
    select(airport_codes["*"], "Year", "Month",
           "DayOfMonth", "Origin", "CRSDepTime"). \
    show()
airports_not_used = airport_codes. \
    join(airlines, airport_codes.IATA == airlines.Origin, "left"). \
    filter(airlines.Origin.isNull()). \
    select('City', 'State', 'Country', 'IATA')
airports_not_used = airlines. \
    join(airport_codes, airport_codes.IATA == airlines.Origin, "right"). \
    filter("Origin IS NULL"). \
    select('City', 'State', 'Country', 'IATA')
airports_not_used.count()
airport_codes.show()

6.8. Solutions - Problem 4

Check if there are any origins in the airlines data which do not have a record in airport-codes.

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big')")
airlines. \
    join(airport_codes, airlines.Origin == airport_codes.IATA, "left"). \
    filter("IATA IS NULL"). \
    select("Origin"). \
    distinct(). \
    show()
airlines. \
    join(airport_codes, airlines.Origin == airport_codes.IATA, "left"). \
    filter("IATA IS NULL"). \
    select("Origin"). \
    distinct(). \
    count()

6.9. Solutions - Problem 5

Get the total number of flights from the airports that do not have entries in airport-codes.

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big')")
airlines. \
    join(airport_codes, airlines.Origin == airport_codes.IATA, "left"). \
    filter("IATA IS NULL"). \
    count()

6.10. Solutions - Problem 6

Get the total number of flights per airport for airports that do not have entries in airport-codes.

airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
airlines = spark. \
    read. \
    parquet(airlines_path)
airport_codes_path = "/public/airlines_all/airport-codes"
airport_codes = spark. \
    read. \
    csv(airport_codes_path,
        sep="\t",
        header=True,
        inferSchema=True
       ). \
    filter("!(State = 'Hawaii' AND IATA = 'Big')")
airlines. \
    join(airport_codes, airlines.Origin == airport_codes.IATA, "left"). \
    filter("IATA IS NULL"). \
    groupBy("Origin"). \
    count(). \
    show()