6. Windowing Functions

As part of this section we will primarily talk about Windowing Functions. These are also known as Analytic Functions in Databases like Oracle.

  • Prepare HR Database

  • Overview of Windowing Functions

  • Aggregations using Windowing Functions

  • Getting LEAD and LAG values

  • Getting first and last values

  • Ranking using Windowing Functions

  • Understanding order of execution of SQL

  • Overview of Nested Sub Queries

  • Filtering - Window Function Results

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", "/user/itversity/warehouse").
    enableHiveSupport.
    appName("Spark SQL - Windowing Functions").
    master("yarn").
    getOrCreate
%%sql
SET spark.sql.shuffle.partitions=2

6.1. Prepare HR Database

Let us prepare HR database with EMPLOYEES Table. We will be using this for some of the examples as well as exercises related to Window Functions.

  • Create Database itversity_hr (replace itversity with your OS User Name)

  • Create table employees in itversity_hr database.

  • Load data into the table.

First let us start with creating the database.

%%sql

DROP DATABASE itversity_hr CASCADE
%%sql

CREATE DATABASE itversity_hr
%%sql

USE itversity_hr
%%sql

SELECT current_database()

As the database is created, let us go ahead and add table to it.

%%sql

CREATE TABLE employees (
  employee_id     int,
  first_name      varchar(20),
  last_name       varchar(25),
  email           varchar(25),
  phone_number    varchar(20),
  hire_date       date,
  job_id          varchar(10),
  salary          decimal(8,2),
  commission_pct  decimal(2,2),
  manager_id      int,
  department_id   int
) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

Let us load the data and validate the table.

%%sql

LOAD DATA LOCAL INPATH '/data/hr_db/employees' 
INTO TABLE employees
%%sql

SELECT * FROM employees LIMIT 10
%%sql

SELECT employee_id, department_id, salary FROM employees LIMIT 10
%%sql

SELECT count(1) FROM employees

6.2. Overview of Windowing Functions

Let us get an overview of Analytics or Windowing Functions in Spark SQL.

  • Aggregate Functions (sum, min, max, avg)

  • Window Functions (lead, lag, first_value, last_value)

  • Rank Functions (rank, dense_rank, row_number etc)

  • For all the functions we use OVER clause.

  • For aggregate functions we typically use PARTITION BY

  • For ranking and windowing functions we might use ORDER BY sorting_column or PARTITION BY partition_column ORDER BY sorting_column.

%%sql

USE itversity_hr
%%sql

SELECT employee_id, department_id, salary FROM employees LIMIT 10
%%sql

SELECT employee_id, department_id, salary,
    count(1) OVER (PARTITION BY department_id) AS employee_count,
    rank() OVER (ORDER BY salary DESC) AS rnk,
    lead(employee_id) OVER (PARTITION BY department_id ORDER BY salary DESC) AS lead_emp_id,
    lead(salary) OVER (PARTITION BY department_id ORDER BY salary DESC) AS lead_emp_sal
FROM employees
ORDER BY employee_id

6.3. Aggregations using Windowing Functions

Let us see how we can perform aggregations with in a partition or group using Windowing/Analytics Functions.

  • For simple aggregations where we have to get grouping key and aggregated results we can use GROUP BY.

  • If we want to get the raw data along with aggregated results, then using GROUP BY is not possible or overly complicated.

  • Using aggregate functions with OVER Clause not only simplifies the process of writing query, but also better with respect to performance.

  • Let us take an example of getting employee salary percentage when compared to department salary expense.

%%sql

USE itversity_hr
++
||
++
++
%%sql

SELECT employee_id, department_id, salary 
FROM employees 
ORDER BY department_id, salary
LIMIT 10
+-----------+-------------+--------+
|employee_id|department_id|  salary|
+-----------+-------------+--------+
|        178|         null| 7000.00|
|        200|           10| 4400.00|
|        202|           20| 6000.00|
|        201|           20|13000.00|
|        119|           30| 2500.00|
|        118|           30| 2600.00|
|        117|           30| 2800.00|
|        116|           30| 2900.00|
|        115|           30| 3100.00|
|        114|           30|11000.00|
+-----------+-------------+--------+

Let us write the query using GROUP BY approach.

%%sql

SELECT department_id,
       sum(salary) AS department_salary_expense
FROM employees
GROUP BY department_id
ORDER BY department_id
+-------------+-------------------------+
|department_id|department_salary_expense|
+-------------+-------------------------+
|         null|                  7000.00|
|           10|                  4400.00|
|           20|                 19000.00|
|           30|                 24900.00|
|           40|                  6500.00|
|           50|                156400.00|
|           60|                 28800.00|
|           70|                 10000.00|
|           80|                304500.00|
|           90|                 58000.00|
+-------------+-------------------------+
only showing top 10 rows
%%sql

SELECT e.employee_id, e.department_id, e.salary,
       ae.department_salary_expense,
       ae.avg_salary_expense
FROM employees e JOIN (
     SELECT department_id, 
            sum(salary) AS department_salary_expense,
            avg(salary) AS avg_salary_expense
     FROM employees
     GROUP BY department_id
) ae
ON e.department_id = ae.department_id
ORDER BY department_id, salary
|        117|           30| 2800.00|                 24900.0...
+-----------+-------------+--------+-------------------------+------------------+
|employee_id|department_id|  salary|department_salary_expense|avg_salary_expense|
+-----------+-------------+--------+-------------------------+------------------+
|        200|           10| 4400.00|                  4400.00|       4400.000000|
|        202|           20| 6000.00|                 19000.00|       9500.000000|
|        201|           20|13000.00|                 19000.00|       9500.000000|
|        119|           30| 2500.00|                 24900.00|       4150.000000|
|        118|           30| 2600.00|                 24900.00|       4150.000000|
|        117|           30| 2800.00|                 24900.00|       4150.000000|
|        116|           30| 2900.00|                 24900.00|       4150.000000|
|        115|           30| 3100.00|                 24900.00|       4150.000000|
|        114|           30|11000.00|                 24900.00|       4150.000000|
|        203|           40| 6500.00|                  6500.00|       6500.000000|
+-----------+-------------+--------+-------------------------+------------------+
only showing top 10 rows

Let us see how we can get it using Analytics/Windowing Functions.

  • We can use all standard aggregate functions such as count, sum, min, max, avg etc.

%%sql

SELECT e.employee_id, e.department_id, e.salary,
       sum(e.salary) 
         OVER (PARTITION BY e.department_id)
         AS department_salary_expense
FROM employees e
ORDER BY e.department_id
|        117|          ...
+-----------+-------------+--------+-------------------------+
|employee_id|department_id|  salary|department_salary_expense|
+-----------+-------------+--------+-------------------------+
|        178|         null| 7000.00|                  7000.00|
|        200|           10| 4400.00|                  4400.00|
|        202|           20| 6000.00|                 19000.00|
|        201|           20|13000.00|                 19000.00|
|        116|           30| 2900.00|                 24900.00|
|        119|           30| 2500.00|                 24900.00|
|        115|           30| 3100.00|                 24900.00|
|        118|           30| 2600.00|                 24900.00|
|        117|           30| 2800.00|                 24900.00|
|        114|           30|11000.00|                 24900.00|
+-----------+-------------+--------+-------------------------+
only showing top 10 rows
%%sql

SELECT e.employee_id, e.department_id, e.salary,
    sum(e.salary) OVER (PARTITION BY e.department_id) AS sum_sal_expense,
    avg(e.salary) OVER (PARTITION BY e.department_id) AS avg_sal_expense,
    min(e.salary) OVER (PARTITION BY e.department_id) AS min_sal_expense,
    max(e.salary) OVER (PARTITION BY e.department_id) AS max_sal_expense,
    count(e.salary) OVER (PARTITION BY e.department_id) AS cnt_sal_expense
FROM employees e
ORDER BY e.department_id
|        201| ...
+-----------+-------------+--------+---------------+---------------+---------------+---------------+---------------+
|employee_id|department_id|  salary|sum_sal_expense|avg_sal_expense|min_sal_expense|max_sal_expense|cnt_sal_expense|
+-----------+-------------+--------+---------------+---------------+---------------+---------------+---------------+
|        178|         null| 7000.00|        7000.00|    7000.000000|        7000.00|        7000.00|              1|
|        200|           10| 4400.00|        4400.00|    4400.000000|        4400.00|        4400.00|              1|
|        202|           20| 6000.00|       19000.00|    9500.000000|        6000.00|       13000.00|              2|
|        201|           20|13000.00|       19000.00|    9500.000000|        6000.00|       13000.00|              2|
|        116|           30| 2900.00|       24900.00|    4150.000000|        2500.00|       11000.00|              6|
|        119|           30| 2500.00|       24900.00|    4150.000000|        2500.00|       11000.00|              6|
|        115|           30| 3100.00|       24900.00|    4150.000000|        2500.00|       11000.00|              6|
|        118|           30| 2600.00|       24900.00|    4150.000000|        2500.00|       11000.00|              6|
|        117|           30| 2800.00|       24900.00|    4150.000000|        2500.00|       11000.00|              6|
|        114|           30|11000.00|       24900.00|    4150.000000|        2500.00|       11000.00|              6|
+-----------+-------------+--------+---------------+---------------+---------------+---------------+---------------+
only showing top 10 rows

6.3.1. Create tables to get daily revenue

Let us create couple of tables which will be used for the demonstrations of Windowing and Ranking functions.

  • We have ORDERS and ORDER_ITEMS tables.

  • Let us take care of computing daily revenue as well as daily product revenue.

  • As we will be using same data set several times, let us create the tables to pre compute the data.

  • daily_revenue will have the order_date and revenue, where data is aggregated using order_date as partition key.

  • daily_product_revenue will have order_date, order_item_product_id and revenue. In this case data is aggregated using order_date and order_item_product_id as partition keys.

Let us create table to compute daily revenue.

%%sql

USE itversity_retail
++
||
++
++
%%sql

DROP TABLE IF EXISTS daily_revenue
++
||
++
++
%%sql

CREATE TABLE daily_revenue
AS
SELECT o.order_date,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
++
||
++
++
%%sql

SELECT * 
FROM daily_revenue
ORDER BY order_date
LIMIT 10
+--------------------+--------+
|          order_date| revenue|
+--------------------+--------+
|2013-07-25 00:00:...|31547.23|
|2013-07-26 00:00:...|54713.23|
|2013-07-27 00:00:...|48411.48|
|2013-07-28 00:00:...|35672.03|
|2013-07-29 00:00:...| 54579.7|
|2013-07-30 00:00:...|49329.29|
|2013-07-31 00:00:...|59212.49|
|2013-08-01 00:00:...|49160.08|
|2013-08-02 00:00:...|50688.58|
|2013-08-03 00:00:...|43416.74|
+--------------------+--------+

Let us create table to compute daily product revenue.

%%sql

USE itversity_retail
++
||
++
++
%%sql

DROP TABLE IF EXISTS daily_product_revenue
++
||
++
++
%%sql

CREATE TABLE daily_product_revenue
AS
SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
++
||
++
++
%%sql

SELECT * 
FROM daily_product_revenue
ORDER BY order_date, order_item_product_id
LIMIT 10
+--------------------+-----...
+--------------------+---------------------+-------+
|          order_date|order_item_product_id|revenue|
+--------------------+---------------------+-------+
|2013-07-25 00:00:...|                   24| 319.96|
|2013-07-25 00:00:...|                   93|  74.97|
|2013-07-25 00:00:...|                  134|  100.0|
|2013-07-25 00:00:...|                  191|5099.49|
|2013-07-25 00:00:...|                  226| 599.99|
|2013-07-25 00:00:...|                  365|3359.44|
|2013-07-25 00:00:...|                  403|1949.85|
|2013-07-25 00:00:...|                  502| 1650.0|
|2013-07-25 00:00:...|                  572| 119.97|
|2013-07-25 00:00:...|                  625| 199.99|
+--------------------+---------------------+-------+

6.4. Getting LEAD and LAG values

Let us understand LEAD and LAG functions to get column values from following or prior rows.

Here is the example where we can get prior or following records based on ORDER BY Clause.

%%sql

USE itversity_retail
%%sql

SELECT * FROM daily_revenue
ORDER BY order_date DESC
LIMIT 10
%%sql

SELECT t.*,
  lead(order_date) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date DESC
LIMIT 10

We can also pass number of rows as well as default values for nulls as arguments.

%%sql

USE itversity_retail
%%sql

SELECT t.*,
  lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date DESC
LIMIT 10
%%sql

SELECT t.*,
  lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date
LIMIT 10
%%sql

SELECT t.*,
  lead(order_date, 7) OVER (ORDER BY order_date DESC) AS prior_date,
  lead(revenue, 7, 0) OVER (ORDER BY order_date DESC) AS prior_revenue
FROM daily_revenue t
ORDER BY order_date
LIMIT 10

Let us see how we can get prior or following records with in a group based on particular order.

Here is the example where we can get prior or following records based on PARTITION BY and then ORDER BY Clause.

%%sql

USE itversity_retail
%%sql

DESCRIBE daily_product_revenue
%%sql

SELECT * FROM daily_product_revenue LIMIT 10
%%sql

SELECT t.*,
  LEAD(order_item_product_id) OVER (
    PARTITION BY order_date 
    ORDER BY revenue DESC
  ) next_product_id,
  LEAD(revenue) OVER (
    PARTITION BY order_date 
    ORDER BY revenue DESC
  ) next_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

We can also pass number of rows as well as default values for nulls as arguments.

%%sql

SELECT t.*,
  LEAD(order_item_product_id) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) next_product_id,
  LEAD(revenue, 1, 0) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) next_revenue
FROM daily_product_revenue t
LIMIT 100

6.5. Getting first and last values

Let us see how we can get first and last value based on the criteria. We can also use min or max as well.

Here is the example of using first_value.

%%sql

USE itversity_retail
%%sql

SELECT t.*,
  first_value(order_item_product_id) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) first_product_id,
  first_value(revenue) OVER (
    PARTITION BY order_date ORDER BY revenue DESC
  ) first_revenue
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

Let us see an example with last_value. While using last_value we need to specify ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING/PRECEEDING. By default it uses

%%sql

USE itversity_retail
%%sql

SELECT t.*,
  last_value(order_item_product_id) OVER (
    PARTITION BY order_date ORDER BY revenue
    ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
  ) last_product_id,
  last_value(revenue) OVER (
    PARTITION BY order_date ORDER BY revenue
    ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING
  )  last_revenue
FROM daily_product_revenue AS t
ORDER BY order_date, revenue DESC
LIMIT 100

6.6. Ranking using Windowing Functions

Let us see how we can get sparse ranks using rank function.

  • If we have to get ranks globally, we just need to specify ORDER BY

  • If we have to get ranks with in a key then we need to specify PARTITION BY and then ORDER BY.

  • By default ORDER BY will sort the data in ascending order. We can change the order by passing DESC after order by.

Here is an example to assign sparse ranks using daily_product_revenue with in each day based on revenue.

%%sql

USE itversity_retail
%%sql

SELECT t.*,
  rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS rnk
FROM daily_product_revenue t
ORDER BY order_date, revenue DESC
LIMIT 100

Here is another example to assign sparse ranks using employees data set with in each department.

%%sql

USE itversity_hr
%%sql

SELECT
  employee_id,
  department_id,
  salary,
  rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rnk,
  dense_rank() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) drnk,
  row_number() OVER (
    PARTITION BY department_id
    ORDER BY salary DESC
  ) rn
FROM employees
ORDER BY department_id, salary DESC
%%sql

SELECT * FROM employees ORDER BY salary LIMIT 10
%%sql

SELECT employee_id, salary,
    dense_rank() OVER (ORDER BY salary DESC) AS drnk
FROM employees

Let us understand the difference between rank, dense_rank and row_number.

  • We can either of the functions to generate ranks when the rank field does not have duplicates.

  • When rank field have duplicates then row_number should not be used as it generate unique number for each record with in the partition.

  • rank will skip the ranks in between if multiple people get the same rank while dense_rank continue with the next number.

6.7. Understanding order of execution of SQL

Let us review the order of execution of SQL. First let us review the order of writing the query.

  1. SELECT

  2. FROM

  3. JOIN or OUTER JOIN with ON

  4. WHERE

  5. GROUP BY and optionally HAVING

  6. ORDER BY

Let us come up with a query which will compute daily revenue using COMPLETE or CLOSED orders and also ordered by order_date.

%%sql

USE itversity_retail
%%sql

SELECT o.order_date,
  round(sum(oi.order_item_order_id), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
ORDER BY o.order_date
LIMIT 10
%%sql

SELECT o.order_date,
    round(sum(oi.order_item_order_id), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
    HAVING revenue >= 2000000
ORDER BY order_date
LIMIT 10

However order of execution is different.

  1. FROM

  2. JOIN or OUTER JOIN with ON

  3. WHERE

  4. GROUP BY and optionally HAVING

  5. SELECT

  6. ORDER BY

As SELECT is executed before ORDER BY Clause, we will not be able to refer the aliases in SELECT in other clauses except for ORDER BY.

6.8. Overview of Nested Sub Queries

Let us recap about Nested Sub Queries.

  • We typically have Nested Sub Queries in FROM Clause.

  • We need to provide alias to the Nested Sub Queries in FROM Clause in Hive.

  • We use nested queries quite often over queries using Analytics/Windowing Functions

%%sql

SELECT * FROM (SELECT current_date) AS q

Let us see few more examples with respected to Nested Sub Queries.

%%sql

SELECT * FROM (
  SELECT order_date, count(1) AS order_count
  FROM orders
  GROUP BY order_date
) q
LIMIT 10
%%sql

SELECT * FROM (
  SELECT order_date, count(1) AS order_count
  FROM orders
  GROUP BY order_date
) q
WHERE q.order_count > 0
  • We can achieve using HAVING clause (no need to be nested to filter)

6.9. Filtering - Window Function Results

Let us understand how to filter on top of results of Window Functions.

  • We can use Window Functions only in SELECT Clause.

  • If we have to filter based on Window Function results, then we need to use Nested Sub Queries.

  • Once the query is nested, we can apply filter using aliases of the Window Functions.

Here is the example where we can filter data based on Window Functions.

%%sql

SELECT * FROM (
  SELECT t.*,
    dense_rank() OVER (
      PARTITION BY order_date
      ORDER BY revenue DESC
    ) AS drnk
  FROM daily_product_revenue t
) q
WHERE drnk <= 5
ORDER BY q.order_date, q.revenue DESC
LIMIT 100

6.9.1. Ranking and Filtering - Recap

Let us recap the procedure to get top 5 orders by revenue for each day.

  • We have our original data in orders and order_items

  • We can pre-compute the data or create a view with the logic to generate daily product revenue

  • Then, we have to use the view or table or even nested query to compute rank

  • Once the ranks are computed, we need to nest it to filter based up on our requirement.

  • Let us see using the query example.

Let us come up with the query to compute daily product revenue.

%%sql

USE itversity_retail
%%sql

DESCRIBE orders
%%sql

DESCRIBE order_items
%%sql

SELECT o.order_date,
       oi.order_item_product_id,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date, oi.order_item_product_id
ORDER BY o.order_date, revenue DESC
LIMIT 100

Let us compute the rank for each product with in each date using revenue as criteria.

%%sql

SELECT q.*,
  rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS rnk
FROM (SELECT o.order_date,
        oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q
ORDER BY order_date, revenue DESC
LIMIT 35

Now let us see how we can filter the data.

%%sql

SELECT * FROM (SELECT q.*,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM (SELECT o.order_date,
        oi.order_item_product_id,
        round(sum(oi.order_item_subtotal), 2) AS revenue
      FROM orders o JOIN order_items oi
      ON o.order_id = oi.order_item_order_id
      WHERE o.order_status IN ('COMPLETE', 'CLOSED')
      GROUP BY o.order_date, oi.order_item_product_id) q) q1
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35
spark.sql("DESCRIBE daily_product_revenue").show(false)
%%sql

SELECT * FROM (SELECT dpr.*,
  dense_rank() OVER (
    PARTITION BY order_date
    ORDER BY revenue DESC
  ) AS drnk
FROM daily_product_revenue AS dpr)
WHERE drnk <= 5
ORDER BY order_date, revenue DESC
LIMIT 35