1. Getting Started

1.1. Platforms to Practice

Let us understand different platforms we can leverage to practice Apache Spark using Python.

  • Local Setup

  • Databricks Platform

  • Setting up your own cluster

  • Cloud-based labs

1.2. Setup Spark Locally - Ubuntu

Let us set up Spark locally on Ubuntu.

  • Install the latest version of Anaconda.

  • Make sure Jupyter Notebook is set up and validated.

  • Set up Spark and validate the installation.

  • Set environment variables to integrate PySpark with Jupyter Notebook (see the sketch after this list).

  • Launch Jupyter Notebook using the pyspark command.

  • Set up PyCharm (IDE) for application development.
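
Here is a minimal sketch of the environment-variable step, assuming Spark was extracted to /opt/spark and Anaconda to ~/anaconda3 (both paths are assumptions - adjust them for your machine). PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS make the pyspark command launch Jupyter Notebook as the driver.

# Example entries for ~/.bashrc - the paths below are assumptions
export SPARK_HOME=/opt/spark
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_PYTHON=~/anaconda3/bin/python
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

With these in place, running pyspark from a terminal opens Jupyter Notebook with the spark and sc objects pre-created.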

1.3. Setup Spark Locally - Mac

1.3.1. Let us set up Spark locally on Mac.

  • Install the latest version of Anaconda.

  • Make sure Jupyter Notebook is set up and validated.

  • Set up Spark and validate the installation.

  • Set environment variables to integrate PySpark with Jupyter Notebook (the exports shown in the Ubuntu section apply here as well).

  • Launch Jupyter Notebook using the pyspark command.

  • Set up PyCharm (IDE) for application development.

1.4. Signing up for ITVersity Labs

1.5. Using ITVersity Labs

Let us understand how to submit Spark jobs in ITVersity Labs.

  • As we are using Python, we can also use the help function to get the documentation - for example, help(spark.read.csv), as shown below.
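
For example, from a pyspark session (where the spark object is already created), help prints the full docstring of any function or object:

# Show the documentation for DataFrameReader.csv
help(spark.read.csv)

# help also works on modules and classes
from pyspark.sql import functions
help(functions.concat)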

1.6. Interacting with File Systems

Let us understand how to interact with the file system using the %fs magic command from a Databricks notebook.

  • We can access datasets using the %fs magic command in a Databricks notebook.

  • By default, we will see files under dbfs:/ (the DBFS root).

  • We can list files using the ls command - e.g., %fs ls.

  • Databricks provides a lot of datasets for free under dbfs:/databricks-datasets.

  • If the cluster is integrated with AWS S3 or Azure Blob Storage, we can access files by specifying the appropriate protocol (e.g., s3:// for S3).

  • Commands available under %fs (an illustrative sequence follows this list):

  • Copying files or directories - cp

  • Moving files or directories - mv

  • Creating directories - mkdirs

  • Deleting files and directories - rm

  • We can copy or delete directories recursively using -r or --recursive.
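
Here is an illustrative sequence of these commands; each %fs command goes in its own notebook cell, and the dbfs:/tmp/demo location is just a placeholder for this sketch.

%fs mkdirs dbfs:/tmp/demo

%fs cp dbfs:/databricks-datasets/README.md dbfs:/tmp/demo/README.md

%fs ls dbfs:/tmp/demo

%fs rm -r dbfs:/tmp/demo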

1.7. Getting File Metadata

Let us review the source location to get the number of files and the size of the data we are going to process.

  • Location of the airlines data: dbfs:/databricks-datasets/airlines

  • We can get the first 1000 files using %fs ls dbfs:/databricks-datasets/airlines.

  • The location contains 1919 files; however, we will not be able to see all the details using the %fs command.

  • Databricks file system commands do not have the capability to expose detailed file metadata, such as size.

  • When a Spark cluster is started, it creates 2 objects - spark and sc (see the quick check after this list).

  • sc is of type SparkContext and spark is of type SparkSession.

  • Spark uses HDFS APIs to interact with the file system, and we can access those APIs via sc._jsc and sc._jvm to get file metadata.
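
As a quick check in a notebook backed by a standard pyspark session:

# Both objects are created automatically when the cluster starts
type(sc)     # pyspark.context.SparkContext
type(spark)  # pyspark.sql.session.SparkSession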

Here are the steps to get the file metadata.

  • Get the Hadoop configuration using sc._jsc.hadoopConfiguration() - let's say conf.

  • We can pass conf to sc._jvm.org.apache.hadoop.fs.FileSystem.get to get a FileSystem object - let's say fs.

  • We can build a Path object by passing the path as a string to sc._jvm.org.apache.hadoop.fs.Path.

  • We can invoke listStatus on fs by passing the path, which will return an array of FileStatus objects - let's say files.

  • Each FileStatus object has all the metadata of its file.

  • We can use len on files to get the number of files.

  • We can use getLen on each FileStatus object to get the size of that file.

  • The cumulative size of all files can be obtained using sum(map(lambda file: file.getLen(), files)).

  • Let us first get the list of files.

%fs ls dbfs:/databricks-datasets/airlines

Here is the consolidated script to get the number of files and the cumulative size of all files in a given folder.

# Get the Hadoop configuration from the JVM-side Spark context
conf = sc._jsc.hadoopConfiguration()

# Get a FileSystem handle for the cluster's default file system
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(conf)

# Build a Path object for the airlines dataset
path = sc._jvm.org.apache.hadoop.fs.Path('dbfs:/databricks-datasets/airlines')

# listStatus returns an array of FileStatus objects, one per file
files = fs.listStatus(path)

# Number of files in the folder
len(files)

# Cumulative size of all files, converted from bytes to GB - about 120 for this dataset
sum(map(lambda file: file.getLen(), files)) / 1024 / 1024 / 1024