4. Processing Column Data

As part of this module we will explore the functions available under org.apache.spark.sql.functions to derive new values from existing column values within a Data Frame.

4.1. Introduction to Pre-defined Functions

We typically process data in the columns using functions from org.apache.spark.sql.functions. Let us go through these functions in detail as part of this module.

  • Let us recap the Functions or APIs used to process Data Frames.

  • Projection - select or withColumn

  • Filtering - filter or where

  • Grouping data by key and performing aggregations - groupBy

  • Sorting data - sort or orderBy

  • We can pass column names or literals or expressions to all the Data Frame APIs.

  • Expressions include arithmetic operations and transformations using functions from org.apache.spark.sql.functions.

  • There are approximately 300 functions under org.apache.spark.sql.functions.

  • We will talk about some of the important functions used for String Manipulation, Date Manipulation etc.

4.2. Starting Spark Context

Let us start spark context for this Notebook so that we can execute the code provided.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    appName("Processing Column Data").
    master("yarn").
    getOrCreate
spark
import spark.implicits._

4.3. Create Dummy Data Frame

Let us go ahead and create a dummy Data Frame to explore functions.

// Similar to Oracle's dual - a view with one dummy CHAR(1) column and a single record "X"
val l = List("X")

Once the Data Frame is created, we can use it to understand how to use functions. For example, to get the current date, we can run df.select(current_date()).show().

It is similar to the Oracle query SELECT sysdate FROM dual.

val l = List("X")
val df = l.toDF("dummy")
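With df in place, the current_date example mentioned above can be run as follows (a minimal sketch, assuming the SparkSession and implicits from the earlier cell):

```scala
import org.apache.spark.sql.functions.current_date

// Equivalent of Oracle's SELECT sysdate FROM dual
df.select(current_date()).show()
```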

Here is another example of creating Data Frame using collection of employees. We will be using this Data Frame to explore all the important functions to process column data in detail.

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
employees.size

4.4. Categories of Functions

There are approximately 300 functions under org.apache.spark.sql.functions. At a higher level they can be grouped into a few categories.

  • String Manipulation Functions

  • Case Conversion - lower, upper

  • Getting Length - length

  • Extracting substrings - substring, split

  • Trimming - trim, ltrim, rtrim

  • Padding - lpad, rpad

  • Concatenating string - concat

  • Date Manipulation Functions

  • Getting current date and time - current_date, current_timestamp

  • Date Arithmetic - date_add, date_sub, datediff, months_between, add_months, next_day

  • Beginning and Ending Date or Time - last_day, trunc, date_trunc

  • Formatting Date - date_format

  • Extracting Information - dayofyear, dayofmonth, dayofweek, year, month

  • Aggregate Functions

  • count, countDistinct

  • sum, avg

  • min, max

  • Other Functions - We will explore depending on the use cases.

4.5. Special Functions - col and lit

Let us understand special functions such as col and lit.

  • First let us create Data Frame for demo purposes.

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
val employeesDF = employees.
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
  • For Data Frame APIs such as select, groupBy, orderBy etc we can pass column names as strings.

// to use operators such as $ in place of functions like col
import spark.implicits._
employeesDF.select($"first_name", $"last_name").show
// Alternative using col function
import org.apache.spark.sql.functions.col
employeesDF.select(col("first_name"), col("last_name")).show
// $ is shorthand operator for col from implicits
// Alternative by passing column names as strings.
employeesDF.select("first_name", "last_name").show
// We have to pass all the column names as strings or column type (using col or $)
// This will not work
// employeesDF.select("first_name", $"last_name").show
  • If there are no transformations on any column, then we should be able to pass all the column names as strings.

  • If not, we need to pass all columns as Column type by using the col function or its shorthand operator $.

// Passing columns as part of groupBy
employeesDF.groupBy($"nationality").count.show
// Passing columns as part of orderBy or sort
employeesDF.orderBy($"employee_id").show
  • However, if we want to apply any transformation using functions, then passing column names as strings will not suffice. We have to pass them as Column type.

import org.apache.spark.sql.functions.upper
// This code fails as upper expects a column type, not a string
employeesDF.
    select(upper("first_name")).
    show
  • col is the function which converts a column name from string type to Column type. We can also refer to columns as Column type using the Data Frame name.

import org.apache.spark.sql.functions.{col, upper}
// Using col and upper
employeesDF.select(upper(col("first_name"))).show
// Alternate using $ and upper
employeesDF.select(upper($"first_name")).show
// Using as part of groupBy
employeesDF.groupBy(upper($"nationality")).count.show
// Using as part of orderBy
employeesDF.orderBy(upper($"first_name")).show
// Alternative - we can also refer column names using Data Frame like this
employeesDF.select(upper(employeesDF("first_name"))).show
  • Sometimes, we want to add a literal to the column values. For example, we might want to concatenate first_name and last_name separated by a comma and a space.

// Below approaches fail as ", " is a String, not a Column
import org.apache.spark.sql.functions.concat
employeesDF.
    select(concat($"first_name", ", ", $"last_name")).
    show()
// Same as above
employeesDF.
    select(concat(col("first_name"), ", ", col("last_name"))).
    show
// Referring columns using Data Frame
employeesDF.
    select(concat(employeesDF("first_name"), ", ", employeesDF("last_name"))).
    show
  • If we pass literals directly as string or numeric type, then it will fail. We have to convert literals to Column type by using the lit function.

// Using lit to use literals to derive new expressions
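Building on the failing examples above, here is a sketch of the working version, wrapping the literal with lit so concat accepts it:

```scala
import org.apache.spark.sql.functions.{concat, lit}

// lit converts the literal ", " into Column type
employeesDF.
    select(concat($"first_name", lit(", "), $"last_name")).
    show
```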

4.6. String Manipulation - Case Conversion and Length

Let us check the functions which convert the case of string column values and also get their length.

  • Convert all the alphabetic characters in a string to uppercase - upper

  • Convert all the alphabetic characters in a string to lowercase - lower

  • Convert the first character of each word in a string to uppercase - initcap

  • Get number of characters in a string - length

  • All 4 functions take a column type argument.

4.6.1. Tasks

Let us perform tasks to understand the behavior of case conversion functions and length.

  • Use employees data and create a Data Frame.

  • Apply all 4 functions on nationality and see the results.

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
val employeesDF = employees.
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
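All 4 functions can be applied on nationality as sketched below (assuming the employeesDF created above):

```scala
import org.apache.spark.sql.functions.{lower, upper, initcap, length}

employeesDF.
    select($"nationality").
    withColumn("nationality_upper", upper($"nationality")).
    withColumn("nationality_lower", lower($"nationality")).
    withColumn("nationality_initcap", initcap($"nationality")).
    withColumn("nationality_length", length($"nationality")).
    show
```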

4.7. String Manipulation - substring

Let us understand how we can extract substrings using function substring.

  • If we are processing fixed length columns then we use substring to extract the information.

  • Here are some examples of fixed length columns and the use cases for which we typically extract information.

  • 9 digit Social Security Number. We typically extract the last 4 digits and provide them to tele-verification applications.

  • 16 digit Credit Card Number. We typically use the first 4 digits to identify the Credit Card Provider and the last 4 digits for tele-verification.

  • Data coming from Mainframe systems is quite often fixed length. We might have to extract the information and store it in multiple columns.

  • The substring function takes 3 arguments - column, position and length. Position is 1-based; we can also extract from the end by passing a negative position.

// Scala's String.substring takes a 0-based beginIndex and endIndex
val s = "Hello World"
s.substring(0, 5) // "Hello"
s.substring(1, 4) // "ell"
val l = List("X")
val df = l.toDF("dummy")
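Unlike Scala's String.substring, Spark's substring uses a 1-based position and a length; a sketch using the dummy Data Frame:

```scala
import org.apache.spark.sql.functions.{lit, substring}

// Position is 1-based in Spark's substring
df.select(substring(lit("Hello World"), 1, 5)).show
// Negative position extracts from the end
df.select(substring(lit("Hello World"), -5, 5)).show
```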

4.7.1. Tasks

Let us perform a few tasks to extract information from fixed length strings.

  • Create a list for employees with name, ssn and phone_number.

  • SSN Format 3 2 4 - Fixed Length with 9 digits

  • Phone Number Format - Country Code is variable and the remaining phone number has 10 digits:

  • Country Code - 1 to 3 digits

  • Area Code - 3 digits

  • Phone Number Prefix - 3 digits

  • Phone Number Remaining - 4 digits

  • All the 4 parts are separated by spaces

  • Create a Dataframe with column names name, ssn and phone_number

  • Extract last 4 digits from the phone number.

  • Extract last 4 digits from SSN.

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
val employeesDF = employees.
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
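Using a negative position, the last 4 digits can be extracted from both fields - a sketch against the employeesDF created above:

```scala
import org.apache.spark.sql.functions.substring

employeesDF.
    select($"employee_id", $"phone_number", $"ssn").
    withColumn("phone_last4", substring($"phone_number", -4, 4)).
    withColumn("ssn_last4", substring($"ssn", -4, 4)).
    show
```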

4.8. String Manipulation - split

Let us understand how we can extract substrings using split.

  • If we are processing variable length columns with delimiter then we use split to extract the information.

  • Here are some of the examples for variable length columns and the use cases for which we typically extract information.

  • Address where we store House Number, Street Name, City, State and Zip Code comma separated. We might want to extract City and State for demographics reports.

  • split takes 2 arguments, column and delimiter.

  • split converts each string into an array and we can access the elements using an index.

val l = List("X")
val df = l.toDF("dummy")
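A minimal sketch of split using the dummy Data Frame - note that the result is an array whose elements are accessed by a 0-based index:

```scala
import org.apache.spark.sql.functions.{lit, split}

// split returns an array; (1) accesses the element at index 1
df.select(split(lit("Hello World, how are you"), " ")(1)).show
```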
  • Most of the problems can be solved either by using substring or split.

4.8.1. Tasks

Let us perform a few tasks to extract information from fixed length strings as well as delimited variable length strings.

  • Create a list for employees with name, ssn and phone_number.

  • SSN Format 3 2 4 - Fixed Length with 9 digits

  • Phone Number Format - Country Code is variable and the remaining phone number has 10 digits:

  • Country Code - 1 to 3 digits

  • Area Code - 3 digits

  • Phone Number Prefix - 3 digits

  • Phone Number Remaining - 4 digits

  • All the 4 parts are separated by spaces

  • Create a Dataframe with column names name, ssn and phone_number

  • Extract area code and last 4 digits from the phone number.

  • Extract last 4 digits from SSN.

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
val employeesDF = employees.
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
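Since the 4 parts of the phone number are space separated, split handles the variable length country code naturally; a sketch against the employeesDF above:

```scala
import org.apache.spark.sql.functions.split

employeesDF.
    select($"employee_id", $"phone_number", $"ssn").
    withColumn("area_code", split($"phone_number", " ")(1)).
    withColumn("phone_last4", split($"phone_number", " ")(3)).
    withColumn("ssn_last4", split($"ssn", " ")(2)).
    show
```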

4.9. String Manipulation - Concatenating Strings

Let us understand how to concatenate strings using concat function.

  • We can pass a variable number of string columns to the concat function.

  • It will return one column concatenating all the strings.

  • If we have to concatenate a literal in between, then we have to use the lit function.

4.9.1. Tasks

Let us perform a few tasks to understand more about the concat function.

  • Let’s create a Data Frame and explore concat function.

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
val employeesDF = employees.
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
employeesDF.show
  • Create a new column by name full_name concatenating first_name and last_name.

  • Improvise by adding a comma followed by a space between first_name and last_name.
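The two tasks above can be sketched as follows (assuming the employeesDF created earlier in this section):

```scala
import org.apache.spark.sql.functions.{concat, lit}

// full_name with a comma and a space between first_name and last_name
employeesDF.
    withColumn("full_name", concat($"first_name", lit(", "), $"last_name")).
    select("employee_id", "full_name").
    show
```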

4.10. String Manipulation - Padding

Let us understand how to pad characters at the beginning or at the end of strings.

  • We typically pad characters to build fixed length values or records.

  • Fixed length values or records are extensively used in Mainframes based systems.

  • The length of each and every field in fixed length records is predetermined, and if the value of the field is shorter than the predetermined length, then we pad it with a standard character.

  • For numeric fields we pad with zero on the leading or left side. For non-numeric fields, we pad with some standard character on the leading or trailing side.

  • We use lpad to pad a string with a specific character on leading or left side and rpad to pad on trailing or right side.

  • Both lpad and rpad take 3 arguments - column or expression, desired length and the character to pad with.

4.10.1. Tasks

Let us perform simple tasks to understand the syntax of lpad or rpad.

  • Create a Dataframe with single value and single column.

  • Apply lpad to pad Hello with - to make it 10 characters.

val l = List("X")
val df = l.toDF("dummy")
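The lpad task above can be sketched like this - the third argument is the pad character:

```scala
import org.apache.spark.sql.functions.{lit, lpad}

// Pad "Hello" on the left with - to make it 10 characters
df.select(lpad(lit("Hello"), 10, "-")).show // -----Hello
```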

4.10.2. Tasks

Let us perform the task to understand how to use pad functions to convert our data into fixed length records.

  • Let’s take the employees Dataframe

val employees = List((1, "Scott", "Tiger", 1000.0, 
                      "united states", "+1 123 456 7890", "123 45 6789"
                     ),
                     (2, "Henry", "Ford", 1250.0, 
                      "India", "+91 234 567 8901", "456 78 9123"
                     ),
                     (3, "Nick", "Junior", 750.0, 
                      "united KINGDOM", "+44 111 111 1111", "222 33 4444"
                     ),
                     (4, "Bill", "Gomes", 1500.0, 
                      "AUSTRALIA", "+61 987 654 3210", "789 12 6118"
                     )
                    )
val employeesDF = employees.
    toDF("employee_id", "first_name",
         "last_name", "salary",
         "nationality", "phone_number",
         "ssn"
        )
  • Use pad functions to convert each of the fields into fixed length and concatenate them. Here are the details for each of the fields.

  • Length of the employee_id should be 5 characters and should be padded with zero.

  • Length of first_name and last_name should be 10 characters and should be padded with - on the right side.

  • Length of salary should be 10 characters and should be padded with zero.

  • Length of the nationality should be 15 characters and should be padded with - on the right side.

  • Length of the phone_number should be 17 characters and should be padded with - on the right side.

  • Length of the ssn can be left as is. It is 11 characters.

  • Create a new Dataframe empFixedDF with column name employee. Preview the data by disabling truncate.
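Putting the field specifications above together, a sketch of empFixedDF (numeric fields are implicitly cast to string before padding):

```scala
import org.apache.spark.sql.functions.{concat, lpad, rpad}

val empFixedDF = employeesDF.select(
    concat(
        lpad($"employee_id", 5, "0"),
        rpad($"first_name", 10, "-"),
        rpad($"last_name", 10, "-"),
        lpad($"salary", 10, "0"),
        rpad($"nationality", 15, "-"),
        rpad($"phone_number", 17, "-"),
        $"ssn"
    ).alias("employee")
)
// Preview with truncate disabled
empFixedDF.show(false)
```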

4.11. String Manipulation - Trimming

Let us understand how to trim unwanted leading and trailing characters around a string.

  • We typically use trimming to remove unnecessary characters from fixed length records.

  • Fixed length records are extensively used in Mainframe systems and we might have to process them using Spark.

  • As part of processing we might want to remove leading or trailing characters such as 0 in case of numeric types and space or some standard character in case of alphanumeric types.

  • As of now Spark trim functions take the column as argument and remove leading or trailing spaces.

  • Trim spaces towards left - ltrim

  • Trim spaces towards right - rtrim

  • Trim spaces on both sides - trim

4.11.1. Tasks

Let us understand how to use trim functions to remove spaces on left or right or both.

  • Create a Dataframe with one column and one record.

  • Apply trim functions to trim spaces.

import org.apache.spark.sql.functions.{ltrim, rtrim, trim}
val l = List("   Hello.    ")
val df = l.toDF("dummy")
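With the single record Data Frame above, the three trim functions can be compared side by side:

```scala
// ltrim removes leading spaces, rtrim trailing spaces, trim both
df.select(
    ltrim($"dummy").alias("ltrim_result"),
    rtrim($"dummy").alias("rtrim_result"),
    trim($"dummy").alias("trim_result")
  ).show
```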

4.12. Date and Time - Overview

Let us get an overview about Date and Time using available functions.

  • We can use current_date to get today’s server date.

  • Date will be returned using yyyy-MM-dd format.

  • We can use current_timestamp to get current server time.

  • Timestamp will be returned using yyyy-MM-dd HH:mm:ss.SSS format.

  • Hours will be in 24 hour format by default.

val l = List("X")
val df = l.toDF("dummy")
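Both functions take no arguments and can be applied on the dummy Data Frame:

```scala
import org.apache.spark.sql.functions.{current_date, current_timestamp}

df.select(current_date().alias("current_date"),
          current_timestamp().alias("current_timestamp")
         ).show(false)
```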

4.13. Date and Time - Arithmetic

Let us perform Date and Time Arithmetic using relevant functions.

  • Adding days to a date or timestamp - date_add

  • Subtracting days from a date or timestamp - date_sub

  • Getting difference between 2 dates or timestamps - datediff

  • Getting a number of months between 2 dates or timestamps - months_between

  • Adding months to a date or timestamp - add_months

  • Getting next day from a given date - next_day

  • These functions are self explanatory and can be applied on standard date or timestamp fields. Functions such as date_add, date_sub, add_months and next_day return a date even when applied on a timestamp field, while datediff returns an integer and months_between a number.

4.13.1. Tasks

Let us perform some tasks related to date arithmetic.

  • Get help on each and every function first and understand what arguments need to be passed.

  • Create a Dataframe by name datetimesDF with columns date and time.

val datetimes = List(("2014-02-28", "2014-02-28 10:00:00.123"),
                     ("2016-02-29", "2016-02-29 08:08:08.999"),
                     ("2017-10-31", "2017-12-31 11:59:59.123"),
                     ("2019-11-30", "2019-08-31 00:00:00.000")
                    )
val datetimesDF = datetimes.toDF("date", "time")
  • Add 10 days to both date and time values.

  • Subtract 10 days from both date and time values.

  • Get the difference between current_date and date values as well as current_timestamp and time values.

  • Get the number of months between current_date and date values as well as current_timestamp and time values.

  • Add 3 months to both date values as well as time values.
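A sketch of the first few tasks, converting the datetimes list above to a Data Frame first:

```scala
import org.apache.spark.sql.functions.{date_add, date_sub, datediff, months_between, current_date, current_timestamp}

val datetimesDF = datetimes.toDF("date", "time")
datetimesDF.
    withColumn("date_add_date", date_add($"date", 10)).
    withColumn("date_add_time", date_add($"time", 10)).
    withColumn("date_sub_date", date_sub($"date", 10)).
    withColumn("datediff_date", datediff(current_date(), $"date")).
    withColumn("months_between_time", months_between(current_timestamp(), $"time")).
    show(false)
```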

4.14. Date and Time - trunc and date_trunc

In Data Warehousing we quite often run "to date" reports such as week to date, month to date, year to date etc.

  • We can use trunc or date_trunc for the same to get the beginning date of the week, month, current year etc by passing date or timestamp to it.

  • We can use trunc to get beginning date of the month or year by passing date or timestamp to it - for example trunc(current_date(), "MM") will give the first of the current month.

  • We can use date_trunc to get beginning date of the month or year as well as beginning time of the day or hour by passing timestamp to it.

  • Get beginning date based on month - date_trunc("MM", current_timestamp())

  • Get beginning time based on day - date_trunc("DAY", current_timestamp())

4.14.1. Tasks

Let us perform a few tasks to understand trunc and date_trunc in detail.

  • Create a Dataframe by name datetimesDF with columns date and time.

val datetimes = List(("2014-02-28", "2014-02-28 10:00:00.123"),
                     ("2016-02-29", "2016-02-29 08:08:08.999"),
                     ("2017-10-31", "2017-12-31 11:59:59.123"),
                     ("2019-11-30", "2019-08-31 00:00:00.000")
                    )
val datetimesDF = datetimes.toDF("date", "time")
datetimesDF.show(truncate=false)
  • Get beginning month date using date field and beginning year date using time field.

  • Get beginning hour time using date and time field.
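The tasks above can be sketched against datetimesDF - trunc takes (column, format) while date_trunc takes (format, column):

```scala
import org.apache.spark.sql.functions.{trunc, date_trunc}

datetimesDF.
    withColumn("date_month_beginning", trunc($"date", "MM")).
    withColumn("time_year_beginning", trunc($"time", "yy")).
    withColumn("time_hour_beginning", date_trunc("HOUR", $"time")).
    show(false)
```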

4.15. Date and Time - Extracting Information

Let us understand how to extract information from dates or times using functions.

  • We can use date_format to extract the required information in a desired format from date or timestamp.

  • There are also specific functions to extract year, month, day within a week, day within a month, day within a year etc.

4.15.1. Tasks

Let us perform a few tasks to extract the information we need from date or timestamp.

  • Create a Dataframe by name datetimesDF with columns date and time.

val datetimes = List(("2014-02-28", "2014-02-28 10:00:00.123"),
                     ("2016-02-29", "2016-02-29 08:08:08.999"),
                     ("2017-10-31", "2017-12-31 11:59:59.123"),
                     ("2019-11-30", "2019-08-31 00:00:00.000")
                    )
val datetimesDF = datetimes.toDF("date", "time")
datetimesDF.show(false)
  • Get year from fields date and time.

  • Get one or two digit month from fields date and time.

  • Get year and month in yyyyMM format from date and time.

  • Get day within a week, day within a month and day within a year from date and time.

  • Get the information from time in yyyyMMddHHmmss format.
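A sketch of some of these extractions against datetimesDF - specific functions for single values, date_format for arbitrary formats:

```scala
import org.apache.spark.sql.functions.{year, month, dayofweek, dayofmonth, dayofyear, date_format}

datetimesDF.
    withColumn("date_year", year($"date")).
    withColumn("time_month", month($"time")).
    withColumn("date_dayofweek", dayofweek($"date")).
    withColumn("date_yyyyMM", date_format($"date", "yyyyMM")).
    withColumn("time_yyyyMMddHHmmss", date_format($"time", "yyyyMMddHHmmss")).
    show(false)
```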

4.16. Dealing with Unix Timestamp

Let us understand how to deal with Unix Timestamp in Spark.

  • It is an integer representing the number of seconds elapsed since January 1st 1970 Midnight UTC.

  • That beginning time is also known as epoch and the value is incremented by 1 every second.

  • We can convert Unix Timestamp to regular date or timestamp and vice versa.

  • We can use unix_timestamp to convert regular date or timestamp to a unix timestamp value. For example unix_timestamp(lit("2019-11-19 00:00:00"))

  • We can use from_unixtime to convert unix timestamp to regular date or timestamp. For example from_unixtime(lit(1574101800))

  • We can also pass format to both the functions.

4.16.1. Tasks

Let us perform a few tasks to understand how to deal with Unix Timestamp.

  • Create a Dataframe by name datetimesDF with columns dateid, date and time.

val datetimes = List((20140228, "2014-02-28", "2014-02-28 10:00:00.123"),
                     (20160229, "2016-02-29", "2016-02-29 08:08:08.999"),
                     (20171031, "2017-10-31", "2017-12-31 11:59:59.123"),
                     (20191130, "2019-11-30", "2019-08-31 00:00:00.000")
                    )
val datetimesDF = datetimes.toDF("dateid", "date", "time")
  • Get unix timestamp for dateid, date and time.

  • Create a Dataframe by name unixtimesDF with one column unixtime using 4 values. You can use the unix timestamps generated for the time column in the previous task.

val unixtimes = List(1393561800,
                     1456713488,
                     1514701799,
                     1567189800
                    )
val unixtimesDF = unixtimes.toDF("unixtime")
  • Get date in yyyyMMdd format and also complete timestamp.
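A sketch of both directions of the conversion, creating the Data Frames from the lists above first:

```scala
import org.apache.spark.sql.functions.{unix_timestamp, from_unixtime}

// Regular date or timestamp to unix timestamp; a format is needed for non-standard fields
val datetimesDF = datetimes.toDF("dateid", "date", "time")
datetimesDF.
    withColumn("unix_date", unix_timestamp($"date", "yyyy-MM-dd")).
    withColumn("unix_dateid", unix_timestamp($"dateid".cast("string"), "yyyyMMdd")).
    show(false)

// Unix timestamp back to a readable date and a complete timestamp
val unixtimesDF = unixtimes.toDF("unixtime")
unixtimesDF.
    withColumn("date", from_unixtime($"unixtime", "yyyyMMdd")).
    withColumn("time", from_unixtime($"unixtime")).
    show(false)
```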

4.17. Conclusion

As part of this module we have gone through the list of functions that can be applied on columns for row level transformations.

  • There are approximately 300 pre-defined functions.

  • Functions can be broadly categorized into String Manipulation Functions, Date Manipulation Functions, Numeric Functions etc.

  • Typically when we read data from a source, we get the data in the form of strings and we need to apply functions for standardization rules, data type conversions, transformation rules etc.

  • Most of these functions can be used while projection using select, selectExpr, withColumn etc as well as part of filter or where, groupBy, orderBy or sort etc.

  • For selectExpr we need to use the functions using SQL Style syntax.

  • There are special functions such as col and lit. col is used to pass column names as column type for some of the functions while lit is used to pass literals as values as part of expressions (eg: concat($"first_name", lit(", "), $"last_name")).