1. Getting Started¶
1.1. Platforms to Practice¶
Let us understand different platforms we can leverage to practice Apache Spark using Python.
Local Setup
Databricks Platform
Setting up your own cluster
Cloud based labs
1.2. Setup Spark Locally - Ubuntu¶
Let us set up Spark locally on Ubuntu.
Install the latest version of Anaconda.
Make sure Jupyter Notebook is set up and validated.
Set up Spark and validate (see the validation sketch below).
Set up environment variables to integrate Pyspark with Jupyter Notebook.
Launch Jupyter Notebook using the pyspark command.
Set up PyCharm (IDE) for application development.
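Once the environment variables are in place, a quick way to validate the setup is to run a tiny job. Here is a minimal sketch, assuming PySpark is importable from the Python environment used by Jupyter Notebook; the application name is a placeholder.
from pyspark.sql import SparkSession

# Create a local SparkSession purely to validate the installation
spark = SparkSession. \
    builder. \
    appName("Validate local setup"). \
    master("local[*]"). \
    getOrCreate()

print(spark.version)            # Spark version that got picked up
print(spark.range(10).count())  # should print 10

spark.stop()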
1.3. Setup Spark Locally - Mac¶
1.3.1. Let us setup Spark Locally on Mac.¶
Install the latest version of Anaconda.
Make sure Jupyter Notebook is set up and validated.
Set up Spark and validate.
Set up environment variables to integrate Pyspark with Jupyter Notebook.
Launch Jupyter Notebook using the pyspark command.
Set up PyCharm (IDE) for application development.
1.4. Signing up for ITVersity Labs¶
1.5. Using ITVersity Labs¶
Let us understand how to submit Spark jobs in ITVersity Labs.
As we are using Python, we can also use the help command to get the documentation - for example:
help(spark.read.csv)
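As a quick illustration, here is a small sketch of browsing documentation from a Python session. Inside the pyspark shell the spark object already exists, so the SparkSession creation below is only needed when running outside the shell.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Browsing documentation").getOrCreate()

help(spark.read.csv)   # documentation for DataFrameReader.csv
help(spark.read.json)  # works the same way for any other reader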
1.6. Interacting with File Systems¶
Let us understand how to interact with the file system using the %fs command from a Databricks notebook.
We can access datasets using the %fs magic command in a Databricks notebook.
By default, we will see files under dbfs.
We can list the files using the ls command - e.g. %fs ls
Databricks provides a lot of datasets for free under databricks-datasets.
If the cluster is integrated with AWS S3 or Azure Blob Storage, we can access files by specifying the appropriate protocol (e.g. s3:// for S3).
List of commands available under %fs (Python equivalents using dbutils.fs are sketched after this list):
Copying files or directories - cp
Moving files or directories - mv
Creating directories - mkdirs
Deleting files and directories - rm
We can copy or delete directories recursively using -r or --recursive.
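The %fs magic is notebook-only; from Python the same operations are exposed through dbutils.fs, which is predefined in Databricks notebooks. Here is a minimal sketch; the paths below are placeholders for illustration only.
files = dbutils.fs.ls("dbfs:/databricks-datasets")         # list files, returns FileInfo objects
dbutils.fs.mkdirs("dbfs:/tmp/demo")                        # create a directory
dbutils.fs.cp("dbfs:/tmp/source", "dbfs:/tmp/demo", True)  # copy recursively (True = recurse)
dbutils.fs.mv("dbfs:/tmp/demo", "dbfs:/tmp/target", True)  # move recursively
dbutils.fs.rm("dbfs:/tmp/target", True)                    # delete recursively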
1.7. Getting File Metadata¶
Let us review the source location to get the number of files and the size of the data we are going to process.
Location of airlines data: dbfs:/databricks-datasets/airlines
We can get the first 1000 files using %fs ls dbfs:/databricks-datasets/airlines
The location contains 1919 files; however, we will not be able to see all the details using the %fs command.
Databricks File System commands do not have the capability to show detailed file metadata such as size.
When a Spark cluster is started, it will create 2 objects - spark and sc
sc is of type SparkContext and spark is of type SparkSession
Spark uses HDFS APIs to interact with the file system and we can access HDFS APIs using sc._jsc and sc._jvm to get file metadata.
Here are the steps to get the file metadata.
Get Hadoop Configuration using sc._jsc.hadoopConfiguration() - let's say conf.
We can pass conf to sc._jvm.org.apache.hadoop.fs.FileSystem.get to get a FileSystem object - let's say fs.
We can build a path object by passing the path as a string to sc._jvm.org.apache.hadoop.fs.Path.
We can invoke listStatus on top of fs by passing path, which will return an array of FileStatus objects - let's say files.
Each FileStatus object has all the metadata of each file.
We can use len on files to get the number of files.
We can use getLen on each FileStatus object to get the size of each file.
The cumulative size of all files can be computed using sum(map(lambda file: file.getLen(), files)).
These steps are put together as a Python sketch below.
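Here is a minimal PySpark sketch of the steps above as they could be run in a Databricks notebook. It assumes the pre-created spark and sc objects and that the dbfs:/ path resolves on the cluster's default file system; the print calls are only for illustration.
# spark and sc are created automatically when the cluster or pyspark shell starts
conf = sc._jsc.hadoopConfiguration()                     # Hadoop Configuration
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(conf)   # FileSystem object
path = sc._jvm.org.apache.hadoop.fs.Path("dbfs:/databricks-datasets/airlines")

files = fs.listStatus(path)                              # array of FileStatus objects

print(len(files))                                        # number of files
print(sum(map(lambda file: file.getLen(), files)))       # cumulative size in bytes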
Let us first get the list of files.
%fs ls dbfs:/databricks-datasets/airlines
Here is the consolidated script to get the number of files and the cumulative size of all files in a given folder. The script and its output below are in Scala; a Python rendering follows it.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
appName("Getting Started").
master("yarn").
getOrCreate
spark = org.apache.spark.sql.SparkSession@6a105b5
val conf = spark.sparkContext.hadoopConfiguration
conf = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml
import org.apache.hadoop.fs.FileSystem
val fs = FileSystem.get(conf)
fs = DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1742269756_44, ugi=training (auth:SIMPLE)]]
import org.apache.hadoop.fs.Path
val path = new Path("/public/airlines_all/airlines")
path = /public/airlines_all/airlines
val files = fs.listStatus(path)
files = Array(FileStatus{path=hdfs://nn01.itversity.com:8020/public/airlines_all/airlines/README.md; isDirectory=false; length=1089; replication=2; blocksize=134217728; modification_time=1572112932387; access_time=1584372852270; owner=hdfs; group=hdfs; permission=rw-r--r--; isSymlink=false}, FileStatus{path=hdfs://nn01.itversity.com:8020/public/airlines_all/airlines/_SUCCESS; isDirectory=false; length=0; replication=2; blocksize=134217728; modification_time=1572112932485; access_time=1580753965620; owner=hdfs; group=hdfs; permission=rw-r--r--; isSymlink=false}, FileStatus{path=hdfs://nn01.itversity.com:8020/public/airlines_all/airlines/part-00000; isDirectory=false; length=67108879; replication=2; blocksize=134217728; modification_time=15721129332...
files.map(file => file.getLen).sum/1024/1024/1024
120
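Since this course uses Python, here is the same consolidated script rendered as a PySpark sketch. The application name, master and path are taken from the Scala example above, and the integer division mirrors its size-in-GB calculation.
from pyspark.sql import SparkSession

spark = SparkSession. \
    builder. \
    appName("Getting Started"). \
    master("yarn"). \
    getOrCreate()

sc = spark.sparkContext
conf = sc._jsc.hadoopConfiguration()
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(conf)
path = sc._jvm.org.apache.hadoop.fs.Path("/public/airlines_all/airlines")
files = fs.listStatus(path)

print(len(files))                                                           # number of files
print(sum(map(lambda file: file.getLen(), files)) // 1024 // 1024 // 1024)  # cumulative size in GB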