2. Quick Recap of Scala¶
Let us quickly recap some of the core programming concepts of Scala before we get into Spark.
2.1. Data Engineering Life Cycle¶
Let us first understand the Data Engineering Life Cycle. We typically read the data, process it by applying business rules, and write the data back to different targets.
Read the data from different sources.
Files
Databases
Mainframes
APIs
Process the data.
Row Level Transformations
Aggregations
Sorting
Ranking
Joining multiple data sets
Write data to different targets.
Files
Databases
Mainframes
APIs
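The three stages above can be sketched end to end with plain Scala collections. This is only an illustration under stated assumptions: the records are made up, the column layout (id, date, customer id, status) is assumed, and temporary files stand in for real sources and targets.

```scala
import java.nio.file.Files
import scala.io.Source

// Hypothetical sample records; a temporary file stands in for a real source.
val input = Files.createTempFile("orders", ".csv")
val sample = List(
  "1,2013-07-25,100,CLOSED",
  "2,2013-07-25,101,COMPLETE",
  "3,2013-07-26,102,COMPLETE"
)
Files.write(input, sample.mkString("\n").getBytes("UTF-8"))

// Read: load all the lines into a List and close the file.
val source = Source.fromFile(input.toFile)
val lines = try source.getLines().toList finally source.close()

// Process: a row level transformation (extract the status),
// an aggregation (count per status) and sorting.
val countByStatus = lines.
  map(line => line.split(",")(3)).
  groupBy(status => status).
  map { case (status, recs) => (status, recs.size) }.
  toList.
  sorted

// Write: save the result to a target, here another temporary file.
val output = Files.createTempFile("status_counts", ".csv")
val report = countByStatus.map { case (status, cnt) => s"$status,$cnt" }.mkString("\n")
Files.write(output, report.getBytes("UTF-8"))

println(report)
```

In a real pipeline the source and target would be files, databases, mainframes or APIs as listed above; only the read and write steps change.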
2.2. Scala REPL or Jupyter Notebook¶
We can use the Scala REPL or Jupyter Notebook to explore APIs.
We can launch the Scala REPL using the scala command.
We can launch the Jupyter Notebook using the jupyter notebook command.
A web service will be started on port number 8888 by default.
We can go to the browser and connect to the web server using the IP address and port number.
We should be able to explore code in an interactive fashion.
We can issue magic commands such as %%sh to run shell commands, %%md to document using markdown etc.
2.2.1. Tasks¶
Let us perform these tasks to recollect how to use the Scala REPL or Jupyter Notebook.
Create variables i and j assigning 10 and 20.5 respectively.
val i = 10
val j = 20.5
i = 10
j = 20.5
20.5
Add the values and assign the result to res.
val res = i + j
res = 30.5
30.5
println(res)
30.5
Get the type of i, j and res.
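One way to approach this last task, as a minimal sketch: getClass reports the runtime class of a value. The explicit type annotations are added here only for illustration; without them Scala infers the same types.

```scala
// Explicit types are shown for clarity; Scala infers Int and Double on its own.
val i: Int = 10
val j: Double = 20.5

// Adding an Int and a Double produces a Double.
val res: Double = i + j

// getClass reports the runtime class of each value.
println(i.getClass)    // int
println(j.getClass)    // double
println(res.getClass)  // double
```

In the REPL or a notebook, the inferred type is also echoed after each definition, so evaluating res on its own is often enough.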
2.3. Basic Programming Constructs¶
Let us recollect some of the basic programming constructs of Scala.
Comparison Operations (==, !=, <, >, <=, >=, etc)
All the comparison operators return true or false (a Boolean value).
Conditionals (if)
We typically use comparison operators as part of conditionals.
Loops (for)
We can iterate through a collection using for (i <- l) where l is a standard collection such as List or Set.
Scala provides to and until to build a range of integers. A range built with to includes the upper bound value, while until excludes it.
In Scala, scope is typically defined by curly braces.
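The constructs above can be sketched in Scala as follows. This is a minimal illustration; the variable names are made up for the example. In Scala, ranges are typically built with to (upper bound included) or until (upper bound excluded).

```scala
// `to` includes the upper bound, `until` excludes it.
val inclusive = (1 to 5).toList     // List(1, 2, 3, 4, 5)
val exclusive = (1 until 5).toList  // List(1, 2, 3, 4)

// Comparison operators return a Boolean (true or false).
val isGreater: Boolean = 10 > 5

// Curly braces define the scope of the loop body and of the if branch.
var evenCount = 0
for (i <- inclusive) {
  if (i % 2 == 0) {
    evenCount += 1
  }
}
println(evenCount)  // 2
```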
2.3.1. Tasks¶
Let us perform a few tasks to quickly recap basic programming constructs of Scala.
Get all the odd numbers between 1 and 15.
(1 to 15 by 2)
Range(1, 3, 5, 7, 9, 11, 13, 15)
Print all those numbers which are divisible by 3 from the above list.
for (i <- (1 to 15 by 2))
  if (i % 3 == 0) println(i)
3
9
15
2.4. Developing Functions¶
Let us understand how to develop functions using Scala as the programming language.
A function definition starts with def followed by the function name.
Parameters can be of different types.
Required
Keyword
Variable Number
Functions
Functions which take another function as an argument are called higher order functions.
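As a rough sketch of how these parameter kinds can look in Scala (all function names here are hypothetical, chosen only for illustration): a parameter with a default value can be omitted, arguments can be passed by name, a trailing * declares a variable number of arguments, and a function type such as Int => Int declares a function parameter.

```scala
// A required parameter plus a parameter with a default value.
def greet(name: String, greeting: String = "Hello"): String =
  s"$greeting, $name"

// Variable number of arguments, declared with `*`.
def total(nums: Int*): Int = nums.sum

// A higher order function: the parameter f is itself a function.
def applyTwice(f: Int => Int, x: Int): Int = f(f(x))

println(greet("Scala"))                          // Hello, Scala
println(greet(greeting = "Hi", name = "Spark"))  // Hi, Spark
println(total(1, 2, 3))                          // 6
println(applyTwice(n => n * 2, 5))               // 20
```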
2.4.1. Tasks¶
Let us perform a few tasks to understand how to develop functions in Scala.
Sum of integers between lower bound and upper bound using formula.
def sumOfN(n: Int) =
  (n * (n + 1)) / 2
sumOfN: (n: Int)Int
sumOfN(10)
55
def sumOfIntegers(lb: Int, ub: Int) =
  sumOfN(ub) - sumOfN(lb - 1)
sumOfIntegers: (lb: Int, ub: Int)Int
sumOfIntegers(5, 10)
45
Sum of integers between lower bound and upper bound using loops.
def sumOfIntegers(lb: Int, ub: Int) = {
  var total = 0
  for (e <- (lb to ub))
    total += e
  total
}
sumOfIntegers: (lb: Int, ub: Int)Int
sumOfIntegers(1, 10)
55
Sum of squares of integers between lower bound and upper bound using loops.
def sumOfSquares(lb: Int, ub: Int) = {
  var total = 0
  for (e <- (lb to ub))
    total += e * e
  total
}
sumOfSquares: (lb: Int, ub: Int)Int
sumOfSquares(2, 4)
29
Sum of the even numbers between lower bound and upper bound using loops.
def sumOfEvens(lb: Int, ub: Int) = {
  var total = 0
  for (e <- (lb to ub))
    if (e % 2 == 0) total += e
  total
}
sumOfEvens: (lb: Int, ub: Int)Int
sumOfEvens(2, 4)
6
2.5. Lambda Functions¶
Let us recap details related to lambda functions.
We can develop functions without names. They are called Lambda Functions and are also known as Anonymous Functions.
We typically pass them as arguments to higher order functions, which take functions as arguments.
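A minimal sketch of both points, using the standard map, filter and reduce methods on List (the value names are made up for the example):

```scala
// An anonymous function stored in a value.
val square: Int => Int = n => n * n

val nums = List(1, 2, 3, 4)

// Anonymous functions passed to higher order functions.
val squares = nums.map(square)              // List(1, 4, 9, 16)
val evens = nums.filter(n => n % 2 == 0)    // List(2, 4)
val sumAll = nums.reduce((a, b) => a + b)   // 10

println(squares)
println(evens)
println(sumAll)
```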
2.5.1. Tasks¶
Let us perform a few tasks related to lambda functions.
Create a generic function mySum which is supposed to perform arithmetic using integers within a range.
It takes 3 arguments - lb, ub and f.
Function f should be invoked inside the function on each element within the range.
def mySum(lb: Int, ub: Int, f: Int => Int) = {
  var total = 0
  for (e <- (lb to ub))
    total += f(e)
  total
}
mySum: (lb: Int, ub: Int, f: Int => Int)Int
Sum of integers between lower bound and upper bound using mySum.
mySum(2, 4, i => i)
9
Sum of squares of integers between lower bound and upper bound using mySum.
mySum(2, 4, i => i * i)
29
Sum of the even numbers between lower bound and upper bound using mySum.
mySum(2, 4, i => if(i%2 == 0) i else 0)
6
2.6. Overview of Collections and Tuples¶
Let's quickly recap Collections and Tuples in Scala. We will primarily talk about the collections and tuples that come as part of the Scala standard library, such as List, Set, Map and tuple.
Group of elements with length and index - List
Group of unique elements - Set
Group of key value pairs - Map
While List, Set and Map contain groups of homogeneous elements, a tuple contains a group of heterogeneous elements.
We can consider a List, Set or Map as a table in a database and a tuple as a row or record in a given table.
Typically we create a List of tuples or a Set of tuples. A Map is nothing but a collection of tuples with 2 elements in which the key is unique.
We typically use Map Reduce APIs to process the data in collections. There are also some pre-defined functions such as size, sum, min, max etc. for aggregating data in collections.
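The following sketch illustrates these collection types and a few of the aggregate functions in Scala. All values are small made-up samples, and the value names are hypothetical.

```scala
// List: ordered elements with length and index; duplicates are allowed.
val statuses = List("COMPLETE", "CLOSED", "COMPLETE")
println(statuses(0))    // COMPLETE
println(statuses.size)  // 3

// Set: unique elements only.
val uniqueStatuses = statuses.toSet
println(uniqueStatuses.size)  // 2

// Tuple: one record whose fields have different types (sample values).
val order = (22945, "2013-12-13 00:00:00.0", 1, "COMPLETE")
println(order._4)  // COMPLETE

// Map: key value pairs with unique keys; each entry is a 2 element tuple.
val countByStatus = Map("COMPLETE" -> 2, "CLOSED" -> 1)
println(countByStatus("COMPLETE"))  // 2

// Pre-defined aggregate functions, then map and reduce on a collection.
val quantities = List(1, 5, 1, 2)
println((quantities.size, quantities.sum, quantities.min, quantities.max))  // (4,9,1,5)
val doubledTotal = quantities.map(q => q * 2).reduce((a, b) => a + b)
println(doubledTotal)  // 18
```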
2.6.1. Tasks¶
Let us perform a few tasks to quickly recap details about Collections and Tuples in Scala. We will also quickly recap Map Reduce APIs.
Create a collection of orders by reading data from a file.
import sys.process._
"ls -ltr /data/retail_db/orders/part-00000"!
-rw-r--r-- 1 root root 2999944 Feb 20 2017 /data/retail_db/orders/part-00000
warning: there was one feature warning; re-run with -feature for details
0
val ordersPath = "/data/retail_db/orders/part-00000"
ordersPath = /data/retail_db/orders/part-00000
/data/retail_db/orders/part-00000
import scala.io.Source
val orders = Source.fromFile(ordersPath).
getLines
orders = non-empty iterator
non-empty iterator
Get all unique order statuses. Make sure data is sorted in alphabetical order.
val ordersPath = "/data/retail_db/orders/part-00000"
import scala.io.Source
val orders = Source.fromFile(ordersPath).
getLines
orders.
map(order => order.split(",")(3)).
toSet.
toList.
sorted.
foreach(println)
CANCELED
CLOSED
COMPLETE
ON_HOLD
PAYMENT_REVIEW
PENDING
PENDING_PAYMENT
PROCESSING
SUSPECTED_FRAUD
ordersPath = /data/retail_db/orders/part-00000
orders = empty iterator
empty iterator
Get count of all unique dates.
val ordersPath = "/data/retail_db/orders/part-00000"
import scala.io.Source
val orders = Source.fromFile(ordersPath).
getLines
orders.
map(order => order.split(",")(1)).
toSet.
toList.
sorted
ordersPath = /data/retail_db/orders/part-00000
orders = empty iterator
List(2013-07-25 00:00:00.0, 2013-07-26 00:00:00.0, 2013-07-27 00:00:00.0, 2013-07-28 00:00:00.0, 2013-07-29 00:00:00.0, 2013-07-30 00:00:00.0, 2013-07-31 00:00:00.0, 2013-08-01 00:00:00.0, 2013-08-02 00:00:00.0, 2013-08-03 00:00:00.0, 2013-08-04 00:00:00.0, 2013-08-05 00:00:00.0, 2013-08-06 00:00:00.0, 2013-08-07 00:00:00.0, 2013-08-08 00:00:00.0, 2013-08-09 00:00:00.0, 2013-08-10 00:00:00.0, 2013-08-11 00:00:00.0, 2013-08-12 00:00:00.0, 2013-08-13 00:00:00.0, 2013-08-14 00:00:00.0, 2013-08-15 00:00:00.0, 2013-08-16 00:00:00.0, 2013-08-17 00:00:00.0, 2013-08-18 00:00:00.0, 2013-08-19 00:00:00.0, 2013-08-20 00:00:00.0, 2013-08-21 00:00:00.0, 2013-0...
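The listing above returns the sorted dates themselves. To get the count, one option is to call size on the set of dates. A minimal sketch on a few made-up records (not taken from the real file, and with an assumed column layout) so that it runs without /data/retail_db:

```scala
// Hypothetical sample records, assumed layout:
// order_id,order_date,order_customer_id,order_status
val sampleOrders = List(
  "1,2013-07-25 00:00:00.0,100,CLOSED",
  "2,2013-07-25 00:00:00.0,101,COMPLETE",
  "3,2013-07-26 00:00:00.0,102,PENDING"
)

// Extract the date field, drop duplicates with toSet, then count with size.
val uniqueDateCount = sampleOrders.
  map(order => order.split(",")(1)).
  toSet.
  size

println(uniqueDateCount)  // 2
```

Keeping the lines in a List also helps when the data is reused. The iterator returned by getLines can be traversed only once, which is why the outputs in this section report orders = empty iterator after processing.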
Sort the data in orders in ascending order by order_customer_id and then order_date.
val ordersPath = "/data/retail_db/orders/part-00000"
import scala.io.Source
val orders = Source.fromFile(ordersPath).
getLines
orders.
toList.
sortBy(k => {
val a = k.split(",")
(a(2).toInt, a(1))
}).
take(20).
foreach(println)
22945,2013-12-13 00:00:00.0,1,COMPLETE
57963,2013-08-02 00:00:00.0,2,ON_HOLD
15192,2013-10-29 00:00:00.0,2,PENDING_PAYMENT
67863,2013-11-30 00:00:00.0,2,COMPLETE
33865,2014-02-18 00:00:00.0,2,COMPLETE
22646,2013-12-11 00:00:00.0,3,COMPLETE
61453,2013-12-14 00:00:00.0,3,COMPLETE
23662,2013-12-19 00:00:00.0,3,COMPLETE
35158,2014-02-26 00:00:00.0,3,COMPLETE
46399,2014-05-09 00:00:00.0,3,PROCESSING
56178,2014-07-15 00:00:00.0,3,PENDING
57617,2014-07-24 00:00:00.0,3,COMPLETE
9023,2013-09-19 00:00:00.0,4,COMPLETE
9704,2013-09-24 00:00:00.0,4,COMPLETE
17253,2013-11-09 00:00:00.0,4,PENDING_PAYMENT
37878,2014-03-15 00:00:00.0,4,COMPLETE
49339,2014-05-28 00:00:00.0,4,COMPLETE
51157,2014-06-10 00:00:00.0,4,CLOSED
13705,2013-10-18 00:00:00.0,5,COMPLETE
36472,2014-03-06 00:00:00.0,5,PROCESSING
ordersPath = /data/retail_db/orders/part-00000
orders = empty iterator
empty iterator
Create a collection of order_items by reading data from a file.
val orderItemsPath = "/data/retail_db/order_items/part-00000"
import scala.io.Source
val orderItems = Source.fromFile(orderItemsPath).
getLines.
toList
orderItems.take(10).foreach(println)
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99
orderItemsPath = /data/retail_db/order_items/part-00000
orderItems = List(1,1,957,1,299.98,299.98, 2,2,1073,1,199.99,199.99, 3,2,502,5,250.0,50.0, 4,2,403,1,129.99,129.99, 5,4,897,2,49.98,24.99, 6,4,365,5,299.95,59.99, 7,4,502,3,150.0,50.0, 8,4,1014,4,199.92,49.98, 9,5,957,1,299.98,299.98, 10,5,365,5,299.95,59.99, 11,5,1014,2,99.96,49.98, 12,5,957,1,299.98,299.98, 13,5,403,1,129.99,129.99, 14,7,1073,1,199.99,199.99, 15,7,957,1,299.98,299.98, 16,7,926,5,79.95,15.99, 17,8,365,3,179.97,59.99, 18,8,365,5,299.95,59.99, 19,8,1014,4,199.92,49.98, 20,8,502,1,50.0,50.0, 21,9,191,2,199.98,99.99, 22,9,1073,1,199.99,199.99, 23,9,1073,1,199.99,199.99, 24,10,1073,1,199.99,199.99, 25,10,1014,2,99.96,49.98, 26,10,403,1,129.99,129.99, 27,10,917,1,21.99,21.99,...
Get revenue for a given order_item_order_id.
def getOrderRevenue(orderItems: List[String], orderId: Int) = {
  val orderItemsFiltered = orderItems.
    filter(orderItem => orderItem.split(",")(1).toInt == orderId)
  val orderItemsMap = orderItemsFiltered.
    map(orderItem => orderItem.split(",")(4).toFloat)
  orderItemsMap.sum
}
getOrderRevenue: (orderItems: List[String], orderId: Int)Float
val orderItemsPath = "/data/retail_db/order_items/part-00000"
import scala.io.Source
val orderItems = Source.fromFile(orderItemsPath).
getLines.
toList
orderItemsPath = /data/retail_db/order_items/part-00000
orderItems = List(1,1,957,1,299.98,299.98, 2,2,1073,1,199.99,199.99, 3,2,502,5,250.0,50.0, 4,2,403,1,129.99,129.99, 5,4,897,2,49.98,24.99, 6,4,365,5,299.95,59.99, 7,4,502,3,150.0,50.0, 8,4,1014,4,199.92,49.98, 9,5,957,1,299.98,299.98, 10,5,365,5,299.95,59.99, 11,5,1014,2,99.96,49.98, 12,5,957,1,299.98,299.98, 13,5,403,1,129.99,129.99, 14,7,1073,1,199.99,199.99, 15,7,957,1,299.98,299.98, 16,7,926,5,79.95,15.99, 17,8,365,3,179.97,59.99, 18,8,365,5,299.95,59.99, 19,8,1014,4,199.92,49.98, 20,8,502,1,50.0,50.0, 21,9,191,2,199.98,99.99, 22,9,1073,1,199.99,199.99, 23,9,1073,1,199.99,199.99, 24,10,1073,1,199.99,199.99, 25,10,1014,2,99.96,49.98, 26,10,403,1,129.99,129.99, 27,10,917,1,21.99,21.99,...
print(getOrderRevenue(orderItems, 2))
579.98
2.7. Development Life Cycle¶
Let us understand the development life cycle. We typically use IDEs such as PyCharm to develop Python based applications.
Create Project - retail
Choose the interpreter 3.x
Make sure plugins such as pandas are installed.
Create config.py script for externalizing run time parameters such as input path, output path etc.
Create app folder for the source code.
2.8. Exercises¶
Let us perform a few exercises to understand how to process the data. We will use LinkedIn data to perform some basic data processing using Scala.
Get LinkedIn archive.
Go to https://linkedin.com
Me on top -> Settings & Privacy
Then go to “How LinkedIn uses your data” -> Getting a copy of your data
Register and download. You will get a link as part of the email.
The data contains multiple CSV files. We will limit the analysis to Contacts.csv and Connections.csv.
Get the number of contacts without email ids.
Get the number of contacts from each source.
Get the number of connections with each title.
Get the number of connections from each company.
Get the number of contacts for each month in the year 2018.
Use Postgres or MySQL as the database (you can set it up on your laptop) and write the connections data to the database.