Writing with SQL

INSERT OVERWRITE

INSERT OVERWRITE can replace partitions in a table with the results of a query.

Spark's default overwrite mode is static; you can switch to dynamic mode with:

  SET spark.sql.sources.partitionOverwriteMode=dynamic
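
The same switch can also be made per session through the SparkSession runtime configuration; a minimal sketch, assuming an existing SparkSession named spark:

  // Equivalent per-session setting via the runtime configuration.
  spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")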

To demonstrate the behavior of dynamic and static overwrites, a test table is defined using the following DDL:

  CREATE TABLE arctic_catalog.db.sample (
      id int,
      data string,
      ts timestamp,
      primary key (id))
  USING arctic
  PARTITIONED BY (days(ts))

When Spark's overwrite mode is dynamic, only the partitions containing rows produced by the SELECT query will be replaced.

  INSERT OVERWRITE arctic_catalog.db.sample VALUES
  (1, 'aaa', timestamp('2022-1-1 09:00:00')),
  (2, 'bbb', timestamp('2022-1-2 09:00:00')),
  (3, 'ccc', timestamp('2022-1-3 09:00:00'))
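
Running a second overwrite against the table illustrates the dynamic behavior; a sketch, assuming the three rows above were just written and a SparkSession named spark:

  // Overwrite with a row that falls only into the 2022-01-01 partition.
  spark.sql(
    """INSERT OVERWRITE arctic_catalog.db.sample VALUES
      |(4, 'ddd', timestamp('2022-1-1 10:00:00'))""".stripMargin)
  // Only the 2022-01-01 partition is rewritten; the 2022-01-02 and
  // 2022-01-03 partitions keep their original rows.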

When Spark's overwrite mode is static, the PARTITION clause is translated into a filter that selects which partitions of the table to replace. If the PARTITION clause is omitted, all partitions will be replaced.

  INSERT OVERWRITE arctic_catalog.db.sample
  PARTITION (dt = '2021-1-1') VALUES
  (1, 'aaa'), (2, 'bbb'), (3, 'ccc')

In static mode, transforms on partition columns are not supported: the PARTITION clause can only reference partition columns directly, as with dt in the example above.

You can enable a uniqueness check on the primary keys of the source data by setting spark.sql.arctic.check-source-data-uniqueness.enabled=true in Spark SQL. If duplicate primary keys exist, an error will be thrown during the write.
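
For example, the check can be switched on for the current session before running the write; a short sketch:

  // Enable the uniqueness check; a write whose source data contains
  // duplicate primary keys will then fail with an error.
  spark.sql("SET spark.sql.arctic.check-source-data-uniqueness.enabled=true")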

INSERT INTO

To append new data to a table, use INSERT INTO.

  INSERT INTO arctic_catalog.db.sample VALUES (1, 'a'), (2, 'b')
  INSERT INTO prod.db.table SELECT ...

Upsert to a table with primary keys

When adding new data to a table with a primary key, the write.upsert.enabled table property controls whether UPSERT behavior is enabled.

When UPSERT is enabled, an UPDATE is performed if a row with the same primary key already exists, and an INSERT otherwise.

When UPSERT is disabled, only INSERT operations are performed, even if rows with the same primary key already exist in the table.

  CREATE TABLE arctic_catalog.db.keyedTable (
      id int,
      data string,
      primary key (id))
  USING arctic
  TBLPROPERTIES ('write.upsert.enabled' = 'true')

  INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b')
  INSERT INTO prod.db.keyedTable SELECT ...
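
A sketch of the resulting behavior, assuming the keyedTable defined above with upsert enabled and a SparkSession named spark:

  // The second insert reuses primary key 1, so the existing row is updated.
  spark.sql("INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b')")
  spark.sql("INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'c')")

  // The table now holds (1, 'c') and (2, 'b').
  spark.sql("SELECT * FROM arctic_catalog.db.keyedTable").show()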

As with INSERT OVERWRITE, you can enable the uniqueness check on the source data's primary keys by setting spark.sql.arctic.check-source-data-uniqueness.enabled=true; duplicate primary keys will cause an error to be thrown during the write.

DELETE FROM

The DELETE FROM statement deletes rows from a table.

  DELETE FROM arctic_catalog.db.sample
  WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'

  DELETE FROM arctic_catalog.db.sample
  WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

  DELETE FROM arctic_catalog.db.sample AS t1
  WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)

UPDATE

The UPDATE statement modifies rows in a table.

  UPDATE arctic_catalog.db.sample
  SET c1 = 'update_c1', c2 = 'update_c2'
  WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'

  UPDATE arctic_catalog.db.sample
  SET session_time = 0, ignored = true
  WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

  UPDATE arctic_catalog.db.sample AS t1
  SET order_status = 'returned'
  WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)

MERGE INTO

MERGE INTO updates a target table using a set of updates from a source table or query:

  MERGE INTO prod.db.target t   -- a target table
  USING (SELECT ...) s          -- the source updates
  ON t.id = s.id                -- condition to find updates for target rows
  WHEN ...                      -- updates

The MERGE INTO statement supports multiple WHEN MATCHED ... THEN ... actions to execute UPDATE, DELETE, or INSERT.

  MERGE INTO prod.db.target t
  USING prod.db.source s
  ON t.id = s.id
  WHEN MATCHED AND s.op = 'delete' THEN DELETE
  WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
  WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
  WHEN NOT MATCHED THEN INSERT *
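
A DataFrame can also serve as the MERGE source by registering it as a temporary view first; a minimal sketch, assuming a SparkSession named spark and a hypothetical updates DataFrame shaped like prod.db.target:

  import spark.implicits._

  // Hypothetical updates with the same columns as the target table.
  val updates = Seq((1, 10L), (5, 42L)).toDF("id", "count")
  updates.createOrReplaceTempView("source")

  spark.sql(
    """MERGE INTO prod.db.target t
      |USING source s
      |ON t.id = s.id
      |WHEN MATCHED THEN UPDATE SET t.count = s.count
      |WHEN NOT MATCHED THEN INSERT *""".stripMargin)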

Writing with DataFrames

Appending data

Use append() to add data to a MixedFormat table.

  val data: DataFrame = ...
  data.writeTo("arctic_catalog.db.sample").append()
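
A minimal sketch of building the DataFrame to append, assuming the sample table's schema (id int, data string, ts timestamp) and a SparkSession named spark; the rows are illustrative:

  import java.sql.Timestamp
  import spark.implicits._

  // Illustrative rows matching the sample table's schema.
  val data = Seq(
    (4, "ddd", Timestamp.valueOf("2022-01-04 09:00:00"))
  ).toDF("id", "data", "ts")

  data.writeTo("arctic_catalog.db.sample").append()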

Overwriting data

Use overwritePartitions() to dynamically overwrite partitions in a MixedFormat table.

  val data: DataFrame = ...
  data.writeTo("arctic_catalog.db.sample").overwritePartitions()
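
overwritePartitions() follows dynamic overwrite semantics: only the partitions that the DataFrame's rows fall into are replaced. A sketch against the partitioned sample table, with illustrative rows:

  import java.sql.Timestamp
  import spark.implicits._

  val data = Seq(
    (10, "zzz", Timestamp.valueOf("2022-01-01 12:00:00"))
  ).toDF("id", "data", "ts")

  // Rewrites only the 2022-01-01 partition; other partitions are kept.
  data.writeTo("arctic_catalog.db.sample").overwritePartitions()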

Creating tables

The create() method creates a table and writes the data to it, just like CREATE TABLE AS SELECT.

  val data: DataFrame = ...
  data.writeTo("arctic_catalog.db.sample").create()

Partition keys and primary keys can be specified with partitionBy() and option("primary.keys", "'xxx'"), respectively.

  val data: DataFrame = ...
  data.write.format("arctic")
      .partitionBy("data")
      .option("primary.keys", "'xxx'")
      .save("arctic_catalog.db.sample")