3. Data Processing - Overview

3.1. Pre-requisites and Module Introduction

Let us understand the prerequisites before getting into the module.

  • Good understanding of Data Processing using Python.

The module revolves around the Data Processing Life Cycle:

  • Reading data from files.

  • Processing data using APIs.

  • Writing processed data back to files.

  • We can also use databases as sources and sinks; that will be covered at a later point in time.

  • We can also read data in streaming fashion, which is beyond the scope of this course.

Here is what we will do to get an overview of the Data Processing Life Cycle by the end of the module:

  • Read airlines data from the file.

  • Preview the schema and data to understand the characteristics of the data.

  • Get an overview of Data Frame APIs as well as functions used to process the data.

  • Check if there are any duplicates in the data.

  • Get an overview of how to write data from Data Frames to files using file formats such as Parquet with compression.

  • Reorganize the data by month using a different file format and a partitioning strategy.

  • We will deep dive into Data Frame APIs to process the data in subsequent modules.

3.2. Starting Spark Context

Let us start Spark Context using SparkSession.

  • SparkSession is a class that is part of the org.apache.spark.sql package (pyspark.sql in Python).

  • It is a wrapper on top of Spark Context.

  • When a Spark application is submitted using spark-submit, spark-shell, or pyspark, a Spark Context is started along with a web UI to monitor the application.

  • Spark Context maintains the context of all the jobs that are submitted until it is stopped.

  • We first need to create a SparkSession object; it can be assigned to any variable name, but typically we use spark. Once it is created, several APIs are exposed on it, including read.

  • While creating the SparkSession object, we need to at least set the application name and specify the execution mode in which Spark Context should run.

  • We can use appName to specify the name of the application and master to specify the execution mode.

  • Below is a sample code snippet which starts the SparkSession for us.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "12903").
    appName("Data Processing - Overview").
    master("yarn").
    getOrCreate
spark
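
Once the session is created, we can quickly confirm how it was configured. A minimal sketch, assuming the spark object created above:

// Sanity check on the session we just created
spark.version                    // Spark version string
spark.sparkContext.appName       // "Data Processing - Overview"
spark.sparkContext.master        // "yarn"
spark.conf.get("spark.ui.port")  // "12903", the port configured above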

3.3. Overview of Spark read APIs

Let us get the overview of Spark read APIs to read files of different formats.

  • spark has a bunch of APIs to read data from files of different formats.

  • All APIs are exposed under spark.read

  • text - to read single-column data from text files; with the wholetext option, each file can also be read as a single record.

  • csv - to read text files with delimiters. The default delimiter is a comma, but we can use other delimiters as well.

  • json - to read data from JSON files

  • orc - to read data from ORC files

  • parquet - to read data from Parquet files.

  • We can also read data from other file formats by plugging in external packages and using spark.read.format (a sketch follows the examples below).

  • We can also pass options based on the file format.

  • inferSchema - to infer the data types of the columns based on the data.

  • header - to use the first line in the files as column names (applicable to CSV).

  • schema - to explicitly specify the schema.

  • We can get help on APIs such as spark.read.csv using help(spark.read.csv) in pyspark.

  • Reading delimited data from text files.

spark.
    read.
    schema("""order_id INT, 
              order_date STRING, 
              order_customer_id INT, 
              order_status STRING
           """
          ).
    csv("/public/retail_db/orders").
    show
  • Reading JSON data from text files. We can infer the schema from the data, as each JSON object contains both column names and values.

  • Example JSON record:

{ "order_id": 1, "order_date": "2013-07-25 00:00:00.0", "order_customer_id": 12345, "order_status": "COMPLETE" }
spark.
    read.
    json("/public/retail_db_json/orders").
    show
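
As a hedged illustration of spark.read.format with options, the same orders data read above with an explicit schema can also be read by inferring the schema; the path reuses the /public/retail_db/orders location from the earlier example.

// Hedged sketch: reading CSV via format with options instead of the csv shortcut
spark.
    read.
    format("csv").
    option("sep", ",").
    option("inferSchema", "true").
    load("/public/retail_db/orders").
    show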

3.4. Understand airlines data

Let us read one of the files and understand more about the data, so that we can determine the right API with the right options to process the data later.

  • Our airlines data is in text file format.

  • We can use spark.read.text on one of the files to preview the data and understand the following

  • Whether a header is present in the files or not.

  • The field delimiter that is being used.

  • Once we determine the header and field delimiter, we can use spark.read.csv with appropriate options to read the data.

val airlines = spark.read.
    text("/public/airlines_all/airlines/part-00000")
airlines.show(false)
  • The data has a header and each field is delimited by a comma.

3.5. Inferring Schema

Let us understand how we can quickly get schema using one file and apply on other files.

  • We can pass the file name pattern to spark.read.csv and read all the data in files under hdfs://public/airlines_all/airlines into a Data Frame.

  • We can use options such as header and inferSchema to assign column names and data types.

  • However, inferSchema ends up going through the entire data to infer the schema. We can use samplingRatio to infer the schema from a fraction of the data (a sketch follows the code below).

  • If the data in all the files has a similar structure, we should be able to get the schema using one file and then apply it to the others.

  • In our airlines data the schema is consistent across all the files, so we can get the schema by going through one file and then apply it to the entire dataset.

val airlines_part_00000 = spark.
    read.
    option("header", "true").
    option("inferSchema", "true").
    csv("/public/airlines_all/airlines/part-00000")
airlines_part_00000.show(false)
airlines_part_00000.printSchema
val airlines_schema = spark.
    read.
    option("header", "true").
    option("inferSchema", "true").
    csv("/public/airlines_all/airlines/part-00000").
    schema
val airlines = spark.
    read.
    option("header", "true").
    schema(airlines_schema).
    csv("/public/airlines_all/airlines/part*")
airlines.count
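
A hedged sketch of samplingRatio, assuming your Spark version supports it for CSV schema inference (it has long been supported for JSON): infer the data types from roughly 10% of the rows instead of the full file.

val airlines_sampled_schema = spark.
    read.
    option("header", "true").
    option("inferSchema", "true").
    option("samplingRatio", "0.1"). // infer types from ~10% of the rows
    csv("/public/airlines_all/airlines/part-00000").
    schema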

3.6. Previewing airlines data

Let us preview the airlines data to understand more about it.

  • As we have too many files, we will just process one file and preview the data.

  • File Name: hdfs://public/airlines_all/airlines/part-00000

  • spark.read.csv returns a Data Frame.

val airlines_schema = spark.
    read.
    option("header", "true").
    option("inferSchema", "true").
    csv("/public/airlines_all/airlines/part-00000").
    schema
val airlines = spark.
    read.
    option("header", "true").
    schema(airlines_schema).
    csv("/public/airlines_all/airlines/part*")

A Data Frame has a structure or schema.

  • We can print the schema using airlines.printSchema()

  • We can preview the data using airlines.show(). By default it shows 20 records, and some of the column values might be truncated for readability purposes.

  • We can review the details of show using help(airlines.show) in pyspark.

  • We can pass a custom number of records and disable truncation to show the complete values of all the records requested. This facilitates previewing all columns for the desired number of records.

airlines.show(100, false)
  • We can get the number of records or rows in a Data Frame using airlines.count()

  • In Databricks Notebooks, we can use display to preview the data using the Visualization feature.

  • We can perform all kinds of standard transformations on our data. We need to have good knowledge of functions on Data Frames as well as functions on columns to apply all standard transformations.

  • Let us also validate whether there are duplicates in our data; if there are, we will remove them while reorganizing the data later (see the sketch after the code below).

val airlines_schema = spark.
    read.
    option("header", "true").
    option("inferSchema", "true").
    csv("/public/airlines_all/airlines/part-00000").
    schema
val airlines = spark.
    read.
    option("header", "true").
    schema(airlines_schema).
    csv("/public/airlines_all/airlines/part*")
airlines.printSchema
airlines.show
airlines.show(100, false)
airlines.count
airlines.distinct.count
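
If the two counts above differ, the data contains exact duplicate rows. A hedged sketch of removing them with dropDuplicates, which is equivalent to distinct when no column subset is passed:

// Remove fully duplicated rows; passing a subset of columns would de-duplicate on those columns only
val airlines_deduped = airlines.dropDuplicates()
airlines_deduped.count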

3.7. Overview of Data Frame APIs

Let us get an overview of Data Frame APIs to process data in Data Frames.

  • Row Level Transformations or Projection of Data can be done using select, selectExpr, withColumn, drop on Data Frame.

  • We typically apply functions from org.apache.spark.sql.functions (pyspark.sql.functions in Python) on columns using select and withColumn.

  • Filtering is typically done either by using filter or where on the Data Frame.

  • We can pass the condition to filter or where either in SQL style or in Programming Language style (see the sketch after this list).

  • Global Aggregations can be performed directly on the Data Frame.

  • By Key or Grouping Aggregations are typically performed using groupBy followed by aggregate functions via agg (illustrated after the tasks below).

  • We can sort the data in a Data Frame using sort or orderBy.

  • We will talk about Window Functions later; they can be used for advanced aggregations and ranking.
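
As a quick hedged illustration of the two filtering styles, assuming the airlines Data Frame created earlier and its Month column:

// SQL style - the condition is passed as a string
airlines.filter("Month = 1").show

// Programming Language style - the condition is built from column objects
// ($"..." relies on the implicits pre-imported in spark-shell: import spark.implicits._)
airlines.filter($"Month" === 1).show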

3.7.1. Tasks

Let us understand how to project the data using different options such as select, selectExpr, withColumn, drop.

  • Create a Data Frame employeesDF using a collection.

val employees = List((1, "Scott", "Tiger", 1000.0, "united states"),
                     (2, "Henry", "Ford", 1250.0, "India"),
                     (3, "Nick", "Junior", 750.0, "united KINGDOM"),
                     (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
                    )
val employeesDF = employees.
    toDF("employee_id", 
         "first_name", 
         "last_name", 
         "salary", 
         "nationality"
        )
employeesDF.printSchema
employeesDF.show
  • Project employee first name and last name.

employeesDF.
    select("first_name", "last_name").
    show
  • Project all the fields except for Nationality

employeesDF.
    drop("nationality").
    show
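
The other Data Frame APIs mentioned above - filtering, grouping aggregations, and sorting - can be sketched on the same employeesDF. A hedged illustration:

import org.apache.spark.sql.functions.{count, avg}

// Filter, aggregate by nationality, then sort by the aggregated value
employeesDF.
    filter($"salary" >= 1000).
    groupBy("nationality").
    agg(count("employee_id").alias("employee_count"),
        avg("salary").alias("avg_salary")).
    orderBy($"avg_salary".desc).
    show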

We will explore most of the APIs to process data in Data Frames as we get deeper into data processing at a later point in time.

3.8. Overview of Functions

Let us get an overview of different functions that are available to process data in columns.

  • While Data Frame APIs work on the Data Frame, at times we might want to apply functions on column values.

  • Functions to process column values are available under org.apache.spark.sql.functions (pyspark.sql.functions in Python). These are typically used in select or withColumn on top of a Data Frame.

  • There are approximately 300 pre-defined functions available for us.

  • Some of the important functions can be broadly categorized into String Manipulation, Date Manipulation, Numeric Functions and Aggregate Functions (a quick sketch follows this list).

  • String Manipulation Functions

  • Concatenating Strings - concat

  • Getting Length - length

  • Trimming Strings - trim, rtrim, ltrim

  • Padding Strings - lpad, rpad

  • Extracting Strings - split, substring

  • Date Manipulation Functions

  • Date Arithmetic - date_add, date_sub, datediff, add_months

  • Date Extraction - dayofmonth, month, year

  • Get beginning period - trunc, date_trunc

  • Numeric Functions - abs, greatest

  • Aggregate Functions - sum, min, max
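
A minimal hedged sketch trying a few of these functions on a dummy single-row Data Frame created with spark.range; the column aliases are illustrative only.

import org.apache.spark.sql.functions.{current_date, date_add, trunc, lpad, lit}

spark.range(1).
    select(
        current_date().alias("today"),                     // today's date
        date_add(current_date(), 30).alias("plus_30"),     // date arithmetic
        trunc(current_date(), "month").alias("month_begin"), // beginning of the current month
        lpad(lit("7"), 2, "0").alias("padded_month")       // pad "7" to "07"
    ).
    show(false)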

3.8.1. Tasks

Let us perform a task to understand how functions are typically used.

  • Project the full name by concatenating first name and last name along with the other fields, excluding first name and last name.

import org.apache.spark.sql.functions.{lit, concat}

employeesDF.
    withColumn("full_name", concat($"first_name", lit(", "), $"last_name")).
    drop("first_name", "last_name").
    show
employeesDF.
    select($"employee_id",
           concat($"first_name", lit(", "), $"last_name").alias("full_name"),
           $"salary",
           $"nationality"
          ).
    show
employeesDF.
    selectExpr("employee_id",
               "concat(first_name, ', ', last_name) AS full_name",
               "salary",
               "nationality"
              ).
    show

We will explore most of the functions as we get deeper into data processing at a later point in time.

3.9. Overview of Spark Write APIs

Let us understand how we can write Data Frames to different file formats.

  • All the batch write APIs are grouped under write, which is exposed on Data Frame objects.

  • The APIs are invoked as dataFrameName.write (for example, orders.write).

  • text - to write single column data to text files.

  • csv - to write to text files with delimiters. Default is a comma, but we can use other delimiters as well.

  • json - to write data to JSON files

  • orc - to write data to ORC files

  • parquet - to write data to Parquet files.

  • We can also write data to other file formats by plugging in external packages and using write.format, for example avro.

  • We can pass options based on the file format to which we are writing the Data Frame.

  • compression - Compression codec (gzip, snappy etc.)

  • sep - to specify the delimiter while writing text files using csv

  • We can overwrite the directories or append to existing directories using mode

  • Create a copy of the orders data in Parquet file format with no compression. If the folder already exists, overwrite it. Target Location: /user/[YOUR_USER_NAME]/retail_db/orders

  • When you pass options, typos in option names are silently ignored rather than causing failures. Be careful and make sure the output is validated.

  • By default the number of files in the output directory is equal to the number of tasks used to process the data in the last stage. However, we might want to control the number of files so that we don't run into the small files issue.

  • We can control the number of files using coalesce. It has to be invoked on the Data Frame before invoking write.

val orders = spark.
    read.
    schema("""order_id INT, 
              order_date STRING, 
              order_customer_id INT, 
              order_status STRING
           """
          ).
    csv("/public/retail_db/orders")
val username = System.getProperty("user.name")

orders.
    write.
    mode("overwrite").
    option("compression", "none").
    parquet(s"/user/${username}/retail_db/orders")
// Alternative approach - using format
val username = System.getProperty("user.name")

orders.
    write.
    mode("overwrite").
    option("compression", "none").
    format("parquet").
    save(s"/user/${username}/retail_db/orders")
import sys.process._

val username = System.getProperty("user.name")

s"hdfs dfs -ls /user/${username}/retail_db/orders" !

// Since we wrote with no compression, the file names should not contain compression suffixes such as snappy.
  • Read order_items data from /public/retail_db_json/order_items and write it to pipe-delimited files with gzip compression. Target Location: /user/[YOUR_USER_NAME]/retail_db/order_items. Make sure to validate (a validation sketch follows the code below).

  • Ignore the error if the target location already exists. Also make sure to write into only one file. We can use coalesce for it.

coalesce will be covered in detail at a later point in time

val order_items = spark.
    read.
    json("/public/retail_db_json/order_items")
// Using format

val username = System.getProperty("user.name")

order_items.
    coalesce(1).
    write.
    mode("ignore").
    option("compression", "gzip").
    option("sep", "|").
    format("csv").
    save(s"/user/${username}/retail_db/order_items")
import sys.process._

val username = System.getProperty("user.name")

s"hdfs dfs -ls /user/${username}/retail_db/order_items" !
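
To validate the output, a hedged sketch that reads the pipe-delimited, gzip-compressed files back; compressed CSV files are decompressed transparently on read.

val username = System.getProperty("user.name")

// Read the pipe-delimited output back to confirm the data round-trips correctly
val order_items_check = spark.
    read.
    option("sep", "|").
    csv(s"/user/${username}/retail_db/order_items")
order_items_check.show
order_items_check.count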

3.10. Reorganizing airlines data

Let us reorganize our airlines data into fewer files, where the data is compressed and also partitioned by month.

  • We have ~1920 files of ~64 MB each.

  • The data ranges from October 1987 to December 2008 (255 months).

  • By default Spark uses ~1920 tasks to process the data, which might end up producing too many small files. We can avoid that by using repartition and then partitioning by the month.

  • Here are the steps we are going to follow to partition by flight month and save the data to /user/[YOUR_USER_NAME]/airlines.

  • Read one file first and get the schema.

  • Read the entire data by applying the schema from the previous step.

  • Add an additional column flightmonth using withColumn, built with concat and lpad on the month column. We need to do this because the month in our dataset is of type integer, and we want to pad months up to September with a leading 0 to format the value as YYYYMM.

  • Repartition the data into 255 partitions (one per month) using flightmonth.

  • Partition the data using partitionBy on flightmonth while writing to the target location.

  • We will use the Parquet file format, which compresses the data using the Snappy algorithm by default.

This process will take time. Once it is done, we will review the target location into which the data is copied, partitioned by month (a listing sketch follows the code below).

spark.stop
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.dynamicAllocation.enabled", "false").
    config("spark.executor.instances", 40).
    appName("Data Processing - Overview").
    master("yarn").
    getOrCreate
spark
import org.apache.spark.sql.functions.{concat, lpad}

val airlines_schema = spark.read.
    option("header", "true").
    option("inferSchema", "true").
    csv("/public/airlines_all/airlines/part-00000").
    schema
val airlines = spark.
    read.
    option("header", "true").
    schema(airlines_schema).
    csv("/public/airlines_all/airlines/part*")
airlines.printSchema
airlines.show
spark.conf.set("spark.sql.shuffle.partitions", "255")
val username = System.getProperty("user.name")

airlines.
    distinct.
    withColumn("flightmonth", concat($"year", lpad($"month", 2, "0"))).
    repartition(255, $"flightmonth").
    write.
    mode("overwrite").
    partitionBy("flightmonth").
    format("parquet").
    save(s"/user/${username}/airlines-part")
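
Once the job completes, a hedged sketch to confirm that the target location now contains one sub-directory per month (flightmonth=YYYYMM):

import sys.process._

val username = System.getProperty("user.name")

// Each partition value should show up as a flightmonth=YYYYMM sub-directory
s"hdfs dfs -ls /user/${username}/airlines-part" !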

3.11. Previewing reorganized data

Let us preview the reorganized data.

  • We will use the new location going forward - /public/airlines_all/airlines-part. Data is already copied into that location.

  • We have partitioned the data by month and stored it in that location.

  • Instead of using the complete dataset, we will read the data from one partition - /public/airlines_all/airlines-part/flightmonth=200801

  • First let us create a Data Frame object using spark.read.parquet("/public/airlines_all/airlines-part/flightmonth=200801") - let's call it airlines (see the sketch after this list).

  • We can get the schema of the DataFrame using airlines.printSchema()

  • Use airlines.show() or airlines.show(100, false) to preview the data.

  • We can also use display(airlines) to get airlines data in tabular format as part of Databricks Notebook.

  • We can also use airlines.describe().show() to get some statistics about the Data Frame and airlines.count() to get the number of records in the DataFrame.
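
Putting the steps above together, a hedged sketch assuming the partition referenced above:

val airlines = spark.
    read.
    parquet("/public/airlines_all/airlines-part/flightmonth=200801")

airlines.printSchema
airlines.show(100, false)
airlines.describe().show   // basic statistics for the columns
airlines.count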

3.12. Analyze and Understand Data

Let us analyze and understand more about the data in detail, using the data for January 2008.

  • First let us read the data for the month of January 2008.

val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"

val airlines = spark.
    read.
    parquet(airlines_path)
airlines.count
605659
airlines.printSchema
root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: string (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: string (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: string (nullable = true)
 |-- ArrDelay: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- TaxiIn: string (nullable = true)
 |-- TaxiOut: string (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)
 |-- CarrierDelay: string (nullable = true)
 |-- WeatherDelay: string (nullable = true)
 |-- NASDelay: string (nullable = true)
 |-- SecurityDelay: string (nullable = true)
 |-- LateAircraftDelay: string (nullable = true)
 |-- IsArrDelayed: string (nullable = true)
 |-- IsDepDelayed: string (nullable = true)
  • Get number of records - airlines.count()

  • Go through the list of columns and understand the purpose of each.

    • Year

    • Month

    • DayOfMonth

    • CRSDepTime - Scheduled Departure Time

    • DepTime - Actual Departure Time.

    • DepDelay - Departure Delay in Minutes

    • CRSArrTime - Scheduled Arrival Time

    • ArrTime - Actual Arrival Time.

    • ArrDelay - Arrival Delay in Minutes.

    • UniqueCarrier - Carrier or Airlines

    • FlightNum - Flight Number

    • Distance - Distance between Origin and Destination

    • IsDepDelayed - this is set to yes for those flights where departure is delayed.

    • IsArrDelayed - this is set to yes for those flights where arrival is delayed.

  • Get number of unique origins

airlines.
    select("Origin").
    distinct.
    count
286
  • Get number of unique destinations

airlines.
    select("Dest").
    distinct.
    count
287
  • Get all unique carriers

airlines.
    select("UniqueCarrier").
    distinct.
    show
+-------------+
|UniqueCarrier|
+-------------+
|           UA|
|           AA|
|           NW|
|           EV|
|           B6|
|           DL|
|           OO|
|           F9|
|           YV|
|           US|
|           AQ|
|           MQ|
|           OH|
|           HA|
|           XE|
|           AS|
|           FL|
|           CO|
|           WN|
|           9E|
+-------------+
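
With the basics above we can already sketch a simple grouped aggregation, for example the number of flights per carrier in January 2008. A hedged illustration using the same airlines Data Frame:

// Number of flights per carrier, highest first
airlines.
    groupBy("UniqueCarrier").
    count.
    orderBy($"count".desc).
    show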

3.13. Conclusion

Let us recap the key takeaways from this module.

  • APIs to read the data from files into Data Frame.

  • Previewing Schema and the data in Data Frame.

  • Overview of Data Frame APIs and Functions

  • Writing data from Data Frame into Files

  • Reorganizing the airlines data by month

  • Simple APIs to analyze the data. Now it is time for us to deep dive into APIs to perform all the standard transformations as part of Data Processing.