# Basic Transformations

## Overview of Basic Transformations

Let us define problem statements to learn more about Data Frame APIs. We will try to cover filtering, aggregations and sorting as part of solutions for these problem statements.
* Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival. 
 * Output should contain 3 columns - **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
* Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day. 
 * Output should contain 4 columns - **FlightDate**, **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
 * **FlightDate** should be of **YYYY-MM-dd** format.
 * Data should be **sorted** in ascending order by **flightDate**
* Get all the flights which are departed late but arrived early (**IsArrDelayed is NO**).
 * Output should contain - **FlightCRSDepTime**, **UniqueCarrier**, **FlightNum**, **Origin**, **Dest**, **DepDelay**, **ArrDelay**
 * **FlightCRSDepTime** need to be computed using **Year**, **Month**, **DayOfMonth**, **CRSDepTime**
 * **FlightCRSDepTime** should be displayed using **YYYY-MM-dd HH:mm** format.
 * Output should be sorted by **FlightCRSDepTime** and then by the difference between **DepDelay** and **ArrDelay**
 * Also get the count of such flights

## Starting Spark Context

Let us start spark context for this Notebook so that we can execute the code provided.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    appName('Basic Transformations'). \
    master('yarn'). \
    getOrCreate()

 
## Overview of Filtering
Let us understand few important details related to filtering before we get into the solution
* Filtering can be done either by using <mark>filter</mark> or <mark>where</mark>. These are like synonyms to each other.
* When it comes to the condition, we can either pass it in **SQL Style** or **Data Frame Style**.
* Example for SQL Style - <mark>airlines.filter("IsArrDelayed = 'YES'").show() or airlines.where("IsArrDelayed = 'YES'").show()</mark>
* Example for Data Frame Style - <mark>airlines.filter(airlines["IsArrDelayed"] == 'YES').show()</mark> or <mark>airlines.filter(airlines.IsArrDelayed == 'YES').show()</mark>. We can also use where instead of filter.
* Here are the other operations we can perform to filter the data - <mark>!=</mark>, <mark>></mark>, <mark><</mark>, <mark>>=</mark>, <mark><=</mark>, <mark>LIKE</mark>, <mark>BETWEEN</mark> with <mark>AND</mark>
* If we have to validate against multiple columns then we need to use boolean operations such as <mark>AND</mark> and <mark>OR</mark>.
* If we have to compare each column value with multiple values then we can use the <mark>IN</mark> operator.
    

### Tasks

Let us perform some tasks to understand filtering in detail. Solve all the problems by passing  conditions using both SQL Style as well as API Style.

* Read the data for the month of 2008 January.

In [None]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

In [None]:
airlines = spark. \
    read. \
    parquet(airlines_path)

In [None]:
airlines.printSchema()

* Get count of flights which are departed late at origin and reach destination early or on time.


In [None]:
airlines. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    count()

* API Style

In [None]:
from pyspark.sql.functions import col

In [None]:
airlines. \
    filter((col("IsDepDelayed") == "YES") & 
           (col("IsArrDelayed") == "NO")
          ). \
    count()

In [None]:
airlines. \
    filter((airlines["IsDepDelayed"] == "YES") & 
           (airlines.IsArrDelayed == "NO")
          ). \
    count()

* Get count of flights which are departed late from origin by more than 60 minutes.


In [None]:
airlines. \
    filter("DepDelay > 60"). \
    count()


* API Style

In [None]:
from pyspark.sql.functions import col

airlines. \
    filter(col("DepDelay") > 60). \
    count()

* Get count of flights which are departed early or on time but arrive late by at least 15 minutes.


In [None]:
airlines. \
    filter("IsDepDelayed = 'NO' AND ArrDelay >= 15"). \
    count()

* API Style

In [None]:
from pyspark.sql.functions import col

airlines. \
    filter((col("IsDepDelayed") == "NO") & (col("ArrDelay") >= 15)). \
    count()

* Get count of flights departed from following major airports - ORD, DFW, ATL, LAX, SFO.

In [None]:
airlines. \
    filter("Origin IN ('ORD', 'DFW', 'ATL', 'LAX', 'SFO')"). \
    count()

In [None]:
airlines.count()

* API Style

In [None]:
from pyspark.sql.functions import col
c = col('x')
help(c.isin)

In [None]:
from pyspark.sql.functions import col

airlines. \
    filter(col("Origin").isin("ORD", "DFW", "ATL", "LAX", "SFO")). \
    count()

* Add a column FlightDate by using Year, Month and DayOfMonth. Format should be **yyyyMMdd**.


In [None]:
from pyspark.sql.functions import col, concat, lpad

airlines. \
    withColumn("FlightDate",
                concat(col("Year"),
                       lpad(col("Month"), 2, "0"),
                       lpad(col("DayOfMonth"), 2, "0")
                      )
              ). \
    show()

* Get count of flights departed late between 2008 January 1st to January 9th using FlightDate.


In [None]:
from pyspark.sql.functions import col, concat, lpad

airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("IsDepDelayed = 'YES' AND FlightDate LIKE '2008010%'"). \
    count()

In [None]:
from pyspark.sql.functions import col, concat, lpad

airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("""
           IsDepDelayed = 'YES' AND 
           FlightDate BETWEEN 20080101 AND 20080109
          """). \
    count()

* API Style

In [None]:
from pyspark.sql.functions import col
c = col('x')
help(c.like)

In [None]:
from pyspark.sql.functions import col, concat, lpad

airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") & 
           (col("FlightDate").like("2008010%"))
          ). \
    count()

In [None]:
from pyspark.sql.functions import col
c = col('x')
help(c.between)

In [None]:
from pyspark.sql.functions import col, concat, lpad

airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") & 
           (col("FlightDate").between("20080101", "20080109"))
          ). \
    count()

* Get number of flights departed late on Sundays.

In [None]:
l = [('X',)]
df = spark.createDataFrame(l, "dummy STRING")

In [None]:
from pyspark.sql.functions import current_date
df.select(current_date()).show()

In [None]:
from pyspark.sql.functions import date_format

df.select(current_date(), date_format(current_date(), 'EE')).show()

In [None]:
from pyspark.sql.functions import col, concat, lpad

airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter("""
           IsDepDelayed = 'YES' AND 
           date_format(to_date(FlightDate, 'yyyyMMdd'), 'EEEE') = 'Sunday'
           """). \
    count()

* API Style

In [None]:
from pyspark.sql.functions import col, concat, lpad, date_format, to_date

airlines. \
    withColumn("FlightDate",
               concat(col("Year"),
                      lpad(col("Month"), 2, "0"),
                      lpad(col("DayOfMonth"), 2, "0")
                     )
              ). \
    filter((col("IsDepDelayed") == "YES") &
           (date_format(
               to_date("FlightDate", "yyyyMMdd"), "EEEE"
           ) == "Sunday")
          ). \
    count()

## Overview of Aggregations

Let us go through the details related to aggregation using Spark.

* We can perform total aggregations directly on Dataframe or we can perform aggregations after grouping by a key(s).
* Here are the APIs which we typically use to group the data using a key.
 * groupBy
 * rollup
 * cube
* Here are the functions which we typically use to perform aggregations.
 * count
 * sum, avg
 * min, max
* If we want to provide aliases to the aggregated fields then we have to use <mark>agg</mark> after <mark>groupBy</mark>.

## Overview of Sorting

Let us understand how to sort the data in a Data Frame.
* We can use <mark>orderBy</mark> or <mark>sort</mark> to sort the data.
* We can perform composite sorting by passing multiple columns or expressions.
* By default data is sorted in ascending order, we can change it to descending by applying <mark>desc()</mark> function on the column or expression.

## Solutions - Problem 1
Get total number of flights as well as number of flights which are delayed in departure and number of flights delayed in arrival. 
* Output should contain 3 columns - **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**

### Reading airlines data

In [None]:
airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

airlines = spark. \
    read. \
    parquet(airlines_path)

airlines.printSchema()

### Get flights with delayed arrival

In [None]:
# SQL Style
airlines.filter("IsArrDelayed = 'YES'").show()

In [None]:
# Data Frame Style
airlines.filter(airlines["IsArrDelayed"] == 'YES').show()

In [None]:
airlines.filter(airlines.IsArrDelayed == 'YES').show()

### Get delayed counts

In [None]:
## Departure Delayed Count
airlines. \
    filter(airlines.IsDepDelayed == "YES"). \
    count()

In [None]:
## Arrival Delayed Count
airlines. \
    filter(airlines.IsArrDelayed == "YES"). \
    count()

In [None]:
airlines. \
    filter("IsDepDelayed = 'YES' OR IsArrDelayed = 'YES'"). \
    select('Year', 'Month', 'DayOfMonth', 
           'FlightNum', 'IsDepDelayed', 'IsArrDelayed'
          ). \
    show()

In [None]:
## Both Departure Delayed and Arrival Delayed
from pyspark.sql.functions import col, lit, count, sum, expr
airlines. \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    show()

## Solutions - Problem 2

Get number of flights which are delayed in departure and number of flights delayed in arrival for each day along with number of flights departed for each day. 

* Output should contain 4 columns - **FlightDate**, **FlightCount**, **DepDelayedCount**, **ArrDelayedCount**
* **FlightDate** should be of **YYYY-MM-dd** format.
*   Data should be **sorted** in ascending order by **flightDate**

### Grouping Data by Flight Date


In [None]:
from pyspark.sql.functions import lit, concat, lpad
airlines. \
  groupBy(concat("Year", lit("-"), 
                 lpad("Month", 2, "0"), lit("-"), 
                 lpad("DayOfMonth", 2, "0")).
          alias("FlightDate"))

### Getting Counts by FlightDate

In [None]:
from pyspark.sql.functions import lit, concat, lpad, count

airlines. \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount")). \
    show(31)

In [None]:
# Alternative to get the count with out using agg
# We will not be able to provide alias for aggregated fields
from pyspark.sql.functions import lit, concat, lpad

airlines. \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    count(). \
    show()

### Getting total as well as delayed counts for each day

In [None]:
from pyspark.sql.functions import lit, concat, lpad, count, sum, expr

airlines. \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    show()

### Sorting Data By FlightDate

In [None]:
help(airlines.sort)

In [None]:
help(airlines.orderBy)

In [None]:
from pyspark.sql.functions import lit, concat, lpad, sum, expr
airlines. \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    orderBy("FlightDate"). \
    show(31)

### Sorting Data in descending order by count

In [None]:
from pyspark.sql.functions import lit, concat, lpad, sum, expr, col
airlines. \
    groupBy(concat("Year", lit("-"), 
                   lpad("Month", 2, "0"), lit("-"), 
                   lpad("DayOfMonth", 2, "0")).
            alias("FlightDate")). \
    agg(count(lit(1)).alias("FlightCount"),
        sum(expr("CASE WHEN IsDepDelayed = 'YES' THEN 1 ELSE 0 END")).alias("DepDelayedCount"),
        sum(expr("CASE WHEN IsArrDelayed = 'YES' THEN 1 ELSE 0 END")).alias("ArrDelayedCount")
       ). \
    orderBy(col("FlightCount").desc()). \
    show()

## Solutions - Problem 3
Get all the flights which are departed late but arrived early (**IsArrDelayed is NO**).
* Output should contain - **FlightCRSDepTime**, **UniqueCarrier**, **FlightNum**, **Origin**, **Dest**, **DepDelay**, **ArrDelay**
* **FlightCRSDepTime** need to be computed using **Year**, **Month**, **DayOfMonth**, **CRSDepTime**
* **FlightCRSDepTime** should be displayed using **YYYY-MM-dd HH:mm** format.
* Output should be sorted by **FlightCRSDepTime** and then by the difference between **DepDelay** and **ArrDelay**
* Also get the count of such flights


In [None]:
airlines.select('Year', 'Month', 'DayOfMonth', 'CRSDepTime').show()

In [None]:
l = [(2008, 1, 23, 700),
     (2008, 1, 10, 1855),
    ]

In [None]:
df = spark.createDataFrame(l, "Year INT, Month INT, DayOfMonth INT, DepTime INT")
df.show()

In [None]:
from pyspark.sql.functions import substring
df.select(substring(col('DepTime'), -2, 2)). \
    show()

In [None]:
df.select("DepTime", date_format(lpad('DepTime', 4, "0"), 'HH:mm')).show()

In [None]:
help(substring)

In [None]:
df.select(substring(col('DepTime'), 1, length(col('DepTime').cast('string')))). \
    show()

In [None]:
from pyspark.sql.functions import lit, col, concat, lpad, sum, expr

flightsFiltered = airlines. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    select(concat("Year", lit("-"), 
                  lpad("Month", 2, "0"), lit("-"), 
                  lpad("DayOfMonth", 2, "0"), lit(" "),
                  lpad("CRSDepTime", 4, "0")
                 ).alias("FlightCRSDepTime"),
           "UniqueCarrier", "FlightNum", "Origin", 
           "Dest", "DepDelay", "ArrDelay"
          ). \
    orderBy("FlightCRSDepTime", col("DepDelay") - col("ArrDelay")). \
    show()

### Getting Count

In [None]:
from pyspark.sql.functions import lit, col, concat, lpad, sum, expr

flightsFiltered = airlines. \
    filter("IsDepDelayed = 'YES' AND IsArrDelayed = 'NO'"). \
    select(concat("Year", lit("-"), 
                  lpad("Month", 2, "0"), lit("-"), 
                  lpad("DayOfMonth", 2, "0"), lit(" "),
                  lpad("CRSDepTime", 4, "0")
                 ).alias("FlightCRSDepTime"),
           "UniqueCarrier", "FlightNum", "Origin", 
           "Dest", "DepDelay", "ArrDelay"
          ). \
    count()

flightsFiltered