2. Basic Transformations

As part of this section we will look at basic transformations that we can perform on top of Data Frames, such as filtering, aggregations, and joins, using SQL. We will build an end to end solution by taking a simple problem statement.

  • Spark SQL – Overview

  • Define Problem Statement

  • Preparing Tables

  • Projecting Data

  • Filtering Data

  • Joining Tables - Inner

  • Joining Tables - Outer

  • Aggregating Data

  • Sorting Data

  • Conclusion - Final Solution

If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

2.1. Spark SQL – Overview

Let us get an overview of Spark SQL.

Here are the standard operations which we typically perform as part of processing the data. In Spark we can perform these using Data Frame APIs or Spark SQL.

  • Selection or Projection – SELECT clause

    • These are also called row level transformations.

    • Apply standardization rules (convert names and addresses to upper case).

    • Mask partial data (SSNs and dates of birth) (see the sketch after this list).

  • Filtering data – WHERE clause

    • Get orders based on date, product, or category.

  • Joins – JOIN (outer joins are supported as well)

    • Join multiple data sets.

  • Aggregations – GROUP BY with aggregate functions such as sum, avg, min, max, etc.

    • Get revenue for a given order.

    • Get revenue for each order.

    • Get daily revenue.

  • Sorting – ORDER BY

    • Sort the final output by date.

    • Sort the final output by date, then by revenue in descending order.

    • Sort the final output by state or province, then by revenue in descending order.

  • Analytics Functions – aggregations, ranking and windowing functions

    • Get top 5 stores by revenue for each state.

    • Get top 5 products by revenue in each category.
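
  • Below is a minimal sketch of the standardization and masking expressions mentioned above. The customers table and its customer_name and customer_ssn columns are hypothetical (they are not part of this module's data sets).

spark.sql("""
SELECT upper(customer_name) AS customer_name, -- standardize to upper case
    concat('XXX-XX-', substr(customer_ssn, -4)) AS customer_ssn -- mask all but the last 4 digits
FROM customers
""").show()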

import org.apache.spark.sql.SparkSession

// Create (or get) a Spark session with Hive support so that
// Spark SQL can work against tables in the Hive metastore
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", "/user/itversity/warehouse").
    enableHiveSupport.
    appName("Spark SQL - Overview").
    master("yarn").
    getOrCreate
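
  • To validate the session we can run a simple query. current_date is a built-in function, so no table is needed.

spark.sql("SELECT current_date").show()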

2.2. Define Problem Statement

Let us define the problem statement to get an overview of basic transformations using Spark SQL.

  • Get Daily Product Revenue using the orders and order_items data sets.

  • We have the following fields in orders.

    • order_id

    • order_date

    • order_customer_id

    • order_status

  • We have the following fields in order_items.

    • order_item_id

    • order_item_order_id

    • order_item_product_id

    • order_item_quantity

    • order_item_subtotal

    • order_item_product_price

  • We have a one to many relationship between orders and order_items.

  • orders.order_id is the primary key and order_items.order_item_order_id is a foreign key referring to orders.order_id (see the sketch after this list).

  • By the end of this module we will explore all the standard transformations and get daily product revenue using the following fields.

    • orders.order_date

    • order_items.order_item_product_id

    • order_items.order_item_subtotal (aggregated by date and product id).

  • We will consider only COMPLETE or CLOSED orders.
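
  • To make the relationship concrete, here is a minimal sketch of the join implied by the keys above. It will be runnable once the tables are created in the next section.

spark.sql("""
SELECT o.order_id, oi.order_item_id
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id -- primary key to foreign key
""").show()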

2.3. Preparing Tables

Let us prepare the tables to solve the problem.

  • Make sure the database is created.

  • Create the orders table.

  • Load data from the local path /data/retail_db/orders into the newly created orders table.

  • Preview the data and get the count from orders.

  • Create the order_items table.

  • Load data from the local path /data/retail_db/order_items into the newly created order_items table.

  • Preview the data and get the count from order_items.

As the tables and data are ready, let us get into writing queries against these tables to perform basic transformations.

%%sql

DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql

DROP TABLE IF EXISTS orders
%%sql

CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
import sys.process._
// List the files under the table's warehouse directory in HDFS (before and after the load)
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
%%sql

SELECT * FROM orders LIMIT 10
%%sql

SELECT count(1) FROM orders
%%sql

DROP TABLE IF EXISTS order_items
%%sql 

CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
%%sql

SELECT * FROM order_items LIMIT 10
%%sql

SELECT count(1) FROM order_items
  • Using Spark SQL with Python or Scala

spark.sql("DROP DATABASE itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
spark.sql("SELECT * FROM orders LIMIT 10").show()
spark.sql("SELECT count(1) FROM orders").show()
spark.sql("DROP TABLE order_items")
spark.sql("""
CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
spark.sql("SELECT * FROM order_items LIMIT 10").show()
spark.sql("SELECT count(1) FROM order_items").show()

2.4. Projecting Data

Let us understand different aspects of projecting data. We primarily use SELECT to project the data.

  • We can project all the columns using * or a subset of the columns using column names.

  • We can provide aliases to a column or expression using AS in the SELECT clause.

  • DISTINCT can be used to get the distinct records from the selected columns. We can also use DISTINCT * to get unique records using all the columns.

  • As of now, Spark SQL does not support projecting all but one or a few columns. It is supported in Hive. The following will work in Hive and will project all the columns from orders except for order_id. A Spark side workaround follows the Hive snippet.

SET hive.support.quoted.identifiers=none;
SELECT `(order_id)?+.+` FROM orders;
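
  • Within Spark, a practical workaround is to use the Data Frame API's drop. A minimal sketch:

// Project all columns from orders except order_id
spark.table("orders").drop("order_id").show()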
%%sql

SELECT * FROM orders LIMIT 10
%%sql

DESCRIBE orders
%%sql

SELECT order_customer_id, order_date, order_status FROM orders LIMIT 10
%%sql

SELECT order_customer_id, date_format(order_date, 'yyyy-MM'), order_status FROM orders LIMIT 10
%%sql

SELECT order_customer_id, 
    date_format(order_date, 'yyyy-MM') AS order_month, 
    order_status 
FROM orders LIMIT 10
%%sql

SELECT DISTINCT order_status FROM orders
%%sql

SELECT DISTINCT * FROM orders LIMIT 10
  • Using Spark SQL with Python or Scala

spark.sql("SELECT * FROM orders").show()
spark.sql("DESCRIBE orders").show()
spark.sql("SELECT order_customer_id, order_date, order_status FROM orders").show()
spark.sql("""
SELECT order_customer_id, 
    date_format(order_date, 'yyyy-MM'), 
    order_status 
FROM orders""").show()
spark.sql("""
SELECT order_customer_id, 
    date_format(order_date, 'yyyy-MM') AS order_month, 
    order_status 
FROM orders
""").show()
spark.sql("SELECT DISTINCT order_status FROM orders").show()
spark.sql("SELECT DISTINCT * FROM orders").show()

2.5. Filtering Data

Let us understand how we can filter the data in Spark SQL.

  • We use the WHERE clause to filter the data.

  • All comparison operators such as =, !=, >, <, etc. can be used to compare a column, expression, or literal with another column, expression, or literal.

  • We can use operators such as LIKE with % and RLIKE for regular expression based pattern matching (see the sketch after this list).

  • Boolean OR and AND can be used when we want to apply multiple conditions.

    • Get all orders with order_status equal to COMPLETE or CLOSED. We can also use the IN operator.

    • Get all orders placed in the month of January 2014 with order_status equal to COMPLETE or CLOSED.

  • We need to use IS NULL and IS NOT NULL to compare against null values.
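
  • Here is a minimal sketch of regular expression based filtering using the RLIKE operator. The cells below stick to LIKE and date_format.

spark.sql("""
SELECT count(1) FROM orders
WHERE order_status RLIKE 'COMPLETE|CLOSED'
""").show()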

%%sql

USE itversity_retail
%%sql

SHOW tables
%%sql

SELECT * FROM orders WHERE order_status = 'COMPLETE' LIMIT 10
%%sql

SELECT count(1) FROM orders WHERE order_status = 'COMPLETE'
%%sql

SELECT * FROM orders WHERE order_status IN ('COMPLETE', 'CLOSED') LIMIT 10
%%sql

SELECT * FROM orders WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED' LIMIT 10
%%sql

SELECT count(1) FROM orders WHERE order_status IN ('COMPLETE', 'CLOSED')
%%sql

SELECT count(1) FROM orders WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED'
%%sql

SELECT * FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND order_date LIKE '2014-01%'
LIMIT 10
%%sql

SELECT count(1) FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND order_date LIKE '2014-01%'
%%sql

SELECT * FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
LIMIT 10
%%sql

SELECT count(1) FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
  • Using Spark SQL with Python or Scala

spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("SELECT * FROM orders WHERE order_status = 'COMPLETE'").show()
spark.sql("SELECT count(1) FROM orders WHERE order_status = 'COMPLETE'").show()
spark.sql("SELECT * FROM orders WHERE order_status IN ('COMPLETE', 'CLOSED')").show()
spark.sql("""
SELECT * FROM orders 
WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED'
""").show()
spark.sql("""
SELECT count(1) FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark.sql("""
SELECT count(1) FROM orders
WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED'
""").show()
spark.sql("""
SELECT * FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND order_date LIKE '2014-01%'
""").show()
spark.sql("""
SELECT count(1) FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND order_date LIKE '2014-01%'
""").show()
spark.sql("""
SELECT * FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
spark.sql("""
SELECT count(1) FROM orders 
WHERE order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
  • Let us prepare the table to demonstrate how to deal with null values while filtering the data.

%%sql

DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql

USE itversity_sms
%%sql

DROP TABLE IF EXISTS students
%%sql

CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_number STRING,
    student_address STRING
) STORED AS avro
%%sql

INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql

INSERT INTO students VALUES (2, 'Donald', 'Duck', '1234567890', NULL)
%%sql

INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
    (4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
%%sql

SELECT * FROM students
  • Using Spark SQL with Python or Scala

spark.sql("DROP DATABASE IF EXISTS itversity_sms CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_sms")
spark.sql("DROP TABLE IF EXISTS students")
spark.sql("""
CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_number STRING,
    student_address STRING
) STORED AS avro
""")
spark.sql("""
INSERT INTO students 
VALUES (1, 'Scott', 'Tiger', NULL, NULL)
""")
spark.sql("""
INSERT INTO students 
VALUES (2, 'Donald', 'Duck', '1234567890', NULL)
""")
spark.sql("""
INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
    (4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
""")
spark.sql("SELECT * FROM students").show()
  • Comparison against null has to be done with IS NULL and IS NOT NULL. The queries below using = NULL and != NULL will not return any rows, even though we have records where student_phone_number is null.

spark.sql("""
SELECT * FROM students 
WHERE student_phone_number = NULL
""").show()
spark.sql("""
SELECT * FROM students 
WHERE student_phone_number != NULL
""").show()
spark.sql("""
SELECT * FROM students
WHERE student_phone_number IS NULL
""").show()
spark.sql("""
SELECT * FROM students
WHERE student_phone_number IS NOT NULL
""").show()

2.6. Joining Tables - Inner

Let us understand how to join data from multiple tables.

  • Spark SQL supports ANSI style joins (JOIN with ON).

  • There are different types of joins.

    • INNER JOIN - Get all the records from both the datasets that satisfy the JOIN condition.

    • OUTER JOIN - We will get into the details as part of the next topic.

  • Example for INNER JOIN:

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
LIMIT 10
  • We can join more than 2 tables in one query. Here is how it looks (this example assumes a products table with a product_id column; products is not created as part of this module):

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
    JOIN products p
    ON p.product_id = oi.order_item_product_id
LIMIT 10
  • If we have to apply additional filters, it is recommended to use the WHERE clause. The ON clause should only have join conditions.

  • We can have non equality join conditions as well, but they are not used that often (see the sketch after this list).

  • Here are some of the examples for INNER JOIN:

    • Get order id, date, status, and item revenue for all order items.

    • Get order id, date, status, and item revenue for all order items for orders with status either COMPLETE or CLOSED.

    • Get order id, date, status, and item revenue for all order items for orders with status either COMPLETE or CLOSED that were placed in the month of January 2014.
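
  • Here is a minimal sketch of a join that combines the equality condition with a non equality condition. It counts the order items whose subtotal differs from the unit price.

spark.sql("""
SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
        AND oi.order_item_subtotal != oi.order_item_product_price
""").show()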

%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql

SELECT count(1)
FROM orders
%%sql

SELECT count(1)
FROM order_items
%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql

SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
LIMIT 10
%%sql

SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
LIMIT 10
%%sql

SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
  • Using Spark SQL with Python or Scala

spark("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark("""
SELECT count(1)
FROM orders
""").show()
spark("""
SELECT count(1)
FROM order_items
""").show()
spark("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark("""
SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark("""
SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_subtotal
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
spark("""
SELECT count(1)
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()

2.7. Joining Tables - Outer

Let us understand how to perform outer joins using Spark SQL. There are 3 different types of outer joins.

  • LEFT OUTER JOIN (the OUTER keyword is optional) - Get all the records from both the datasets that satisfy the JOIN condition, along with the records from the left side table that have no match in the right side table.

  • RIGHT OUTER JOIN - Get all the records from both the datasets that satisfy the JOIN condition, along with the records from the right side table that have no match in the left side table.

  • FULL OUTER JOIN - the combination of both left and right outer joins (a sketch follows at the end of this topic).

  • When we perform an outer join (let us say a left outer join), we will see this behavior.

    • Get all the values from both the tables where the join condition is satisfied.

    • If there are rows in the left side table for which there are no corresponding rows in the right side table, all the projected column values from the right side table will be null.

  • Here are some of the examples for outer joins.

    • Get all the orders where there are no corresponding order items.

    • Get all the order items where there are no corresponding orders.

%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql

SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
LIMIT 10
%%sql

SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
%%sql

SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
    AND o.order_status IN ('COMPLETE', 'CLOSED')
%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql

SELECT count(1)
FROM orders o RIGHT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
%%sql

SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_id IS NULL
LIMIT 10
  • Using Spark SQL with Python or Scala

spark.sql("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
""").show()
spark.sql("""
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
""").show()
spark.sql("""
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
    AND o.order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark.sql("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT count(1)
FROM orders o RIGHT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT o.order_id,
    o.order_date,
    o.order_status,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_id IS NULL
""").show()

2.8. Aggregating Data

Let us understand how to aggregate the data.

  • We can perform global aggregations as well as aggregations by key.

  • Global Aggregations

    • Get total number of orders.

    • Get revenue for a given order id.

    • Get the number of records with order_status either COMPLETE or CLOSED.

  • Aggregations by key - using GROUP BY

    • Get number of orders by date or status.

    • Get revenue for each order_id.

    • Get daily product revenue (using order date and product id as keys).

  • We can also use the HAVING clause to apply filtering on top of aggregated data.

    • Get daily product revenue where revenue is greater than $500 (using order date and product id as keys).

  • Rules while using GROUP BY:

    • We can have the columns that are specified as part of GROUP BY in the SELECT clause.

    • On top of those, we can have derived columns using aggregate functions.

    • We cannot have any other columns in the SELECT clause; every column must either be part of GROUP BY or be derived using an aggregate function.

    • We cannot use aggregate functions, or aliases defined in the SELECT clause, as part of the WHERE clause.

    • If we want to filter based on aggregated results, we have to leverage HAVING on top of GROUP BY (specifying WHERE is not an option). The cells below include a query that breaks this rule to demonstrate the failure.

  • Typical logical query execution order: FROM -> WHERE -> GROUP BY -> HAVING -> SELECT

%%sql

SELECT count(order_id) FROM orders
%%sql

SELECT count(DISTINCT order_date) FROM orders
%%sql

SELECT round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items 
WHERE order_item_order_id = 2
%%sql

SELECT count(1) 
FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
%%sql

SELECT order_date,
    count(1)
FROM orders
GROUP BY order_date
%%sql

SELECT order_status,
    count(1) AS status_count
FROM orders
GROUP BY order_status
%%sql

SELECT order_item_order_id,
    round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items
GROUP BY order_item_order_id LIMIT 10
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
LIMIT 10
%%sql

-- This query fails: the aggregate alias revenue cannot be referenced in the WHERE clause; use HAVING instead (next cell)
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND revenue >= 500
GROUP BY o.order_date,
    oi.order_item_product_id
LIMIT 10
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
HAVING revenue >= 500
LIMIT 10
  • Using Spark SQL with Python or Scala

spark.sql("SELECT count(order_id) FROM orders").show()
spark.sql("SELECT count(DISTINCT order_date) FROM orders").show()
spark.sql("""
SELECT round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items 
WHERE order_item_order_id = 2
""").show()
spark.sql("""
SELECT count(1) 
FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark.sql("""
SELECT order_date,
    count(1)
FROM orders
GROUP BY order_date
""").show()
spark.sql("""
SELECT order_status,
    count(1) AS status_count
FROM orders
GROUP BY order_status
""").show()
spark.sql("""
SELECT order_item_order_id,
    round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items
GROUP BY order_item_order_id
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
    AND revenue >= 500
GROUP BY o.order_date,
    oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
HAVING revenue >= 500
""").show()

2.9. Sorting Data

Let us understand how to sort the data using Spark SQL.

  • We typically perform sorting as the final step.

  • Sorting can be done either by one field or by multiple fields.

  • We can sort the data in ascending or descending order using a column or an expression (see the sketch after this list).

  • By default, the sort order is ascending; we can change it to descending by using DESC.
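
  • A minimal sketch of sorting by an expression rather than a plain column, here by the month derived from order_date:

spark.sql("""
SELECT * FROM orders
ORDER BY date_format(order_date, 'yyyy-MM'),
    order_status
""").show()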

%%sql

SELECT * FROM orders
ORDER BY order_customer_id
LIMIT 10
%%sql

SELECT * FROM orders
ORDER BY order_customer_id,
    order_date
LIMIT 10
%%sql

SELECT * FROM orders
ORDER BY order_customer_id,
    order_date DESC
LIMIT 10
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    revenue DESC
LIMIT 10
  • Using Spark SQL with Python or Scala

spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id
""").show()
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id,
    order_date
""").show()
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id,
    order_date DESC
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    revenue DESC
""").show()

2.10. Conclusion - Final Solution

Let us review the final solution for our problem statement, daily_product_revenue.

  • Prepare tables

    • Create tables

    • Load the data into tables

  • We need to project the fields which we are interested in.

    • order_date

    • order_item_product_id

    • product_revenue

  • As we have fields from multiple tables, we need to perform a join, after which we have to filter for COMPLETE or CLOSED orders.

  • We have to group the data by order_date and order_item_product_id, and then perform aggregation on order_item_subtotal to get product_revenue.

%%sql

DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql

USE itversity_retail
%%sql

SHOW tables
%%sql

CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
%%sql 

CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
%%sql

SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    product_revenue DESC
  • Using Spark SQL with Python or Scala

spark.sql("DROP DATABASE itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("""
CREATE TABLE orders (
    order_id INT,
    order_date STRING,
    order_customer_id INT,
    order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
INTO TABLE orders
""")
spark.sql("""
CREATE TABLE order_items (
    order_item_id INT,
    order_item_order_id INT,
    order_item_product_id INT,
    order_item_quantity INT,
    order_item_subtotal FLOAT,
    order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' 
INTO TABLE order_items
""")
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
    oi.order_item_product_id,
    round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
    oi.order_item_product_id
ORDER BY o.order_date,
    product_revenue DESC
""").show()