3. Basic DDL and DML¶
As part of this section we will primarily focus on basic DDL and DML using Spark Metastore.
Create Spark Metastore Tables
Overview of Data Types
Adding Comments
Loading Data into Tables - Local
Loading Data into Tables - HDFS
Loading Data - Append and Overwrite
Creating External Tables
Managed Tables vs. External Tables
Overview of File Formats
Dropping Tables and Databases
Truncating Tables
import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", "/user/itversity/warehouse").
    enableHiveSupport.
    appName("Spark SQL - Managing Tables - Basic DDL and DML").
    master("yarn").
    getOrCreate
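As an optional sanity check, with Hive support enabled the catalog should be able to list the databases tracked by the metastore:
// Optional sanity check: with Hive support enabled, the catalog lists
// the databases tracked by the metastore (default should always appear)
spark.catalog.listDatabases.show(false)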
If you are going to use CLIs, you can launch Spark SQL using one of the following three approaches.
Using Spark SQL
spark2-sql \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using Scala
spark2-shell \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
Using PySpark
pyspark2 \
--master yarn \
--conf spark.ui.port=0 \
--conf spark.sql.warehouse.dir=/user/${USER}/warehouse
3.1. Create Spark Metastore Tables¶
Let us understand how to create tables in Spark Metastore. We will be focusing on syntax and semantics.
Let us drop and recreate the table. We need to determine the table type and file format based on the files that will be copied into the table. If the file format is a delimited text file, then we need to understand the field delimiter as well.
Managed Table
Text File Format
Field Delimiter ‘,’
We will create the table based on the structure of data in /data/retail_db/orders
If you are using spark-sql, make sure to end the statements with a semicolon. With spark-shell or pyspark, make sure to use spark.sql to pass these commands.
%%sql
USE itversity_retail
%%sql
DROP TABLE orders
%%sql
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
%%sql
SHOW tables
Using Spark SQL with Python or Scala
spark.sql("USE itversity_retail")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
""")
spark.sql("SHOW tables").show()
spark.sql("USE itversity_retail")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
""")
spark.sql("SHOW tables").show()
Once the table is created, we can either copy files into it using LOAD or insert query results using INSERT. We will see how to use INSERT to insert query results as part of the next section.
We can also use INSERT to insert individual records, but this approach is used less often. Let us create an additional table and see how we can use INSERT to insert individual records.
%%sql
DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql
USE itversity_sms
%%sql
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_number STRING,
student_address STRING
) STORED AS TEXTFILE
%%sql
INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql
INSERT INTO students VALUES (2, 'Donald', 'Duck', '1234567890', NULL)
%%sql
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
(4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
%%sql
SELECT * FROM students
Note that the following statement passes the values in the wrong order - the string 'Scott' lines up with student_id, which is INT - so expect it to fail or produce unexpected data depending on the Spark version.
%%sql
INSERT INTO students VALUES ('Scott', 'Tiger', 1, NULL, NULL)
%%sql
SELECT * FROM students
Using Spark SQL with Python or Scala
spark.sql("DROP DATABASE IF EXISTS itversity_sms CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_sms")
spark.sql("USE itversity_sms")
spark.sql("""
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_number STRING,
student_address STRING
) STORED AS TEXTFILE
""")
spark.sql("INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)")
spark.sql("INSERT INTO students VALUES (2, 'Donald', 'Duck', '1234567890', NULL)")
spark.sql("""
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
(4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
""")
spark.sql("SELECT * FROM students").show()
3.2. Overview of Data Types¶
Let us get an overview of Data Types.
Syntactically, Hive and Spark SQL are almost the same.
Go to this Hive page and review the supported data types.
Spark Metastore supports all standard data types (the sketch after this list exercises each category).
Numeric - INT, BIGINT, FLOAT, etc.
Alpha Numeric or String - CHAR, VARCHAR, STRING
Date and Timestamp - DATE, TIMESTAMP
Special Data Types - ARRAY, STRUCT, etc.
Boolean - BOOLEAN
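Here is a minimal sketch covering each category in one table definition; the table name type_demo and its columns are made up purely for illustration.
spark.sql("""
CREATE TABLE IF NOT EXISTS type_demo (
    id INT,                    -- Numeric
    amount DECIMAL(10, 2),     -- Numeric with fixed precision
    name STRING,               -- Alpha Numeric or String
    created DATE,              -- Date
    updated TIMESTAMP,         -- Timestamp
    tags ARRAY<STRING>,        -- Special Data Type
    active BOOLEAN             -- Boolean
) STORED AS TEXTFILE
""")

spark.sql("DESCRIBE type_demo").show(false)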
If the file format is a text file with complex types, then we need to consider the other clauses under ROW FORMAT DELIMITED (if we do not want to use the default delimiters).
%%sql
DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql
USE itversity_sms
%%sql
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_numbers ARRAY<STRING>,
student_address STRUCT<street:STRING, city:STRING, state:STRING, zip:STRING>
) ROW FORMAT
    DELIMITED FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
STORED AS TEXTFILE
%%sql
DESCRIBE students
%%sql
INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql
SELECT * FROM students
%%sql
INSERT INTO students VALUES (2, 'Donald', 'Duck', ARRAY('1234567890', '2345678901'), NULL)
%%sql
SELECT * FROM students
%%sql
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', ARRAY('1234567890', '2345678901'), STRUCT('A Street', 'One City', 'Some State', '12345')),
(4, 'Bubble', 'Guppy', ARRAY('5678901234', '6789012345'), STRUCT('Bubbly Street', 'Guppy', 'La la land', '45678'))
%%sql
SELECT * FROM students
val username = System.getProperty("user.name")
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_sms.db/students"!
s"hdfs dfs -cat /user/${username}/warehouse/${username}_sms.db/students/*"!
Using Spark SQL with Python or Scala
spark.sql("DROP DATABASE IF EXISTS itversity_sms CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_sms")
spark.sql("USE itversity_sms")
spark.sql("DROP TABLE IF EXISTS students")
spark.sql("""
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_numbers ARRAY<STRING>,
student_address STRUCT<street:STRING, city:STRING, state:STRING, zip:STRING>
) ROW FORMAT
    DELIMITED FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
    MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE
""")
spark.sql("INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)")
spark.sql("INSERT INTO students VALUES (2, 'Donald', 'Duck', ARRAY('1234567890', '2345678901'), NULL)")
spark.sql("""
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', ARRAY('1234567890', '2345678901'), STRUCT('A Street', 'One City', 'Some State', '12345')),
(4, 'Bubble', 'Guppy', ARRAY('5678901234', '6789012345'), STRUCT('Bubbly Street', 'Guppy', 'La la land', '45678'))
""")
spark.sql("SELECT * FROM students")
3.3. Adding Comments¶
Let us understand how to create a table with comments, using orders as an example.
We can specify comments for both columns and tables using the COMMENT keyword.
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS orders
%%sql
CREATE TABLE orders (
order_id INT COMMENT 'Unique order id',
order_date STRING COMMENT 'Date on which order is placed',
order_customer_id INT COMMENT 'Customer id who placed the order',
order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
Using Spark SQL with Python or Scala
spark.sql("USE itversity_retail")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
order_id INT COMMENT 'Unique order id',
order_date STRING COMMENT 'Date on which order is placed',
order_customer_id INT COMMENT 'Customer id who placed the order',
order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
""")
The default field delimiter is the \001 (Ctrl+A) character.
We can see the comments using DESCRIBE orders or DESCRIBE FORMATTED orders.
%%sql
DESCRIBE orders
%%sql
DESCRIBE FORMATTED orders
spark.sql("DESCRIBE FORMATTED orders").show(200, false) // Scala
3.4. Loading Data into Tables - Local¶
Let us understand how to load data into Spark Metastore tables. We can load either from the local file system or from HDFS.
The data should be in sync with the Spark Metastore table structure.
We need to create the table with the same file format and delimiters so that we can load the data files into the Spark Metastore table.
Our data is in text files; the line delimiter is the newline character and the field delimiter is a comma.
As our table uses the default file format (text file) and the default line/record delimiter, and the field delimiter is specified as comma, we should be able to load the data without any issues.
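Before creating the table, a quick preview of the source files confirms the comma delimiter - a sketch using the local data path from this course:
import sys.process._

// /data/retail_db/orders is a directory of plain text files; the fields
// are order_id, order_date, order_customer_id and order_status
Seq("sh", "-c", "head -3 /data/retail_db/orders/*") !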
Here is the script which will create the table and then load the data into it.
%%sql
USE itversity_retail
%%sql
DROP TABLE orders
%%sql
CREATE TABLE orders (
order_id INT COMMENT 'Unique order id',
order_date STRING COMMENT 'Date on which order is placed',
order_customer_id INT COMMENT 'Customer id who placed the order',
order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
Using Spark SQL with Python or Scala
spark.sql("USE itversity_retail")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
order_id INT COMMENT 'Unique order id',
order_date STRING COMMENT 'Date on which order is placed',
order_customer_id INT COMMENT 'Customer id who placed the order',
order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders")
Once the data is loaded we can run these queries to preview the data.
%%sql
SELECT * FROM orders LIMIT 10
%%sql
SELECT count(1) FROM orders
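LOAD with the LOCAL keyword copies the files into the table's warehouse directory, leaving the originals in place. A quick listing - a sketch assuming the warehouse layout used throughout this course - confirms they landed there:
import sys.process._

val username = System.getProperty("user.name")

// The copied files should show up under the table's directory
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !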
3.5. Loading Data into Tables - HDFS¶
Let us understand how we can load data from an HDFS location into a Spark Metastore table.
We can use the LOAD command without LOCAL to get data from an HDFS location into a Spark Metastore table.
The user running the LOAD command from an HDFS location needs to have write permissions on the source location, as the data will be moved (deleted on the source and copied to the Spark Metastore table).
Make sure the user has write permissions on the source location.
First we need to copy the data into an HDFS location where the user has write permissions.
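To confirm the permissions up front, here is a quick sketch; the -d flag lists the directory itself rather than its contents, so the first column shows the permission bits and the owner:
import sys.process._

val username = System.getProperty("user.name")

// The home directory should be owned by the current user with write access
s"hdfs dfs -ls -d /user/${username}" !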
import sys.process._
val username = System.getProperty("user.name")
s"hadoop fs -rm -R /user/${username}/retail_db/orders" !
s"hadoop fs -mkdir /user/${username}/retail_db" !
s"hadoop fs -put -f /data/retail_db/orders /user/${username}/retail_db" !
s"hadoop fs -ls /user/${username}/retail_db/orders" !
Here is the script which will truncate the table and then load the data from the HDFS location into the table.
%%sql
USE itversity_retail
%%sql
TRUNCATE TABLE orders
%%sql
LOAD DATA INPATH '/user/itversity/retail_db/orders'
INTO TABLE orders
s"hadoop fs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
s"hadoop fs -ls /user/${username}/retail_db/orders" !
%%sql
SELECT * FROM orders LIMIT 10
%%sql
SELECT count(1) FROM orders
Using Spark SQL with Python or Scala
spark.sql("USE itversity_retail")
spark.sql("TRUNCATE TABLE orders")
spark.sql("""
LOAD DATA INPATH '/user/itversity/retail_db/orders'
INTO TABLE orders""")
s"hadoop fs -ls /user/${username}/retail_db/orders" !
spark.sql("SELECT * FROM orders LIMIT 10")
spark.sql("SELECT count(1) FROM orders")
If you look at /user/${username}/retail_db, you will see that the orders directory has been deleted.
A move is much faster than a copy, as only the file metadata changes while the underlying blocks stay where they are; hence the LOAD command from an HDFS location will always move the files.
3.6. Loading Data - Append and Overwrite¶
Let us understand different approaches to load the data into Spark Metastore table.
INTO TABLE will append to the existing table.
If we want to overwrite, we have to specify OVERWRITE INTO TABLE.
%%sql
USE itversity_retail
%%sql
SELECT count(1) FROM orders
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
INTO TABLE orders
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
%%sql
SELECT count(1) FROM orders
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
OVERWRITE INTO TABLE orders
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
%%sql
SELECT count(1) FROM orders
Using Spark SQL with Python or Scala
spark.sql("USE itversity_retail")
spark.sql("SELECT count(1) FROM orders").show()
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
INTO TABLE orders
""")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
spark.sql("SELECT count(1) FROM orders").show()
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
OVERWRITE INTO TABLE orders
""")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
spark.sql("SELECT count(1) FROM orders").show()
3.7. Creating External Tables¶
Let us understand how to create an external table in Spark Metastore, using orders as an example. We will also see how to load data into the external table.
We just need to add the EXTERNAL keyword to the CREATE clause and specify LOCATION (after the STORED AS clause), or just LOCATION on its own, as part of the CREATE TABLE statement.
We can use the same LOAD commands to get data from either the local file system or HDFS that we used for the managed table.
Once the table is created, we can run DESCRIBE FORMATTED orders to check the metadata of the table and confirm whether it is a managed table or an external table.
We need to specify the location while creating external tables.
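The table type can also be checked programmatically once the table below is created; a small sketch using the catalog API (assuming the itversity_retail database used in this section):
// tableType comes back as MANAGED or EXTERNAL for metastore tables
val tbl = spark.catalog.getTable("itversity_retail", "orders")
println(s"${tbl.name}: ${tbl.tableType}")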
Here is the script to create an external table in Spark Metastore.
%%sql
USE itversity_retail
%%sql
DROP TABLE IF EXISTS orders
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -rm -R /user/${username}/external/retail_db/orders" !
s"hdfs dfs -mkdir -p /user/${username}/external/retail_db/orders" !
%%sql
CREATE EXTERNAL TABLE orders (
order_id INT COMMENT 'Unique order id',
order_date STRING COMMENT 'Date on which order is placed',
order_customer_id INT COMMENT 'Customer id who placed the order',
order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/itversity/external/retail_db/orders'
s"hdfs dfs -ls /user/${username}/external/retail_db/orders" !
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
INTO TABLE orders
s"hdfs dfs -ls /user/${username}/external/retail_db/orders" !
%%sql
SELECT * FROM orders LIMIT 10
%%sql
SELECT count(1) FROM orders
spark.sql("DESCRIBE FORMATTED orders").show(200, false)
3.8. Managed Tables vs. External Tables¶
Let us compare and contrast Managed Tables and External Tables.
When we add the EXTERNAL keyword and specify LOCATION, or specify LOCATION alone, as part of CREATE TABLE, the table becomes EXTERNAL.
The rest of the syntax is the same as for a Managed Table.
However, when we drop a Managed Table, it deletes the metadata from the metastore as well as the data from HDFS.
When we drop an External Table, only the metadata is dropped, not the data.
Typically we use External Tables when the same dataset is processed by multiple frameworks such as Hive, Pig, Spark, etc.
We cannot run the TRUNCATE TABLE command against External Tables.
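As a sketch of that last point, wrapping TRUNCATE in a try/catch in the Scala shell shows the failure without aborting the session (assuming orders is the external table created in the previous section):
import org.apache.spark.sql.AnalysisException

try {
  // orders is external here, so Spark refuses to truncate it
  spark.sql("TRUNCATE TABLE orders")
} catch {
  case e: AnalysisException => println(s"Expected failure: ${e.getMessage}")
}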
%%sql
USE itversity_retail
%%sql
SHOW tables
spark.sql("DESCRIBE FORMATTED orders").show(200, false)
%%sql
TRUNCATE TABLE orders
spark.sql("DESCRIBE FORMATTED order_items").show(200, false)
%%sql
TRUNCATE TABLE order_items
%%sql
DROP TABLE orders
%%sql
DROP TABLE order_items
import sys.process._
s"hdfs dfs -ls /user/${username}/retail_db/orders" !
3.9. Overview of File Formats¶
Let us go through the details of the different file formats supported by the STORED AS clause.
Go to this page and review supported file formats.
Supported File Formats
TEXTFILE
ORC
PARQUET
AVRO
SEQUENCEFILE - not important for our purposes
JSONFILE - only available in recent versions of Hive
and more
We can even specify custom file formats (out of scope)
%%sql
DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql
CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql
USE itversity_sms
%%sql
CREATE TABLE students (
student_id INT,
student_first_name STRING,
student_last_name STRING,
student_phone_numbers ARRAY<STRING>,
student_address STRUCT<street:STRING, city:STRING, state:STRING, zip:STRING>
) STORED AS PARQUET
%%sql
INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql
INSERT INTO students VALUES (2, 'Donald', 'Duck', ARRAY('1234567890', '2345678901'), NULL)
%%sql
INSERT INTO students VALUES
(3, 'Mickey', 'Mouse', ARRAY('1234567890', '2345678901'), STRUCT('A Street', 'One City', 'Some State', '12345')),
(4, 'Bubble', 'Guppy', ARRAY('5678901234', '6789012345'), STRUCT('Bubbly Street', 'Guppy', 'La la land', '45678'))
%%sql
SELECT * FROM students
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_sms.db/students"!
3.10. Dropping Tables and Databases¶
Let us understand how to DROP Spark Metastore Tables as well as Databases.
We can use the DROP TABLE command to drop a table. Let us drop the orders table.
%%sql
CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
USE itversity_retail
%%sql
SHOW tables
%%sql
CREATE TABLE IF NOT EXISTS orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
DROP TABLE orders
%%sql
DROP TABLE IF EXISTS orders
DROP TABLE on a managed table will delete both the metadata in the metastore as well as the data in HDFS, while DROP TABLE on an external table will only delete the metadata in the metastore.
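A small sketch of that difference: if orders had been created as the external table from earlier (with its location under /user/${username}/external/retail_db/orders), the data directory would survive the drop.
import sys.process._

val username = System.getProperty("user.name")

// Dropping an external table removes only the metastore entry...
spark.sql("DROP TABLE IF EXISTS orders")

// ...while the files at the external location remain intact
s"hdfs dfs -ls /user/${username}/external/retail_db/orders" !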
We can drop a database using the DROP DATABASE command. However, we need to drop all the tables in the database first.
Here is the example to drop the database itversity_retail -
DROP DATABASE itversity_retail
We can also drop the database along with all of its tables by adding CASCADE.
%%sql
DROP DATABASE itversity_retail
%%sql
DROP DATABASE IF EXISTS itversity_retail CASCADE
3.11. Truncating Tables¶
Let us understand how to truncate tables.
TRUNCATE works only for managed tables. Only the data will be deleted; the structure will be retained.
Launch Spark SQL
%%sql
CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql
SHOW tables
%%sql
DROP TABLE IF EXISTS orders
%%sql
CREATE TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
INTO TABLE orders
%%sql
SELECT * FROM orders LIMIT 10
%%sql
TRUNCATE TABLE orders
%%sql
SELECT * FROM orders LIMIT 10
%%sql
DROP TABLE IF EXISTS orders
%%sql
CREATE EXTERNAL TABLE orders (
order_id INT,
order_date STRING,
order_customer_id INT,
order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/itversity/external/retail_db/orders'
%%sql
LOAD DATA LOCAL INPATH '/data/retail_db/orders'
OVERWRITE INTO TABLE orders
%%sql
SELECT * FROM orders LIMIT 10
The following TRUNCATE statement is expected to fail, as orders is now an external table.
%%sql
TRUNCATE TABLE orders
3.12. Managed Tables - Exercise¶
Let us use NYSE data and see how we can create tables in Spark Metastore.
Duration: 30 Minutes
Data Location (Local): /data/nyse_all/nyse_data
Create a database with the name - YOUR_OS_USER_NAME_nyse
Table Name: nyse_eod
File Format: TEXTFILE (default)
Review the files by running Linux commands before using the data sets. The data is compressed and we can load the files as is.
Copy one of the zip files to your home directory and preview the data. There should be 7 fields. You need to determine the delimiter (a preview sketch follows this list).
Field Names: stockticker, tradedate, openprice, highprice, lowprice, closeprice, volume
Determine the correct data types based on the values; for example, you need to use BIGINT for volume, not INT.
Create a Managed table with the default delimiter.
As the delimiters in the data and in the table are not the same, you need to figure out how to get the data into the target table.
Make sure the data is copied into the table as per the structure defined, and validate.
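If helpful, here is a sketch for the preview step; FILE_NAME.zip is a placeholder - substitute an actual file from the listing:
import sys.process._

// List the source files (path given in the exercise)
s"ls -ltr /data/nyse_all/nyse_data" !

// Preview one file without extracting it; unzip -p writes its contents
// to stdout so we can inspect the fields and the delimiter
Seq("sh", "-c", "unzip -p /data/nyse_all/nyse_data/FILE_NAME.zip | head -5") !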
3.12.1. Validation¶
Run the following queries to ensure that you will be able to read the data.
DESCRIBE FORMATTED YOUR_OS_USER_NAME_nyse.nyse_eod;
SELECT * FROM YOUR_OS_USER_NAME_nyse.nyse_eod LIMIT 10;
SELECT count(1) FROM YOUR_OS_USER_NAME_nyse.nyse_eod;
// There should not be a field delimiter in the table properties, as the requirement is to use the default delimiter
spark.sql("DESCRIBE FORMATTED itversity_nyse.nyse_eod").show(200, false)
%%sql
SELECT * FROM itversity_nyse.nyse_eod LIMIT 10
%%sql
SELECT count(1) FROM itversity_nyse.nyse_eod