3. Basic DDL and DML

As part of this section, we will primarily focus on basic DDL and DML using Spark Metastore.

  • Create Spark Metastore Tables

  • Overview of Data Types

  • Adding Comments

  • Loading Data into Tables - Local

  • Loading Data into Tables - HDFS

  • Loading Data - Append and Overwrite

  • Creating External Tables

  • Managed Tables vs. External Tables

  • Overview of File Formats

  • Dropping Tables and Databases

  • Truncating Tables

import org.apache.spark.sql.SparkSession

val spark = SparkSession.
    builder.
    config("spark.ui.port", "0").
    config("spark.sql.warehouse.dir", "/user/itversity/warehouse").
    enableHiveSupport.
    appName("Spark SQL - Managing Tables - Basic DDL and DML").
    master("yarn").
    getOrCreate
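
Once the session is available, a quick sanity check (a minimal sketch, assuming the metastore is reachable) confirms that Hive support is wired up:

// List the databases registered in the metastore
spark.sql("SHOW DATABASES").show(false)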

If you are going to use CLIs, you can access Spark SQL using one of the following three approaches.

Using Spark SQL

spark2-sql \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Scala

spark2-shell \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

Using Pyspark

pyspark2 \
    --master yarn \
    --conf spark.ui.port=0 \
    --conf spark.sql.warehouse.dir=/user/${USER}/warehouse

3.1. Create Spark Metastore Tables

Let us understand how to create tables in Spark Metastore. We will be focusing on syntax and semantics.

  • Let us drop and recreate the table. We need to determine the table type and file format based on the files that will be copied into the table. If the file format is delimited text, then we also need to know the field delimiter.

    • Managed Table

    • Text File Format

    • Field Delimiter ‘,’

  • We will create the table based on the structure of data in /data/retail_db/orders

  • If you are using spark-sql, make sure to end the statements with a semicolon. With spark-shell or pyspark, make sure to pass these commands through spark.sql.

%%sql

USE itversity_retail
%%sql

DROP TABLE orders
%%sql

CREATE TABLE orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
%%sql

SHOW tables
  • Using Spark SQL with Python or Scala

spark.sql("USE itversity_retail")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
""")
spark.sql("SHOW tables").show()
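
To double-check that the delimiter and file format were registered as intended, we can review the generated DDL (a quick sketch; SHOW CREATE TABLE assumes the table exists in the current database):

// Print the DDL Spark stored in the metastore for the table
spark.sql("SHOW CREATE TABLE orders").show(false)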
  • Once the table is created, we can either copy files into it using LOAD or insert query results using INSERT. We will see how to use INSERT to insert query results as part of the next section.

  • We can also use INSERT to insert individual records, but this approach is used less often. Let us create an additional table and see how we can use INSERT to insert individual records.

%%sql

DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql

USE itversity_sms
%%sql

CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_number STRING,
    student_address STRING
) STORED AS TEXTFILE
%%sql

INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql

INSERT INTO students VALUES (2, 'Donald', 'Duck', '1234567890', NULL)
%%sql

INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
    (4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
%%sql

SELECT * FROM students
  • Note that the following insert lists the values in the wrong order. Values are assigned to columns positionally, so depending on the version this will either fail with a type error or store NULL for student_id.

%%sql

INSERT INTO students VALUES ('Scott', 'Tiger', 1, NULL, NULL)
%%sql

SELECT * FROM students
  • Using Spark SQL with Python or Scala

spark.sql("DROP DATABASE IF EXISTS itversity_sms CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_sms")
spark.sql("USE itversity_sms")
spark.sql("""
CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_number STRING,
    student_address STRING
) STORED AS TEXTFILE
""")
spark.sql("INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)")
spark.sql("INSERT INTO students VALUES (2, 'Donald', 'Duck', '1234567890', NULL)")
spark.sql("""
INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', '2345678901', 'A Street, One City, Some State, 12345'),
    (4, 'Bubble', 'Guppy', '6789012345', 'Bubbly Street, Guppy, La la land, 45678')
""")    
spark.sql("SELECT * FROM students").show()
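
Each INSERT statement runs as a separate job and writes its own file under the table directory, which is one reason row-by-row inserts are used less often. Here is a quick sketch to see this, assuming the default warehouse location used throughout this section:

import sys.process._
val username = System.getProperty("user.name")
// Each INSERT above will have produced at least one small file here
s"hdfs dfs -ls /user/${username}/warehouse/${username}_sms.db/students" !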

3.2. Overview of Data Types

Let us get an overview of Data Types.

  • Syntactically, Hive and Spark SQL are almost the same.

  • Go to this Hive page and review the supported data types.

  • Spark Metastore supports all standard data types.

    • Numeric - INT, BIGINT, FLOAT etc

    • Alpha Numeric or String - CHAR, VARCHAR, STRING

    • Date and Timestamp - DATE, TIMESTAMP

    • Special Data Types - ARRAY, STRUCT etc

    • Boolean - BOOLEAN

  • If the file format is a text file with special types, then we need to consider the other clauses under ROW FORMAT DELIMITED (if we don’t want to use the default delimiters).

%%sql

DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql

USE itversity_sms
%%sql

CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_numbers ARRAY<STRING>,
    student_address STRUCT<street:STRING, city:STRING, state:STRING, zip:STRING>
) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
STORED AS TEXTFILE
%%sql

DESCRIBE students
%%sql

INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql

SELECT * FROM students
%%sql

INSERT INTO students VALUES (2, 'Donald', 'Duck', ARRAY('1234567890', '2345678901'), NULL)
%%sql

SELECT * FROM students
%%sql

INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', ARRAY('1234567890', '2345678901'), STRUCT('A Street', 'One City', 'Some State', '12345')),
    (4, 'Bubble', 'Guppy', ARRAY('5678901234', '6789012345'), STRUCT('Bubbly Street', 'Guppy', 'La la land', '45678'))
%%sql

SELECT * FROM students
val username = System.getProperty("user.name")
import sys.process._
s"hdfs dfs -ls /user/${username}/warehouse/${username}_sms.db/students"!
s"hdfs dfs -cat /user/${username}/warehouse/${username}_sms.db/students/*"!
  • Using Spark SQL with Python or Scala

spark.sql("DROP DATABASE IF EXISTS itversity_sms CASCADE")
spark.sql("CREATE DATABASE IF NOT EXISTS itversity_sms")
spark.sql("USE itversity_sms")
spark.sql("DROP TABLE IF EXISTS students")
spark.sql("""
CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_numbers ARRAY<STRING>,
    student_address STRUCT<street:STRING, city:STRING, state:STRING, zip:STRING>
) ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    COLLECTION ITEMS TERMINATED BY ','
    MAP KEYS TERMINATED BY ':'
STORED AS TEXTFILE
""")
spark.sql("INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)")
spark.sql("INSERT INTO students VALUES (2, 'Donald', 'Duck', ARRAY('1234567890', '2345678901'), NULL)")
spark.sql("""
INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', ARRAY('1234567890', '2345678901'), STRUCT('A Street', 'One City', 'Some State', '12345')),
    (4, 'Bubble', 'Guppy', ARRAY('5678901234', '6789012345'), STRUCT('Bubbly Street', 'Guppy', 'La la land', '45678'))
""")
spark.sql("SELECT * FROM students").show(false)
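
As a quick sketch of how the special types are queried, STRUCT fields can be accessed with dot notation and ARRAY elements by index (assuming the students table created above):

// Pull the first phone number and the city out of the nested columns
spark.sql("""
SELECT student_id,
    student_phone_numbers[0] AS primary_phone,
    student_address.city AS city
FROM students
""").show(false)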

3.3. Adding Comments

Let us understand how to create a table with comments, using orders as an example.

  • We can specify comments for both columns as well as tables using COMMENT keyword.

%%sql

USE itversity_retail
%%sql

DROP TABLE IF EXISTS orders
%%sql

CREATE TABLE orders (
  order_id INT COMMENT 'Unique order id',
  order_date STRING COMMENT 'Date on which order is placed',
  order_customer_id INT COMMENT 'Customer id who placed the order',
  order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
  • Using Spark SQL with Python or Scala

spark.sql("USE itversity_retail")
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders (
  order_id INT COMMENT 'Unique order id',
  order_date STRING COMMENT 'Date on which order is placed',
  order_customer_id INT COMMENT 'Customer id who placed the order',
  order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
""")
  • The default field delimiter is the \001 character.

  • We can see the comments using DESCRIBE orders or DESCRIBE FORMATTED orders.

%%sql

DESCRIBE orders
%%sql

DESCRIBE FORMATTED orders
spark.sql("DESCRIBE FORMATTED orders").show(200, false) // Scala
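
To pull just the column comments programmatically, the DESCRIBE output can be filtered like any other DataFrame (a minimal sketch):

// DESCRIBE returns col_name, data_type and comment columns
spark.sql("DESCRIBE orders").
    select("col_name", "comment").
    show(false)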

3.4. Loading Data into Tables - Local

Let us understand how to load data into Spark Metastore tables. We can load either from local file system or from HDFS.

  • Data should be in sync with Spark Metastore table structure.

  • We need to create the table with the same file format and delimiters so that the data in the files can be loaded into Spark Metastore tables.

  • Our data is in text files, the line delimiter is the newline character, and the field delimiter is a comma.

  • As our table uses the default file format (text file) and the default line/record delimiter, and the field delimiter is specified as a comma, we should be able to load the data without any issues.

  • Here is the script which will create table and then load data into the table.

%%sql

USE itversity_retail
%%sql

DROP TABLE orders
%%sql

CREATE TABLE orders (
  order_id INT COMMENT 'Unique order id',
  order_date STRING COMMENT 'Date on which order is placed',
  order_customer_id INT COMMENT 'Customer id who placed the order',
  order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders
  • Using Spark SQL with Python or Scala

spark.sql("USE itversity_retail")
spark.sql("DROP TABLE orders")
spark.sql("""
CREATE TABLE orders (
  order_id INT COMMENT 'Unique order id',
  order_date STRING COMMENT 'Date on which order is placed',
  order_customer_id INT COMMENT 'Customer id who placed the order',
  order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
""")
spark.sql("LOAD DATA LOCAL INPATH '/data/retail_db/orders' INTO TABLE orders")
  • Once the data is loaded, we can run the following queries to preview it.

%%sql

SELECT * FROM orders LIMIT 10
%%sql

SELECT count(1) FROM orders
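
The same validation can also be done through the DataFrame API from spark-shell (a sketch; spark.table reads the metastore table directly):

// Preview and count the loaded records without writing SQL
spark.table("orders").show(10, false)
println(spark.table("orders").count())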

3.5. Loading Data into Tables - HDFS

Let us understand how we can load data from HDFS location into Spark Metastore table.

  • We can use the LOAD command without LOCAL to get data from an HDFS location into a Spark Metastore table.

  • The user running the LOAD command needs write permissions on the HDFS source location, as the data will be moved (deleted from the source and copied into the Spark Metastore table).

  • Make sure the user has write permissions on the source location.

  • First we need to copy the data into an HDFS location where the user has write permissions.

import sys.process._
val username = System.getProperty("user.name")
s"hadoop fs -rm -R /user/${username}/retail_db/orders" !
s"hadoop fs -mkdir /user/${username}/retail_db" !
s"hadoop fs -put -f /data/retail_db/orders /user/${username}/retail_db" !
s"hadoop fs -ls /user/${username}/retail_db/orders" !
  • Here is the script which will truncate the table and then load the data from the HDFS location into the table.

%%sql

USE itversity_retail
%%sql

TRUNCATE TABLE orders
%%sql

LOAD DATA INPATH '/user/itversity/retail_db/orders' 
  INTO TABLE orders
s"hadoop fs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
s"hadoop fs -ls /user/${username}/retail_db/orders" !
%%sql

SELECT * FROM orders LIMIT 10
%%sql

SELECT count(1) FROM orders
  • Using Spark SQL with Python or Scala

spark.sql("USE itversity_retail")
spark.sql("TRUNCATE TABLE orders")
spark.sql("""
LOAD DATA INPATH '/user/itversity/retail_db/orders' 
  INTO TABLE orders""")
s"hadoop fs -ls /user/${username}/retail_db/orders" !
spark.sql("SELECT * FROM orders LIMIT 10").show()
spark.sql("SELECT count(1) FROM orders").show()
  • If you look at /user/${username}/retail_db, you will see that the orders directory has been deleted.

  • Moving files is much faster than copying them, as only block metadata changes; hence the LOAD command from an HDFS location will always move the files.

3.6. Loading Data - Append and Overwrite

Let us understand different approaches to load the data into Spark Metastore table.

  • INTO TABLE will append to the existing table.

  • If we want to overwrite, we have to specify OVERWRITE INTO TABLE.

%%sql

USE itversity_retail
%%sql

SELECT count(1) FROM orders
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
  INTO TABLE orders
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
%%sql

SELECT count(1) FROM orders
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
  OVERWRITE INTO TABLE orders
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
%%sql

SELECT count(1) FROM orders
  • Using Spark SQL with Python or Scala

spark.sql("USE itversity_retail")
spark.sql("SELECT count(1) FROM orders").show()
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
  INTO TABLE orders
""")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
spark.sql("SELECT count(1) FROM orders").show()
spark.sql("""
LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
  OVERWRITE INTO TABLE orders
""")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_retail.db/orders" !
spark.sql("SELECT count(1) FROM orders").show()

3.7. Creating External Tables

Let us understand how to create an external table in Spark Metastore using orders as an example. We will also see how to load data into the external table.

  • We just need to add the EXTERNAL keyword to the CREATE clause and specify LOCATION (after the STORED AS clause, or on its own) as part of the CREATE TABLE statement.

  • We can use the same LOAD commands that we used for the managed table to get data from either the local file system or HDFS.

  • Once the table is created, we can run DESCRIBE FORMATTED orders to check the metadata of the table and confirm whether it is a managed or an external table.

  • We need to specify the location while creating external tables.

Here is the script to create external table in Spark Metastore.

%%sql

USE itversity_retail
%%sql

DROP TABLE IF EXISTS orders
import sys.process._

val username = System.getProperty("user.name")
s"hdfs dfs -rm -R /user/${username}/external/retail_db/orders" !
s"hdfs dfs -mkdir -p /user/${username}/external/retail_db/orders" !
%%sql

CREATE EXTERNAL TABLE orders (
  order_id INT COMMENT 'Unique order id',
  order_date STRING COMMENT 'Date on which order is placed',
  order_customer_id INT COMMENT 'Customer id who placed the order',
  order_status STRING COMMENT 'Current status of the order'
) COMMENT 'Table to save order level details'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/itversity/external/retail_db/orders'
s"hdfs dfs -ls /user/${username}/external/retail_db/orders" !
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders' 
  INTO TABLE orders
s"hdfs dfs -ls /user/${username}/external/retail_db/orders" !
%%sql

SELECT * FROM orders LIMIT 10
%%sql

SELECT count(1) FROM orders
spark.sql("DESCRIBE FORMATTED orders").show(200, false)
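
To confirm the table type without scanning the full DESCRIBE output, the result can be filtered down to the Type row (a minimal sketch):

// DESCRIBE FORMATTED exposes the table type in a 'Type' row
spark.sql("DESCRIBE FORMATTED orders").
    filter("col_name = 'Type'").
    show(false)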

3.8. Managed Tables vs. External Tables

Let us compare and contrast between Managed Tables and External Tables.

  • When we specify EXTERNAL along with LOCATION, or LOCATION alone, as part of CREATE TABLE, the table becomes EXTERNAL.

  • The rest of the syntax is the same as for a managed table.

  • However, when we drop a managed table, it deletes the metadata from the metastore as well as the data from HDFS.

  • When we drop an external table, only the metadata is dropped, not the data.

  • Typically we use external tables when the same dataset is processed by multiple frameworks such as Hive, Pig, Spark etc.

  • We cannot run the TRUNCATE TABLE command against external tables.

%%sql

USE itversity_retail
%%sql

SHOW tables
spark.sql("DESCRIBE FORMATTED orders").show(200, false)
%%sql

TRUNCATE TABLE orders
spark.sql("DESCRIBE FORMATTED order_items").show(200, false)
%%sql

TRUNCATE TABLE order_items
%%sql

DROP TABLE orders
%%sql

DROP TABLE order_items
import sys.process._

s"hdfs dfs -ls /user/${username}/retail_db/orders" !
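
The Catalog API also exposes the table type, which is handy for checking several tables at once (a sketch, assuming the itversity_retail database still exists):

// List tables along with their type (MANAGED vs EXTERNAL)
spark.catalog.listTables("itversity_retail").
    select("name", "tableType").
    show(false)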

3.9. Overview of File Formats

Let us go through the details of the different file formats supported by the STORED AS clause.

  • Go to this page and review supported file formats.

  • Supported File Formats

    • TEXTFILE

    • ORC

    • PARQUET

    • AVRO

    • SEQUENCEFILE - legacy, not commonly used

    • JSONFILE - only available in recent versions of Hive

    • and more

  • We can even specify custom file formats (out of scope)
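
Switching formats is just a matter of changing the STORED AS clause. As a quick sketch, a similar table can be materialized as ORC instead of Parquet (students_orc is a hypothetical table name, assuming ORC support in your build):

// A simplified variant of the students table below, stored as ORC
spark.sql("""
CREATE TABLE IF NOT EXISTS students_orc (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING
) STORED AS ORC
""")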

%%sql

DROP DATABASE IF EXISTS itversity_sms CASCADE
%%sql

CREATE DATABASE IF NOT EXISTS itversity_sms
%%sql

USE itversity_sms
%%sql

CREATE TABLE students (
    student_id INT,
    student_first_name STRING,
    student_last_name STRING,
    student_phone_numbers ARRAY<STRING>,
    student_address STRUCT<street:STRING, city:STRING, state:STRING, zip:STRING>
) STORED AS PARQUET
%%sql

INSERT INTO students VALUES (1, 'Scott', 'Tiger', NULL, NULL)
%%sql

INSERT INTO students VALUES (2, 'Donald', 'Duck', ARRAY('1234567890', '2345678901'), NULL)
%%sql

INSERT INTO students VALUES 
    (3, 'Mickey', 'Mouse', ARRAY('1234567890', '2345678901'), STRUCT('A Street', 'One City', 'Some State', '12345')),
    (4, 'Bubble', 'Guppy', ARRAY('5678901234', '6789012345'), STRUCT('Bubbly Street', 'Guppy', 'La la land', '45678'))
%%sql

SELECT * FROM students
import sys.process._
val username = System.getProperty("user.name")
s"hdfs dfs -ls /user/${username}/warehouse/${username}_sms.db/students"!
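
Since the table is stored as Parquet, the underlying files can also be read back directly with the DataFrame reader (a sketch, assuming the default warehouse layout shown above):

// Read the table's Parquet files straight from the warehouse directory
spark.read.
    parquet(s"/user/${username}/warehouse/${username}_sms.db/students").
    show(false)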

3.10. Dropping Tables and Databases

Let us understand how to DROP Spark Metastore Tables as well as Databases.

  • We can use the DROP TABLE command to drop a table. Let us drop the orders table.

%%sql

CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql

USE itversity_retail
%%sql

SHOW tables
%%sql

CREATE TABLE IF NOT EXISTS orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

DROP TABLE orders
%%sql

DROP TABLE IF EXISTS orders
  • DROP TABLE on a managed table will delete both the metadata in the metastore and the data in HDFS, while DROP TABLE on an external table will only delete the metadata in the metastore.

  • We can drop a database using the DROP DATABASE command. However, we need to drop all the tables in the database first.

  • Here is the example to drop the database itversity_retail - DROP DATABASE itversity_retail

  • Alternatively, we can drop the database along with all its tables by adding CASCADE.

%%sql

DROP DATABASE itversity_retail
%%sql

DROP DATABASE IF EXISTS itversity_retail CASCADE
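
A quick programmatic check (sketch) confirms the database is gone after the CASCADE drop:

// Returns false once the database has been dropped
spark.catalog.databaseExists("itversity_retail")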

3.11. Truncating Tables

Let us understand how to truncate tables.

  • TRUNCATE works only for managed tables. Only the data will be deleted; the structure will be retained.

  • Launch Spark SQL

%%sql

CREATE DATABASE IF NOT EXISTS itversity_retail
%%sql

SHOW tables
%%sql

DROP TABLE IF EXISTS orders
%%sql

CREATE TABLE orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders'
  INTO TABLE orders
%%sql

SELECT * FROM orders LIMIT 10
%%sql

TRUNCATE TABLE orders
%%sql

SELECT * FROM orders LIMIT 10
%%sql

DROP TABLE IF EXISTS orders
%%sql

CREATE EXTERNAL TABLE orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/itversity/external/retail_db/orders'
%%sql

LOAD DATA LOCAL INPATH '/data/retail_db/orders'
  OVERWRITE INTO TABLE orders
%%sql

SELECT * FROM orders LIMIT 10
  • The following TRUNCATE will fail, as orders is now an external table.

%%sql

TRUNCATE TABLE orders

3.12. Managed Tables - Exercise

Let us use NYSE data and see how we can create tables in Spark Metastore.

  • Duration: 30 Minutes

  • Data Location (Local): /data/nyse_all/nyse_data

  • Create a database with the name - YOUR_OS_USER_NAME_nyse

  • Table Name: nyse_eod

  • File Format: TEXTFILE (default)

  • Review the files by running Linux commands before using the data sets. The data is compressed, and we can load the files as is.

  • Copy one of the zip files to your home directory and preview the data. There should be 7 fields. You need to determine the delimiter.

  • Field Names: stockticker, tradedate, openprice, highprice, lowprice, closeprice, volume

  • Determine the correct data types based on the values. For example, you need to use BIGINT for volume, not INT.

  • Create a managed table with the default delimiter.

As the delimiters in the data and the table are not the same, you need to figure out how to get the data into the target table.

  • Make sure the data is copied into the table as per the structure defined and validate.

3.12.1. Validation

Run the following queries to ensure that you will be able to read the data.

DESCRIBE FORMATTED YOUR_OS_USER_NAME_nyse.nyse_eod;
SELECT * FROM YOUR_OS_USER_NAME_nyse.nyse_eod LIMIT 10;
SELECT count(1) FROM YOUR_OS_USER_NAME_nyse.nyse_eod;
// There should be no explicit field delimiter in the table properties, as the requirement is to use the default delimiter
spark.sql("DESCRIBE FORMATTED itversity_nyse.nyse_eod").show(200, false)
%%sql

SELECT * FROM itversity_nyse.nyse_eod LIMIT 10
%%sql

SELECT count(1) FROM itversity_nyse.nyse_eod