4. DML and Partitioning¶
In this section we will continue exploring concepts related to DML and also get into the details of partitioning tables. With respect to DML, we have already seen how to use the LOAD command; now we will see how to use the INSERT command, primarily to copy query results into a table.
Introduction to Partitioning
Creating Tables using Parquet
LOAD vs. INSERT
Inserting Data using Stage Table
Creating Partitioned Tables
Adding Partitions to Tables
Loading data into Partitions
Inserting Data into Partitions
Using Dynamic Partition Mode
Exercise - Partitioned Tables
Unlike Hive, Spark SQL does not support bucketing, which is similar to hash partitioning. However, Delta Lake does. Delta Lake is a third-party library that gives us additional capabilities, such as ACID transactions, on top of Spark Metastore tables.
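If you are curious how that looks, here is a minimal sketch of creating a Delta Lake table. This assumes the Delta Lake package is configured for the Spark session, and the table name orders_delta is purely illustrative.

%%sql
-- Minimal sketch, assuming Delta Lake is configured for this session.
-- The table name orders_delta is illustrative only.
CREATE TABLE orders_delta (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) USING DELTA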
Let us make sure that we have the orders table with data, as we will be using it to populate partitioned tables very soon.
val username = System.getProperty("user.name")
import org.apache.spark.sql.SparkSession
val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", "/user/itversity/warehouse").
    enableHiveSupport.
    appName("Spark SQL - Managing Tables - DML and Partitioning").
    master("yarn").
    getOrCreate
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
DROP TABLE orders
%%sql
SELECT current_database()
%%sql
CREATE TABLE IF NOT EXISTS itversity_retail.orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
OVERWRITE INTO TABLE orders
%%sql
SELECT count(1) FROM orders
4.1. Introduction to Partitioning¶
Let us get an overview of partitioning of Spark Metastore tables.
It is similar to list partitioning, where each partition corresponds to a particular value of a given column.
Spark Metastore does not support range partitioning or bucketing. Bucketing, which is similar to hash partitioning, is supported in Hive.
Once the table is created, we can add static partitions and then load or insert data into it.
Spark Metastore also supports creating partitions dynamically, where partitions are created based upon the partition column values.
A Partitioned table can be managed or external.
4.2. Creating Tables using Parquet¶
Let us create the order_items table using the Parquet file format. By default, the files of a table using the Parquet file format are compressed using the Snappy algorithm.
A table with the Parquet file format can be external.
In our case we will create a managed table, specifying parquet as the file format in the STORED AS clause.
We will explore INSERT to get query results into this Parquet table.
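As a side note, Parquet compression is controlled by the Spark property spark.sql.parquet.compression.codec, which defaults to snappy. Here is a small sketch to review and, if desired, override it for the session; gzip is just an example value.

%%sql
SET spark.sql.parquet.compression.codec

%%sql
-- gzip here is only an illustrative value; revert to snappy if you change it
SET spark.sql.parquet.compression.codec=gzip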
%%sql
USE itversity_retail
%%sql
SHOW tables
Drop order_items, if it already exists
%%sql
DROP TABLE IF EXISTS order_items
%%sql
CREATE TABLE order_items (
order_item_id INT,
order_item_order_id INT,
order_item_product_id INT,
order_item_quantity INT,
order_item_subtotal FLOAT,
order_item_product_price FLOAT
) STORED AS parquet
To get the complete output, run the below command using spark.sql. Here is what the output looks like.
%%sql
DESCRIBE FORMATTED order_items
spark.sql("DESCRIBE FORMATTED order_items").show(200, false)
val username = System.getProperty("user.name")
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items" !
4.3. LOAD vs. INSERT¶
Let us compare and contrast the LOAD and INSERT commands. These are the main approaches we use to get data into Spark Metastore tables.
LOAD will copy the files by dividing them into blocks.
LOAD is the fastest way of getting data into Spark Metastore tables. However, there will be only minimal validations at the file level.
There will be no transformations or validations at the data level.
If any transformation is required while getting data into a Spark Metastore table, then we need to use the INSERT command.
Here are some of the usage scenarios of INSERT:
Changing delimiters in case of text file format
Changing file format
Loading data into partitioned or bucketed tables (if bucketing is supported).
Applying any other transformations at the data level (widely used; see the sketch below)
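For instance, here is a sketch of that last scenario - applying a transformation at the data level while inserting. It assumes the text-format stage table order_items_stage, which we will create in the next topic; rounding the subtotal is purely illustrative.

%%sql
INSERT INTO TABLE order_items
SELECT order_item_id, order_item_order_id, order_item_product_id,
    order_item_quantity,
    cast(round(order_item_subtotal, 2) AS FLOAT) AS order_item_subtotal,
    order_item_product_price
FROM order_items_stage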
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS order_items
%%sql
CREATE TABLE order_items (
  order_item_id INT,
  order_item_order_id INT,
  order_item_product_id INT,
  order_item_quantity INT,
  order_item_subtotal FLOAT,
  order_item_product_price FLOAT
) STORED AS parquet
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items'
INTO TABLE order_items
val username = System.getProperty("user.name")
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items" !
%%sql
SELECT * FROM order_items LIMIT 10
4.4. Inserting Data using Stage Table¶
Let us understand how to insert data into order_items, which uses the Parquet file format.
As the data is in text file format and our table is created with the Parquet file format, we will not be able to use the LOAD command to load the data.
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items'
OVERWRITE INTO TABLE order_items
The above LOAD command will be successful; however, when we try to query the table, the query will fail as it expects the data to be in Parquet file format.
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
TRUNCATE TABLE order_items
Following are the steps to get data into a table that is created with a different file format or delimiter than our source data.
Create a stage table with text file format and comma as the delimiter (order_items_stage).
Load data from our files in the local file system into the stage table.
Using the stage table, run the INSERT command to insert data into our target table (order_items).
Let us see an example of inserting data into the target table from the stage table.
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
CREATE TABLE order_items_stage (
  order_item_id INT,
  order_item_order_id INT,
  order_item_product_id INT,
  order_item_quantity INT,
  order_item_subtotal FLOAT,
  order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
spark.sql("DESCRIBE FORMATTED order_items_stage").show(200, false)
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/order_items' INTO TABLE order_items_stage
%%sql
SELECT * FROM order_items_stage LIMIT 10
%%sql
TRUNCATE TABLE order_items
%%sql
INSERT INTO TABLE order_items
SELECT * FROM order_items_stage
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
SELECT count(1) FROM order_items
INSERT INTO will append data to the target table by adding new files.
%%sql
INSERT INTO TABLE order_items
SELECT * FROM order_items_stage
%%sql
SELECT * FROM order_items LIMIT 10
%%sql
SELECT count(1) FROM order_items
INSERT OVERWRITE will overwrite the data in the target table by deleting the files related to the old data from the directory pointed to by the Spark Metastore table.
%%sql
INSERT OVERWRITE TABLE order_items
SELECT * FROM order_items_stage
%%sql
SELECT * FROM order_items
%%sql
SELECT count(1) FROM order_items
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/order_items" !
4.5. Creating Partitioned Tables¶
Let us understand how to create a partitioned table and get data into that table.
We have already created the orders table earlier. We will use it as a reference and create a partitioned table.
We can use the PARTITIONED BY clause to define the partition column along with its data type. In our case we will use order_month as the partition column.
We will not be able to directly load our original orders data into the partitioned table, as the data is not in sync with its structure.
Here is the example of creating partitioned tables in Spark Metastore.
%%sql
USE itversity_retail
%%sql
SHOW tables
Drop orders_part if it already exists
%%sql
DROP TABLE IF EXISTS orders_part
%%sql
CREATE TABLE orders_part (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) PARTITIONED BY (order_month INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
DESCRIBE orders_part
spark.sql("DESCRIBE FORMATTED orders_part").show(200, false)
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders_part" !
4.6. Adding Partitions to Tables¶
Let us understand how we can add static partitions to partitioned tables in Spark Metastore.
We can add partitions using the ALTER TABLE command with ADD PARTITION.
For each partition created, a subdirectory will be created under the table directory, named using the partition column name and the corresponding value.
Let us understand how to add partitions to the orders_part table under the itversity_retail database.
Here is the script to add static partitions to a partitioned table where the partition column type is string.
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS orders_part
%%sql
CREATE TABLE orders_part (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) PARTITIONED BY (order_month STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders_part" !
%%sql
ALTER TABLE orders_part ADD PARTITION (order_month='2013-07')
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders_part" !
Here is the script to add static partitions to a partitioned table where the partition column type is integer. We can add one or more partitions at a time. For further demos we will be using this table.
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS orders_part
%%sql
CREATE TABLE orders_part (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) PARTITIONED BY (order_month INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
DESCRIBE orders_part
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders_part" !
%%sql
ALTER TABLE orders_part ADD PARTITION (order_month=201307)
%%sql
ALTER TABLE orders_part ADD
    PARTITION (order_month=201308)
    PARTITION (order_month=201309)
    PARTITION (order_month=201310)
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders_part" !
4.7. Loading into Partitions¶
Let us understand how to use load command to load data into partitioned tables.
We need to make sure that the file format of the files being loaded into the table is the same as the file format used while creating the table.
We also need to make sure that the delimiters are consistent between the files and the table for text file format.
Also, the data should match the criteria of the partition into which it is loaded.
Our /data/retail_db/orders has data for the whole year, and hence we should not load the data directly into a single partition. We need to split it into files matching the partition criteria and then load them into the table.
To use the LOAD command to load files into partitions, we need to pre-partition the data based on the partition logic.
Here is an example of using simple shell commands to partition the data. Use the command prompt to run these commands.
rm -rf ~/orders
mkdir -p ~/orders
grep 2013-07 /data/retail_db/orders/part-00000 > ~/orders/orders_201307
grep 2013-08 /data/retail_db/orders/part-00000 > ~/orders/orders_201308
grep 2013-09 /data/retail_db/orders/part-00000 > ~/orders/orders_201309
grep 2013-10 /data/retail_db/orders/part-00000 > ~/orders/orders_201310
Let us see how we can load data into the corresponding partitions. The data has to be pre-partitioned based on the partition column.
%%sql
USE itversity_retail
%%sql
LOAD DATA LOCAL INPATH '/home/itversity/orders/orders_201307'
INTO TABLE orders_part PARTITION (order_month=201307)
import sys.process._
s"hdfs dfs -ls -R /user/${username}/warehouse/${username}_retail.db/orders_part" !
%%sql
LOAD DATA LOCAL INPATH '/home/itversity/orders/orders_201308'
INTO TABLE orders_part PARTITION (order_month=201308)
%%sql
LOAD DATA LOCAL INPATH '/home/itversity/orders/orders_201309'
INTO TABLE orders_part PARTITION (order_month=201309)
%%sql
LOAD DATA LOCAL INPATH '/home/itversity/orders/orders_201310'
INTO TABLE orders_part PARTITION (order_month=201310)
import sys.process._
s"hdfs dfs -ls -R /user/${username}/warehouse/${username}_retail.db/orders_part" !
import sys.process._
s"hdfs dfs -tail /user/${username}/warehouse/${username}_retail.db/orders_part/order_month=201310/orders_201310"!
%%sql
SELECT * FROM orders_part LIMIT 10
4.8. Inserting Data into Partitions¶
Let us understand how to use INSERT to get data into static partitions in Spark Metastore from the existing table called orders.
Let us recap what has been covered so far related to partitioned tables.
We have created a table called orders_part with order_month of type INT as the partition column.
We have added 4 static partitions for 201307, 201308, 201309 and 201310 using the ALTER TABLE command.
Once the table was created and the partitions were added, we pre-processed the data and got it into the partitions using the LOAD command.
It is not always practical to use the LOAD command. We typically use INSERT via a stage table to copy data into a partitioned table.
We can pre-create partitions in partitioned tables and insert data into those partitions using the appropriate INSERT command. One needs to ensure that the required filter condition is applied to get the data relevant to the partition being populated.
We can also create partitions dynamically, which we will see as part of the next topic.
%%sql
USE itversity_retail
%%sql
ALTER TABLE orders_part ADD PARTITION (order_month=201311)
%%sql
SELECT count(1) FROM orders_part
%%sql
INSERT INTO TABLE orders_part PARTITION (order_month=201311)
SELECT * FROM orders WHERE order_date LIKE '2013-11%'
%%sql
SELECT count(1) FROM orders_part
import sys.process._
s"hdfs dfs -ls -R /user/${username}/warehouse/${username}_retail.db/orders_part" !
4.9. Using Dynamic Partition Mode¶
Let us understand how we can insert data into partitioned table using dynamic partition mode.
Using dynamic partition mode, we need not pre-create the partitions. Partitions will be created automatically when we issue the INSERT command in dynamic partition mode.
To insert data using dynamic partition mode, we need to set the property hive.exec.dynamic.partition to true.
We also need to set hive.exec.dynamic.partition.mode to nonstrict.
Here is an example of inserting data into partitions using dynamic partition mode.
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
SELECT count(1) FROM orders
%%sql
SELECT count(1) FROM orders_part
%%sql
SET hive.exec.dynamic.partition
%%sql
SET hive.exec.dynamic.partition.mode
%%sql
SET hive.exec.dynamic.partition=true
%%sql
SET hive.exec.dynamic.partition.mode=nonstrict
%%sql
INSERT INTO TABLE orders_part PARTITION (order_month)
SELECT o.*, date_format(order_date, 'yyyyMM') order_month
FROM orders o
WHERE order_date >= '2013-12-01 00:00:00.0'
import sys.process._
s"hdfs dfs -ls -R /user/${username}/warehouse/${username}_retail.db/orders_part" !
%%sql
SELECT count(1) FROM orders
%%sql
SELECT count(1) FROM orders_part
You will see new partitions created starting from 201312 to 201407.
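We can also run SHOW PARTITIONS to confirm that the new monthly partitions exist alongside the ones added earlier.

%%sql
SHOW PARTITIONS orders_part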
4.10. Exercise - Partitioned Tables¶
Let us take care of this exercise related to partitioning to self-evaluate our comfort level in working with partitioned tables.
Duration: 30 Minutes
Use data from /data/nyse_all/nyse_data
Use database YOUR_OS_USER_NAME_nyse
Create partitioned table nyse_eod_part
Field Names: stockticker, tradedate, openprice, highprice, lowprice, closeprice, volume
Determine correct data types based on the values
Create a managed table with "," as the delimiter.
The partition field should be tradeyear of type INT (one partition for each corresponding year)
Insert data into partitioned table using dynamic partition mode.
Here are the steps to come up with the solution.
Review the files under /data/nyse_all/nyse_data - determine data types (For example: tradedate should be INT and volume should be BIGINT)
Create database YOUR_OS_USER_NAME_nyse (if it does not exist)
Create a non-partitioned stage table
Load data into the non-partitioned stage table
Validate the count and also see that data is as expected by running simple select query.
Create partitioned table
Set required properties to use dynamic partition
Insert data into the partitioned table - here is how you can compute the year from tradedate of type INT (see the sketch after this list):
year(to_date(cast(tradedate AS STRING), 'yyyyMMdd')) AS tradeyear
Run the commands in the validation section below to validate
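For reference, here is a sketch of what the final insert could look like. It assumes a stage table named nyse_eod_stage (the name is illustrative) and that the dynamic partition properties from the earlier topic have been set.

%%sql
-- Sketch only: nyse_eod_stage is an assumed stage table name, and the
-- dynamic partition properties are expected to be set already.
INSERT INTO TABLE nyse_eod_part PARTITION (tradeyear)
SELECT n.*, year(to_date(cast(tradedate AS STRING), 'yyyyMMdd')) AS tradeyear
FROM nyse_eod_stage n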
4.10.1. Validation¶
Here are the instructions to validate the results.
Run hdfs dfs -ls /user/YOUR_OS_USER_NAME/warehouse/YOUR_OS_USER_NAME_nyse.db/nyse_eod_part
Run SHOW PARTITIONS YOUR_OS_USER_NAME_nyse.nyse_eod_part. You should see partitions for all the years for which you have loaded the data.
Run SELECT count(1) FROM YOUR_OS_USER_NAME_nyse.nyse_eod_part. The count should match the number of records in our dataset.
You can compare with the output generated by this simple Python code, which has been validated in our labs.
import pandas as pd
import glob
path = r'/data/nyse_all/nyse_data' # use your path
all_files = glob.glob(path + "/*.txt.gz")
li = []
for filename in all_files:
    df = pd.read_csv(filename, index_col=None, header=None)
    li.append(df)
frame = pd.concat(li, axis=0, ignore_index=True)
frame.shape