2. Basic Transformations¶
As part of this section, we will go through the basic transformations we can perform on top of Data Frames, such as filtering, aggregations, and joins, using SQL. We will build an end-to-end solution by taking a simple problem statement.
Spark SQL – Overview
Define Problem Statement
Preparing Tables
Projecting Data
Filtering Data
Joining Tables - Inner
Joining Tables - Outer
Aggregating Data
Sorting Data
Conclusion - Final Solution
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Pyspark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
2.1. Spark SQL – Overview¶
Let us get an overview of Spark SQL.
Here are the standard operations which we typically perform as part of processing the data. In Spark we can perform these using Data Frame APIs or Spark SQL.
Selection or Projection – select clause
These are also called row-level transformations.
Apply standardization rules (convert names and addresses to upper case).
Mask partial data (such as SSN and date of birth).
Filtering data – where clause
Get orders based on date or product or category.
Joins – join (supports outer join as well)
Join multiple data sets.
Aggregations – group by and aggregations with support of functions such as sum, avg, min, max etc
Get revenue for a given order
Get revenue for each order
Get daily revenue
Sorting – order by
Sort the final output by date.
Sort the final output by date, then by revenue in descending order.
Sort the final output by state or province, then by revenue in descending order.
Analytics Functions – aggregations, ranking and windowing functions
Get top 5 stores by revenue for each state.
Get top 5 products by revenue in each category.
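Analytics functions are covered in detail in later modules. As a quick preview, here is a minimal sketch of a ranking query for the top 5 stores by revenue in each state; the stores table and its columns (store_state, store_id, store_revenue) are hypothetical and only illustrate the pattern.
SELECT store_state,
    store_id,
    store_revenue
FROM (
    SELECT store_state,
        store_id,
        store_revenue,
        dense_rank() OVER (PARTITION BY store_state ORDER BY store_revenue DESC) AS rnk
    FROM stores
) q
WHERE rnk <= 5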
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
builder.
config("spark.ui.port", "0").
config("spark.sql.warehouse.dir", "/user/itversity/warehouse").
enableHiveSupport.
appName("Spark SQL - Overview").
master("yarn").
getOrCreate
2.2. Define Problem Statement¶
Let us define the problem statement to get an overview of basic transformations using Spark SQL.
Get Daily Product Revenue using orders and order_items data set.
We have the following fields in orders.
order_id
order_date
order_customer_id
order_status
We have the following fields in order_items.
order_item_id
order_item_order_id
order_item_product_id
order_item_quantity
order_item_subtotal
order_item_product_price
We have a one-to-many relationship between orders and order_items.
orders.order_id is the primary key and order_items.order_item_order_id is a foreign key referring to orders.order_id.
By the end of this module we will have explored all standard transformations and computed daily product revenue using the following fields.
orders.order_date
order_items.order_item_product_id
order_items.order_item_subtotal (aggregated using date and product_id).
We will consider only COMPLETE or CLOSED orders.
2.3. Preparing Tables¶
Let us prepare the tables to solve the problem.
Make sure the database is created.
Create the orders table.
Load data from the local path /data/retail_db/orders into the newly created orders table.
Preview the data and get the count from orders.
Create the order_items table.
Load data from the local path /data/retail_db/order_items into the newly created order_items table.
Preview the data and get the count from order_items.
As the tables and data are ready, let us get into how to write queries against these tables to perform basic transformations.
%%sql
DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
DROP TABLE IF EXISTS orders
%%sql
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
%%sql
SELECT * FROM orders LIMIT 10
%%sql
SELECT count(1) FROM orders
%%sql
DROP TABLE IF EXISTS order_items
%%sql
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
SELECT count(1) FROM order_items
Using Spark SQL with Python or Scala
spark.sql("DROP DATABASE itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/orders"!
spark.sql("SELECT * FROM orders LIMIT 10").show()
spark.sql("SELECT count(1) FROM orders").show()
spark.sql("DROP TABLE order_items")
spark.sql("""
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items")
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/itversity/warehouse/${username}_retail.db/order_items"!
spark.sql("SELECT * FROM order_items LIMIT 10").show()
spark.sql("SELECT count(1) FROM order_items").show()
2.4. Projecting Data¶
Let us understand different aspects of projecting data. We primarily use SELECT to project the data.
We can project all columns using * or some columns using column names.
We can provide aliases to a column or expression using AS in the SELECT clause.
DISTINCT can be used to get distinct records from the selected columns. We can also use DISTINCT * to get unique records using all the columns.
As of now Spark SQL does not support projecting all but one or a few columns. It is supported in Hive. The following will work in Hive and will project all the columns from orders except for order_id.
SET hive.support.quoted.identifiers=none;
SELECT `(order_id)?+.+` FROM orders;
%%sql
SELECT * FROM orders LIMIT 10
%%sql
DESCRIBE orders
%%sql
SELECT order_customer_id, order_date, order_status FROM orders LIMIT 10
%%sql
SELECT order_customer_id, date_format(order_date, 'yyyy-MM'), order_status FROM orders LIMIT 10
%%sql
SELECT order_customer_id,
date_format(order_date, 'yyyy-MM') AS order_month,
order_status
FROM orders LIMIT 10
%%sql
SELECT DISTINCT order_status FROM orders
%%sql
SELECT DISTINCT * FROM orders LIMIT 10
Using Spark SQL with Python or Scala
spark.sql("SELECT * FROM orders").show()
spark.sql("DESCRIBE orders").show()
spark.sql("SELECT order_customer_id, order_date, order_status FROM orders").show()
spark.sql("""
SELECT order_customer_id,
date_format(order_date, 'yyyy-MM'),
order_status
FROM orders""").show()
spark.sql("""
SELECT order_customer_id,
date_format(order_date, 'yyyy-MM') AS order_month,
order_status
FROM orders
""").show()
spark.sql("SELECT DISTINCT order_status FROM orders").show()
spark.sql("SELECT DISTINCT * FROM orders").show()
2.5. Filtering Data¶
Let us understand how we can filter the data in Spark SQL.
We use the WHERE clause to filter the data.
All comparison operators such as =, !=, >, <, etc. can be used to compare a column, expression, or literal with another column, expression, or literal.
We can use LIKE with % as well as regular expression based operators such as RLIKE for pattern matching (a quick sketch follows this list).
Boolean OR and AND can be used when we want to apply multiple conditions.
Get all orders with order_status equal to COMPLETE or CLOSED. We can also use the IN operator.
Get all orders from the month of 2014 January with order_status equal to COMPLETE or CLOSED.
We need to use IS NULL and IS NOT NULL to compare against null values.
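Here is a minimal sketch of pattern matching using LIKE with % as well as RLIKE with a regular expression. For this data set, the first query should match statuses such as PENDING and PENDING_PAYMENT, and the RLIKE filter should match the same records as order_status IN ('COMPLETE', 'CLOSED') used in the examples below.
SELECT count(1) FROM orders
WHERE order_status LIKE 'PENDING%'

SELECT count(1) FROM orders
WHERE order_status RLIKE 'COMPLETE|CLOSED'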
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
SELECT * FROM orders WHERE order_status = 'COMPLETE' LIMIT 10
%%sql
SELECT count(1) FROM orders WHERE order_status = 'COMPLETE'
%%sql
SELECT * FROM orders WHERE order_status IN ('COMPLETE', 'CLOSED') LIMIT 10
%%sql
SELECT * FROM orders WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED' LIMIT 10
%%sql
SELECT count(1) FROM orders WHERE order_status IN ('COMPLETE', 'CLOSED')
%%sql
SELECT count(1) FROM orders WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED'
%%sql
SELECT * FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND order_date LIKE '2014-01%'
LIMIT 10
%%sql
SELECT count(1) FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND order_date LIKE '2014-01%'
%%sql
SELECT * FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
LIMIT 10
%%sql
SELECT count(1) FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
Using Spark SQL with Python or Scala
spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("SELECT * FROM orders WHERE order_status = 'COMPLETE'").show()
spark.sql("SELECT count(1) FROM orders WHERE order_status = 'COMPLETE'").show()
spark.sql("SELECT * FROM orders WHERE order_status IN ('COMPLETE', 'CLOSED')").show()
spark.sql("""
SELECT * FROM orders
WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED'
""").show()
spark.sql("""
SELECT count(1) FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark.sql("""
SELECT count(1) FROM orders
WHERE order_status = 'COMPLETE' OR order_status = 'CLOSED'
""").show()
spark.sql("""
SELECT * FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND order_date LIKE '2014-01%'
""").show()
spark.sql("""
SELECT count(1) FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND order_date LIKE '2014-01%'
""").show()
spark.sql("""
SELECT * FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
spark.sql("""
SELECT count(1) FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
Let us prepare the table to demonstrate how to deal with null values while filtering the data.
%%sql
DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql
USE itversity_sms
%%sql
DROP TABLE IF EXISTS students
%%sql
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_number STRING,
student_address STRING
) STORED AS avro
%%sql
INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql
INSERT INTO students VALUES (2, 'Donald', 'Duck', '1234567890', NULL)
%%sql
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
(4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
%%sql
SELECT * FROM students
Using Spark SQL with Python or Scala
spark.sql("DROP DATABASE IF EXISTS itversity_sms CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_sms")
spark.sql("DROP TABLE IF EXISTS students")
spark.sql("""
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_number STRING,
student_address STRING
) STORED AS avro
""")
spark.sql("""
INSERT INTO students
VALUES (1, 'Scott', 'Tiger', NULL, NULL)
""")
spark.sql("""
INSERT INTO students
VALUES (2, 'Donald', 'Duck', '1234567890', NULL)
""")
spark.sql("""
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
(4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
""")
spark.sql("SELECT * FROM students").show()
Comparison against null can be done with IS NULL and IS NOT NULL. The queries below using = NULL and != NULL will not return the expected records, even though we have records where student_phone_number is null.
spark.sql("""
SELECT * FROM students
WHERE student_phone_number = NULL
""").show()
spark.sql("""
SELECT * FROM students
WHERE student_phone_number != NULL
""").show()
spark.sql("""
SELECT * FROM students
WHERE student_phone_number IS NULL
""").show()
spark.sql("""
SELECT * FROM students
WHERE student_phone_number IS NOT NULL
""").show()
2.6. Joining Tables - Inner¶
Let us understand how to join data from multiple tables.
Spark SQL supports ANSI-style joins (JOIN with ON).
There are different types of joins.
INNER JOIN - Get all the records from both the data sets which satisfy the JOIN condition.
OUTER JOIN - We will get into the details as part of the next topic.
Example for INNER JOIN
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
LIMIT 10
We can join more than 2 tables in one query. Here is how it looks.
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
JOIN products p
ON p.product_id = oi.order_item_product_id
LIMIT 10
If we have to apply additional filters, it is recommended to use the WHERE clause. The ON clause should only have join conditions.
We can have non-equi join conditions as well, but they are not used that often.
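For illustration, here is a sketch of a non-equi join that buckets each order item into a price band; the price_bands table and its columns (band_name, min_amount, max_amount) are hypothetical and only the join pattern matters.
SELECT oi.order_item_id,
    oi.order_item_subtotal,
    pb.band_name
FROM order_items oi JOIN price_bands pb
    ON oi.order_item_subtotal BETWEEN pb.min_amount AND pb.max_amount
LIMIT 10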
Here are some of the examples for INNER JOIN:
Get order id, date, status and item revenue for all order items.
Get order id, date, status and item revenue for all order items where the order status is either COMPLETE or CLOSED.
Get order id, date, status and item revenue for all order items where the order status is either COMPLETE or CLOSED and the orders are placed in the month of 2014 January.
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql
SELECT count(1)
FROM orders
%%sql
SELECT count(1)
FROM order_items
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql
SELECT count(1)
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
LIMIT 10
%%sql
SELECT count(1)
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
LIMIT 10
%%sql
SELECT count(1)
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
Using Spark SQL with Python or Scala
spark("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark("""
SELECT count(1)
FROM orders
""").show()
spark("""
SELECT count(1)
FROM order_items
""").show()
spark("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark("""
SELECT count(1)
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark("""
SELECT count(1)
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_subtotal
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
spark("""
SELECT count(1)
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
AND date_format(order_date, 'yyyy-MM') = '2014-01'
""").show()
2.7. Joining Tables - Outer¶
Let us understand how to perform outer joins using Spark SQL. There are 3 different types of outer joins.
LEFT OUTER JOIN - Get all the records from both the data sets which satisfy the JOIN condition, along with those records which are in the left side table but not in the right side table.
RIGHT OUTER JOIN - Get all the records from both the data sets which satisfy the JOIN condition, along with those records which are in the right side table but not in the left side table.
FULL OUTER JOIN - Combination of both LEFT OUTER JOIN and RIGHT OUTER JOIN (a sketch follows the examples below).
When we perform an outer join (let us say a left outer join), we will see this.
Get all the values from both the tables when the join condition is satisfied.
If there are rows in the left side table for which there are no corresponding rows in the right side table, all the projected column values from the right side table will be null.
Here are some of the examples for outer join.
Get all the orders where there are no corresponding order items.
Get all the order items where there are no corresponding orders.
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
LIMIT 10
%%sql
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
%%sql
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
AND o.order_status IN ('COMPLETE', 'CLOSED')
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
LIMIT 10
%%sql
SELECT count(1)
FROM orders o RIGHT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
%%sql
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_id IS NULL
LIMIT 10
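For completeness, here is a sketch of a FULL OUTER JOIN between the same tables. With this data set it should behave like the LEFT OUTER JOIN above, since every order item has a corresponding order, but the syntax is shown for reference.
SELECT o.order_id,
    oi.order_item_order_id,
    oi.order_item_subtotal
FROM orders o FULL OUTER JOIN order_items oi
    ON o.order_id = oi.order_item_order_id
LIMIT 10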
Using Spark SQL with Python or Scala
spark.sql("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
""").show()
spark.sql("""
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
""").show()
spark.sql("""
SELECT count(1)
FROM orders o LEFT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE oi.order_item_order_id IS NULL
AND o.order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark.sql("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT count(1)
FROM orders o RIGHT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
""").show()
spark.sql("""
SELECT o.order_id,
o.order_date,
o.order_status,
oi.order_item_order_id,
oi.order_item_subtotal
FROM orders o RIGHT OUTER JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_id IS NULL
""").show()
2.8. Aggregating Data¶
Let us understand how to aggregate the data.
We can perform global aggregations as well as aggregations by key.
Global Aggregations
Get total number of orders.
Get revenue for a given order id.
Get number of records with order_status either COMPLETE or CLOSED.
Aggregations by key - using GROUP BY
Get number of orders by date or status.
Get revenue for each order_id.
Get daily product revenue (using order date and product id as keys).
We can also use the HAVING clause to apply filtering on top of aggregated data.
Get daily product revenue where revenue is greater than $500 (using order date and product id as keys).
Rules while using GROUP BY (a quick sketch follows this list).
We can have the columns which are specified as part of GROUP BY in the SELECT clause.
On top of those, we can have derived columns using aggregate functions.
We cannot have any other columns in the SELECT clause that are neither part of GROUP BY nor derived using aggregate functions.
We will not be able to use aggregate functions or aliases from the SELECT clause as part of the WHERE clause.
If we want to filter based on aggregated results, then we can leverage HAVING on top of GROUP BY (specifying WHERE is not an option).
Typical query execution order - FROM -> WHERE -> GROUP BY (and HAVING) -> SELECT.
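As a quick sketch of the rules above, the first query below should fail because order_customer_id is neither part of GROUP BY nor aggregated, while the second one is valid.
-- This query should fail: order_customer_id is neither grouped nor aggregated
SELECT order_date,
    order_customer_id,
    count(1)
FROM orders
GROUP BY order_date

-- This query is valid: every projected column is either grouped or aggregated
SELECT order_date,
    count(DISTINCT order_customer_id) AS customer_count
FROM orders
GROUP BY order_date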
%%sql
SELECT count(order_id) FROM orders
%%sql
SELECT count(DISTINCT order_date) FROM orders
%%sql
SELECT round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items
WHERE order_item_order_id = 2
%%sql
SELECT count(1)
FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
%%sql
SELECT order_date,
count(1)
FROM orders
GROUP BY order_date
%%sql
SELECT order_status,
count(1) AS status_count
FROM orders
GROUP BY order_status
%%sql
SELECT order_item_order_id,
round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items
GROUP BY order_item_order_id LIMIT 10
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
LIMIT 10
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
AND revenue >= 500
GROUP BY o.order_date,
oi.order_item_product_id
LIMIT 10
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
HAVING revenue >= 500
LIMIT 10
Using Spark SQL with Python or Scala
spark.sql("SELECT count(order_id) FROM orders").show()
spark.sql("SELECT count(DISTINCT order_date) FROM orders").show()
spark.sql("""
SELECT round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items
WHERE order_item_order_id = 2
""").show()
spark.sql("""
SELECT count(1)
FROM orders
WHERE order_status IN ('COMPLETE', 'CLOSED')
""").show()
spark.sql("""
SELECT order_date,
count(1)
FROM orders
GROUP BY order_date
""").show()
spark.sql("""
SELECT order_status,
count(1) AS status_count
FROM orders
GROUP BY order_status
""").show()
spark.sql("""
SELECT order_item_order_id,
round(sum(order_item_subtotal), 2) AS order_revenue
FROM order_items
GROUP BY order_item_order_id
""").show()
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
AND revenue >= 500
GROUP BY o.order_date,
oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
HAVING revenue >= 500
""").show()
2.9. Sorting Data¶
Let us understand how to sort the data using Spark SQL.
We typically perform sorting as the final step.
Sorting can be done either by using one field or multiple fields.
We can sort the data either in ascending order or descending order by using a column or expression.
By default, the sorting order is ascending and we can change it to descending by using DESC.
%%sql
SELECT * FROM orders
ORDER BY order_customer_id
LIMIT 10
%%sql
SELECT * FROM orders
ORDER BY order_customer_id,
order_date
LIMIT 10
%%sql
SELECT * FROM orders
ORDER BY order_customer_id,
order_date DESC
LIMIT 10
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
ORDER BY o.order_date,
revenue DESC
LIMIT 10
Using Spark SQL with Python or Scala
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id
""").show()
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id,
order_date
""").show()
spark.sql("""
SELECT * FROM orders
ORDER BY order_customer_id,
order_date DESC
""").show()
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
ORDER BY o.order_date,
revenue DESC
""").show()
2.10. Conclusion - Final Solution¶
Let us review the final solution for our problem statement - daily_product_revenue.
Prepare tables
Create tables
Load the data into tables
We need to project the fields which we are interested in.
order_date
order_item_product_id
product_revenue
As we have fields from multiple tables, we need to perform a join, after which we have to filter for COMPLETE or CLOSED orders.
We have to group the data by order_date and order_item_product_id, then we have to perform aggregation on order_item_subtotal to get product_revenue.
%%sql
DROP DATABASE IF EXISTS itversity_retail CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
%%sql
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
%%sql
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
ORDER BY o.order_date,
product_revenue DESC
Using Spark SQL with Python or Scala
spark.sql("DROP DATABASE itversity_retail CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_retail")
spark.sql("USE itversity_retail")
spark.sql("SHOW tables").show()
spark.sql("""
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
INTO TABLE orders
""")
spark.sql("""
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/order_items'
INTO TABLE order_items
""")
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
""").show()
spark.sql("""
SELECT o.order_date,
oi.order_item_product_id,
round(sum(oi.order_item_subtotal), 2) AS product_revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date,
oi.order_item_product_id
ORDER BY o.order_date,
product_revenue DESC
""").show()