3. Data Processing - Overview¶
3.1. Pre-requisites and Module Introduction¶
Let us understand the prerequisites before getting into the module.
Good understanding of Data Processing using Python.
Data Processing Life Cycle
Reading Data from files
Processing Data using APIs
Writing Processed Data back to files
We can also use Databases as sources and sinks. It will be covered at a later point in time.
We can also read data in a streaming fashion, which is out of the scope of this course.
We will get an overview of the Data Processing Life Cycle by the end of the module.
Read airlines data from the file.
Preview the schema and data to understand the characteristics of the data.
Get an overview of Data Frame APIs as well as functions used to process the data.
Check if there are any duplicates in the data.
Get an overview of how to write data from Data Frames to files using file formats such as Parquet with compression.
Reorganize the data by month using a different file format and a partitioning strategy.
We will deep dive into Data Frame APIs to process the data in subsequent modules.
3.2. Starting Spark Context¶
Let us start Spark Context using SparkSession.
SparkSession is a class that is part of the org.apache.spark.sql package. It is a wrapper on top of Spark Context.
When a Spark application is submitted using spark-submit or spark-shell or pyspark, a service called Spark Context will be started. Spark Context maintains the context of all the jobs that are submitted until it is killed.
SparkSession is nothing but a wrapper on top of Spark Context.
We need to first create a SparkSession object with any name, but typically we use spark. Once it is created, several APIs such as read will be exposed.
We need to at least set the application name and also specify the execution mode in which Spark Context should run while creating the SparkSession object.
We can use appName to specify the name of the application and master to specify the execution mode.
Below is a sample code snippet which will create the SparkSession object for us.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
config("spark.ui.port", "12903").
appName("Data Processing - Overview").
master("yarn").
getOrCreate
spark
3.3. Overview of Spark read APIs¶
Let us get an overview of the Spark read APIs to read files of different formats.
spark has a bunch of APIs to read data from files of different formats.
All APIs are exposed under spark.read.
text - to read single column data from text files as well as to read each whole text file as one record.
csv - to read text files with delimiters. Default is a comma, but we can use other delimiters as well.
json - to read data from JSON files.
orc - to read data from ORC files.
parquet - to read data from Parquet files.
We can also read data from other file formats by plugging them in and using spark.read.format.
We can also pass options based on the file formats.
inferSchema - to infer the data types of the columns based on the data.
header - to use the header to get the column names in case of text files.
schema - to explicitly specify the schema.
We can get help on APIs such as spark.read.csv from the Spark API documentation.
Reading delimited data from text files.
spark.
read.
schema("""order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
"""
).
csv("/public/retail_db/orders").
show
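The format based reader mentioned above can be used in place of the convenience methods such as csv. Below is a minimal sketch reading the same orders data through format; the sep value shown is simply the default comma and is only illustrative.
// Read the comma separated orders data using the generic format API.
spark.
    read.
    format("csv").
    option("sep", ",").
    schema("""order_id INT,
              order_date STRING,
              order_customer_id INT,
              order_status STRING
           """
    ).
    load("/public/retail_db/orders").
    show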
Reading JSON data from text files. We can infer the schema from the data as each JSON object contains both the column name and the value.
Example for JSON
{ "order_id": 1, "order_date": "2013-07-25 00:00:00.0", "order_customer_id": 12345, "order_status": "COMPLETE" }
spark.
read.
json("/public/retail_db_json/orders").
show
3.4. Understand airlines data¶
Let us read one of the files and understand more about the data to determine the right API and options to process the data later.
Our airlines data is in text file format.
We can use spark.read.text on one of the files to preview the data and understand the following:
Whether a header is present in the files or not.
The field delimiter that is being used.
Once we determine the details about the header and field delimiter, we can use spark.read.csv with appropriate options to read the data.
val airlines = spark.read.
text("/public/airlines_all/airlines/part-00000")
airlines.show(false)
The data has a header and each field is delimited by a comma.
3.5. Inferring Schema¶
Let us understand how we can quickly get the schema using one file and apply it on the other files.
We can pass the file name pattern to spark.read.csv and read all the data in the files under hdfs://public/airlines_all/airlines into a Data Frame.
We can use options such as header and inferSchema to assign column names and data types.
However, inferSchema will end up going through the entire data to assign the schema. We can use samplingRatio to process a fraction of the data and then infer the schema (see the sketch at the end of this section).
If the data in all the files has a similar structure, we should be able to get the schema using one file and then apply it on the others.
In our airlines data the schema is consistent across all the files, and hence we should be able to get the schema by going through one file and apply it on the entire dataset.
val airlines_part_00000 = spark.
read.
option("header", "true").
option("inferSchema", "true").
csv("/public/airlines_all/airlines/part-00000")
airlines_part_00000.show(false)
airlines_part_00000.printSchema
val airlines_schema = spark.
read.
option("header", "true").
option("inferSchema", "true").
csv("/public/airlines_all/airlines/part-00000").
schema
val airlines = spark.
read.
option("header", "true").
schema(airlines_schema).
csv("/public/airlines_all/airlines/part*")
airlines.count
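As noted above, inferSchema scans the entire data by default. Here is a minimal sketch using samplingRatio to infer the schema from a fraction of one file; this assumes a Spark version whose csv reader supports the samplingRatio option, and the 0.01 fraction is only illustrative.
// Infer the schema from a sample of the rows instead of scanning all of them.
val airlines_sampled_schema = spark.
    read.
    option("header", "true").
    option("inferSchema", "true").
    option("samplingRatio", "0.01").
    csv("/public/airlines_all/airlines/part-00000").
    schema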
3.6. Previewing airlines data¶
Let us preview the airlines data to understand more about it.
As we have too many files, we will just process one file and preview the data.
File Name: hdfs://public/airlines_all/airlines/part-00000
spark.read.csv will return a Data Frame object.
val airlines_schema = spark.
read.
option("header", "true").
option("inferSchema", "true").
csv("/public/airlines_all/airlines/part-00000").
schema
val airlines = spark.
read.
option("header", "true").
schema(airlines_schema).
csv("/public/airlines_all/airlines/part*")
A Data Frame will have a structure or schema.
We can print the schema using airlines.printSchema.
We can preview the data using airlines.show. By default it shows 20 records, and some of the column values might be truncated for readability.
We can review the details of show in the Spark API documentation.
We can pass a custom number of records and set truncate to false to show the complete information of all the records requested, for example airlines.show(100, false). It facilitates previewing all columns with the desired number of records.
We can get the number of records or rows in a Data Frame using airlines.count.
In a Databricks Notebook, we can use display to preview the data using the visualization feature.
We can perform all kinds of standard transformations on our data. We need to have good knowledge of functions on Data Frames as well as functions on columns to apply all standard transformations.
Let us also validate whether there are duplicates in our data; if yes, we will remove the duplicates while reorganizing the data later.
val airlines_schema = spark.
read.
option("header", "true").
option("inferSchema", "true").
csv("/public/airlines_all/airlines/part-00000").
schema
val airlines = spark.
read.
option("header", "true").
schema(airlines_schema).
csv("/public/airlines_all/airlines/part*")
airlines.printSchema
airlines.show
airlines.show(100, false)
airlines.count
airlines.distinct.count
3.7. Overview of Data Frame APIs¶
Let us get an overview of Data Frame APIs to process data in Data Frames.
Row level transformations or projection of data can be done using select, selectExpr, withColumn, or drop on a Data Frame.
We typically apply functions from org.apache.spark.sql.functions on columns using select and withColumn.
Filtering is typically done using either filter or where on a Data Frame.
We can pass the condition to filter or where either in SQL style or in programming language style (both styles are sketched below).
Global aggregations can be performed directly on the Data Frame.
By key or grouping aggregations are typically performed using groupBy and then aggregate functions using agg (see the sketch at the end of the Tasks below).
We can sort the data in a Data Frame using sort or orderBy.
We will talk about Window Functions later. We can use Window Functions for some advanced aggregations and ranking.
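As a quick illustration of the two condition styles mentioned above, here is a sketch on the orders data used earlier. Both forms should return the same rows; the COMPLETE status value is only an example.
// Read the orders data with an explicit schema.
val orders = spark.
    read.
    schema("order_id INT, order_date STRING, order_customer_id INT, order_status STRING").
    csv("/public/retail_db/orders")
// SQL style - the condition is passed as a string expression.
orders.filter("order_status = 'COMPLETE'").show
// Programming language style - the condition is built using column objects.
// The $ syntax assumes spark.implicits._ is in scope (automatic in spark-shell).
orders.filter($"order_status" === "COMPLETE").show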
3.7.1. Tasks¶
Let us understand how to project the data using different options such as select, selectExpr, withColumn, and drop.
Create a Data Frame employeesDF using a collection.
val employees = List((1, "Scott", "Tiger", 1000.0, "united states"),
(2, "Henry", "Ford", 1250.0, "India"),
(3, "Nick", "Junior", 750.0, "united KINGDOM"),
(4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
)
val employeesDF = employees.
toDF("employee_id",
"first_name",
"last_name",
"salary",
"nationality"
)
employeesDF.printSchema
employeesDF.show
Project employee first name and last name.
employeesDF.
select("first_name", "last_name").
show
Project all the fields except for Nationality
employeesDF.
drop("nationality").
show
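The grouping and sorting APIs from the overview can also be sketched on the same employeesDF. This is illustrative only; the aggregation aliases such as employee_count and avg_salary are made up for this example.
import org.apache.spark.sql.functions.{count, avg, round}
// Grouping (by key) aggregation - number of employees and average salary per nationality.
employeesDF.
    groupBy("nationality").
    agg(count("employee_id").alias("employee_count"),
        round(avg("salary"), 2).alias("avg_salary")).
    show
// Sorting - order the employees by salary in descending order.
// The $ syntax assumes spark.implicits._ is in scope (automatic in spark-shell).
employeesDF.
    orderBy($"salary".desc).
    show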
We will explore most of the APIs to process data in Data Frames as we get into data processing at a later point in time.
3.8. Overview of Functions¶
Let us get an overview of different functions that are available to process data in columns.
While Data Frame APIs work on the Data Frame, at times we might want to apply functions on column values.
Functions to process column values are available under org.apache.spark.sql.functions. These are typically used in select or withColumn on top of a Data Frame.
There are approximately 300 pre-defined functions available for us.
Some of the important functions can be broadly categorized into String Manipulation, Date Manipulation, Numeric Functions and Aggregate Functions.
String Manipulation Functions
Concatenating Strings - concat
Getting Length - length
Trimming Strings - trim, rtrim, ltrim
Padding Strings - lpad, rpad
Extracting Strings - split, substring
Date Manipulation Functions
Date Arithmetic - date_add, date_sub, datediff, add_months
Date Extraction - dayofmonth, month, year
Get beginning period - trunc, date_trunc
Numeric Functions - abs, greatest
Aggregate Functions - sum, min, max
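Here is a minimal sketch evaluating a few of these functions against a one-row Data Frame created with spark.range; the literal values and aliases are only illustrative.
import org.apache.spark.sql.functions.{lit, lpad, length, current_date, date_add, trunc}
// Apply a few column functions on a one-row Data Frame to see what they return.
spark.range(1).
    select(lpad(lit("7"), 2, "0").alias("padded_month"),
           length(lit("Data Processing")).alias("name_length"),
           date_add(current_date(), 7).alias("next_week"),
           trunc(current_date(), "MM").alias("month_start")).
    show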
3.8.1. Tasks¶
Let us perform a task to understand how functions are typically used.
Project the full name by concatenating first name and last name, along with the other fields, excluding first name and last name.
import org.apache.spark.sql.functions.{lit, concat}
employeesDF.
withColumn("full_name", concat($"first_name", lit(", "), $"last_name")).
drop("first_name", "last_name").
show
employeesDF.
select($"employee_id",
concat($"first_name", lit(", "), $"last_name").alias("full_name"),
$"salary",
$"nationality"
).
show
employeesDF.
selectExpr("employee_id",
"concat(first_name, ', ', last_name) AS full_name",
"salary",
"nationality"
).
show
We will explore most of the functions as we get into data processing at a later point in time.
3.9. Overview of Spark Write APIs¶
Let us understand how we can write Data Frames to different file formats.
All the batch write APIs are grouped under write, which is exposed on Data Frame objects.
text - to write single column data to text files.
csv - to write to text files with delimiters. Default is a comma, but we can use other delimiters as well.
json - to write data to JSON files.
orc - to write data to ORC files.
parquet - to write data to Parquet files.
We can also write data to other file formats by plugging them in and using write.format, for example avro.
We can use options based on the file format to which we are writing the Data Frame.
compression - compression codec (gzip, snappy, etc.).
sep - to specify the delimiter while writing into text files using csv.
We can overwrite the directories or append to existing directories using mode.
Create a copy of the orders data in Parquet file format with no compression. If the folder already exists, overwrite it. Target Location: /user/[YOUR_USER_NAME]/retail_db/orders
When you pass options, if there are typos then the options will be ignored rather than failing. Be careful and make sure that the output is validated.
By default the number of files in the output directory is equal to the number of tasks that are used to process the data in the last stage. However, we might want to control the number of files so that we don't run into the too many small files issue.
We can control the number of files by using coalesce. It has to be invoked on the Data Frame before invoking write.
val orders = spark.
read.
schema("""order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
"""
).
csv("/public/retail_db/orders")
val username = System.getProperty("user.name")
orders.
write.
mode("overwrite").
option("compression", "none").
parquet(s"/user/${username}/retail_db/orders")
// Alternative approach - using format
val username = System.getProperty("user.name")
orders.
write.
mode("overwrite").
option("compression", "none").
format("parquet").
save(s"/user/${username}/retail_db/orders")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/retail_db/orders" !
// File extension should not contain compression algorithms such as snappy.
Read order_items data from /public/retail_db_json/order_items and write it to pipe delimited files with gzip compression. Target Location: /user/[YOUR_USER_NAME]/retail_db/order_items. Make sure to validate.
Ignore the error if the target location already exists. Also make sure to write into only one file; we can use coalesce for it.
coalesce will be covered in detail at a later point in time.
val order_items = spark.
read.
json("/public/retail_db_json/order_items")
// Using format
val username = System.getProperty("user.name")
order_items.
coalesce(1).
write.
mode("ignore").
option("compression", "gzip").
option("sep", "|").
format("csv").
save(s"/user/${username}/retail_db/order_items")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/retail_db/order_items" !
3.10. Reorganizing airlines data¶
Let us reorganize our airlines data to fewer files where data is compressed and also partitioned by Month.
We have ~1920 files of ~64 MB each.
The data ranges from October 1987 to December 2008 (255 months).
By default Spark uses ~1920 tasks to process the data, and the output might end up with too many small files. We can avoid that by using repartition and then partitioning by the month.
Here are the steps we are going to follow to partition by flight month and save the data to /user/[YOUR_USER_NAME]/airlines.
Read one file first and get the schema.
Read the entire data by applying the schema from the previous step.
Add an additional column flightmonth using withColumn, with lpad on the month column and the concat function. We need to do this as the month in our data set is of type integer, and we want to pad single digit months (January through September) with 0 to format the value as YYYYMM.
Repartition the data into 255 partitions (the number of months) using flightmonth.
Partition the data using partitionBy on flightmonth while writing to the target location.
We will use the Parquet file format, which automatically compresses data using the Snappy algorithm.
This process will take time. Once it is done, we will review the target location to which the data is copied, partitioned by month.
spark.stop
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
config("spark.dynamicAllocation.enabled", "false").
config("spark.executor.instances", 40).
appName("Data Processing - Overview").
master("yarn").
getOrCreate
spark
import org.apache.spark.sql.functions.{concat, lpad}
val airlines_schema = spark.read.
option("header", "true").
option("inferSchema", "true").
csv("/public/airlines_all/airlines/part-00000").
schema
val airlines = spark.
read.
option("header", "true").
schema(airlines_schema).
csv("/public/airlines_all/airlines/part*")
airlines.printSchema
airlines.show
spark.conf.set("spark.sql.shuffle.partitions", "255")
val username = System.getProperty("user.name")
airlines.
distinct.
withColumn("flightmonth", concat($"year", lpad($"month", 2, "0"))).
repartition(255, $"flightmonth").
write.
mode("overwrite").
partitionBy("flightmonth").
format("parquet").
save(s"/user/${username}/airlines-part")
3.11. Previewing reorganized data¶
Let us preview the reorganized data.
We will use the new location going forward - /public/airlines_all/airlines-part. The data is already copied into that location.
We have partitioned the data by month and stored it in that location.
Instead of using the complete data set, we will read the data from one partition, /public/airlines_all/airlines-part/flightmonth=200801.
First let us create a Data Frame object, say airlines, using spark.read.parquet("/public/airlines_all/airlines-part/flightmonth=200801").
We can get the schema of the Data Frame using airlines.printSchema.
Use airlines.show or airlines.show(100, false) to preview the data.
We can also use display(airlines) to get the airlines data in tabular format as part of a Databricks Notebook.
We can also use airlines.describe().show to get some statistics about the Data Frame and airlines.count to get the number of records in the Data Frame.
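Below is a minimal sketch of the preview steps listed above, assuming the reorganized data is available at the stated location.
// Read a single month partition of the reorganized data and preview it.
val airlines = spark.
    read.
    parquet("/public/airlines_all/airlines-part/flightmonth=200801")
airlines.printSchema
airlines.show(100, false)
airlines.describe().show
airlines.count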
3.12. Analyze and Understand Data¶
Let us analyze and understand more about the data in detail using data of 2008 January.
First let us read the data for the month of 2008 January.
val airlines_path = "/public/airlines_all/airlines-part/flightmonth=200801"
val airlines = spark.
read.
parquet(airlines_path)
airlines_path = /public/airlines_all/airlines-part/flightmonth=200801
airlines = [Year: int, Month: int ... 29 more fields]
airlines.count
605659
airlines.printSchema
root
|-- Year: integer (nullable = true)
|-- Month: integer (nullable = true)
|-- DayofMonth: integer (nullable = true)
|-- DayOfWeek: integer (nullable = true)
|-- DepTime: string (nullable = true)
|-- CRSDepTime: integer (nullable = true)
|-- ArrTime: string (nullable = true)
|-- CRSArrTime: integer (nullable = true)
|-- UniqueCarrier: string (nullable = true)
|-- FlightNum: integer (nullable = true)
|-- TailNum: string (nullable = true)
|-- ActualElapsedTime: string (nullable = true)
|-- CRSElapsedTime: integer (nullable = true)
|-- AirTime: string (nullable = true)
|-- ArrDelay: string (nullable = true)
|-- DepDelay: string (nullable = true)
|-- Origin: string (nullable = true)
|-- Dest: string (nullable = true)
|-- Distance: string (nullable = true)
|-- TaxiIn: string (nullable = true)
|-- TaxiOut: string (nullable = true)
|-- Cancelled: integer (nullable = true)
|-- CancellationCode: string (nullable = true)
|-- Diverted: integer (nullable = true)
|-- CarrierDelay: string (nullable = true)
|-- WeatherDelay: string (nullable = true)
|-- NASDelay: string (nullable = true)
|-- SecurityDelay: string (nullable = true)
|-- LateAircraftDelay: string (nullable = true)
|-- IsArrDelayed: string (nullable = true)
|-- IsDepDelayed: string (nullable = true)
Get the number of records - airlines.count
Go through the list of columns and understand their purpose.
Year
Month
DayOfMonth
CRSDepTime - Scheduled Departure Time
DepTime - Actual Departure Time.
DepDelay - Departure Delay in Minutes
CRSArrTime - Scheduled Arrival Time
ArrTime - Actual Arrival Time.
ArrDelay - Arrival Delay in Minutes.
UniqueCarrier - Carrier or Airlines
FlightNum - Flight Number
Distance - Distance between Origin and Destination
IsDepDelayed - this is set to yes for those flights where departure is delayed.
IsArrDelayed - this is set to yes for those flights where arrival is delayed.
Get number of unique origins
airlines.
select("Origin").
distinct.
count
286
Get number of unique destinations
airlines.
select("Dest").
distinct.
count
287
Get all unique carriers
airlines.
select("UniqueCarrier").
distinct.
show
+-------------+
|UniqueCarrier|
+-------------+
| UA|
| AA|
| NW|
| EV|
| B6|
| DL|
| OO|
| F9|
| YV|
| US|
| AQ|
| MQ|
| OH|
| HA|
| XE|
| AS|
| FL|
| CO|
| WN|
| 9E|
+-------------+
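As a further illustration using the IsDepDelayed flag described above, we can get the number of flights by delay status with the grouping APIs covered earlier. This is a sketch only; the output is not shown here.
// Count flights by the IsDepDelayed flag.
airlines.
    groupBy("IsDepDelayed").
    count.
    show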
3.13. Conclusion¶
Let us recap the key takeaways from this module.
APIs to read the data from files into Data Frame.
Previewing Schema and the data in Data Frame.
Overview of Data Frame APIs and Functions
Writing data from Data Frame into Files
Reorganizing the airlines data by month
Simple APIs to analyze the data. Now it is time for us to deep dive into APIs to perform all the standard transformations as part of Data Processing.