Writing with SQL
INSERT OVERWRITE
INSERT OVERWRITE replaces partitions in a table with the results of a query.
Spark's default overwrite mode is static; you can switch to dynamic mode with:

```sql
SET spark.sql.sources.partitionOverwriteMode=dynamic
```
To demonstrate the behavior of dynamic and static overwrites, a test table is defined using the following DDL:
```sql
CREATE TABLE arctic_catalog.db.sample (
    id int,
    data string,
    ts timestamp,
    primary key (id)
) USING arctic
PARTITIONED BY (days(ts))
```
When Spark's overwrite mode is dynamic, only the partitions that the rows produced by the SELECT query belong to are replaced.
```sql
INSERT OVERWRITE arctic_catalog.db.sample
VALUES
(1, 'aaa', timestamp('2022-1-1 09:00:00')),
(2, 'bbb', timestamp('2022-1-2 09:00:00')),
(3, 'ccc', timestamp('2022-1-3 09:00:00'))
```
When Spark's overwrite mode is static, the PARTITION clause is converted into a filter that determines which partitions of the table are replaced by the result set of the SELECT. If the PARTITION clause is omitted, all partitions are replaced.
```sql
INSERT OVERWRITE arctic_catalog.db.sample
PARTITION (dt = '2021-1-1')
VALUES (1, 'aaa'), (2, 'bbb'), (3, 'ccc')
```
Static mode does not support transforms on the partitioning columns.
You can enable a uniqueness check on the primary key of the source data by setting the following in Spark SQL:

```sql
SET spark.sql.arctic.check-source-data-uniqueness.enabled = true
```

If there are duplicate primary keys, an error is thrown during the write operation.
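As an illustration of the documented behavior (using the sample table defined above; the duplicated values are made up for this sketch), a write whose source result set carries two rows with the same primary key would be rejected:

```sql
SET spark.sql.arctic.check-source-data-uniqueness.enabled = true;

-- Both rows share id = 1, so the write fails with an error
-- instead of silently writing duplicate keys.
INSERT OVERWRITE arctic_catalog.db.sample
VALUES
(1, 'aaa', timestamp('2022-1-1 09:00:00')),
(1, 'bbb', timestamp('2022-1-2 09:00:00'))
```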
INSERT INTO
To append new data to a table, use INSERT INTO.
```sql
INSERT INTO arctic_catalog.db.sample VALUES (1, 'a'), (2, 'b')

INSERT INTO prod.db.table SELECT ...
```
Upsert to a table with a primary key
To add new data to a table with a primary key, you can control whether the UPSERT function is enabled with
the write.upsert.enabled parameter.
When UPSERT is enabled, a row whose primary key already exists in the table is written as an UPDATE,
and a row whose primary key does not exist is written as an INSERT.
When UPSERT is disabled, only INSERT operations are performed,
even if rows with the same primary key already exist in the table.
```sql
CREATE TABLE arctic_catalog.db.keyedTable (
    id int,
    data string,
    primary key (id)
) USING arctic
TBLPROPERTIES ('write.upsert.enabled' = 'true')
```
```sql
INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b')

INSERT INTO prod.db.keyedTable SELECT ...
```
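To make the UPSERT semantics concrete (assuming the keyedTable defined above, with write.upsert.enabled set to 'true'; the values are made up for this sketch):

```sql
INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'a'), (2, 'b');

-- id = 1 already exists, so this write performs an UPDATE
-- rather than inserting a duplicate row; the table now holds
-- (1, 'c') and (2, 'b').
INSERT INTO arctic_catalog.db.keyedTable VALUES (1, 'c')
```

With write.upsert.enabled unset or 'false', the second statement would instead append another row with id = 1.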
You can enable a uniqueness check on the primary key of the source data by setting the following in Spark SQL:

```sql
SET spark.sql.arctic.check-source-data-uniqueness.enabled = true
```

If there are duplicate primary keys, an error is thrown during the write operation.

DELETE FROM
The DELETE FROM statement deletes rows from a table.
```sql
DELETE FROM arctic_catalog.db.sample
WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'

DELETE FROM arctic_catalog.db.sample
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

DELETE FROM arctic_catalog.db.sample AS t1
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
```
UPDATE
The UPDATE statement modifies rows in a table.
```sql
UPDATE arctic_catalog.db.sample
SET c1 = 'update_c1', c2 = 'update_c2'
WHERE ts >= '2020-05-01 00:00:00' AND ts < '2020-06-01 00:00:00'

UPDATE arctic_catalog.db.sample
SET session_time = 0, ignored = true
WHERE session_time < (SELECT min(session_time) FROM prod.db.good_events)

UPDATE arctic_catalog.db.sample AS t1
SET order_status = 'returned'
WHERE EXISTS (SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
```
MERGE INTO
```sql
MERGE INTO prod.db.target t   -- a target table
USING (SELECT ...) s          -- the source updates
ON t.id = s.id                -- condition to find updates for target rows
WHEN ...                      -- updates
```
The MERGE INTO statement supports multiple WHEN MATCHED ... THEN ... actions to execute UPDATE, DELETE, or INSERT.
```sql
MERGE INTO prod.db.target t
USING prod.db.source s
ON t.id = s.id
WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
WHEN NOT MATCHED THEN INSERT *
```
Writing with DataFrames
Appending data
Use append() to add data to a MixedFormat table.
```scala
val data: DataFrame = ...
data.writeTo("arctic_catalog.db.sample").append()
```
Overwriting data
Use overwritePartitions() to overwrite data.
```scala
val data: DataFrame = ...
data.writeTo("arctic_catalog.db.sample").overwritePartitions()
```
Creating tables
create() creates a table and writes data to it, just like CREATE TABLE AS SELECT.
```scala
val data: DataFrame = ...
data.writeTo("arctic_catalog.db.sample").create()
```
The partition keys and primary keys can be specified with partitionBy() and option("primary.keys", "'xxx'").
```scala
val data: DataFrame = ...
data.write.format("arctic")
  .partitionBy("data")
  .option("primary.keys", "'xxx'")
  .save("arctic_catalog.db.sample")
```
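As a fuller sketch of this V1 writer path, the elided pieces can be filled in like the following. The column names id and data, the in-memory source rows, and the quoting of the primary.keys value (mirroring the "'xxx'" style shown above) are assumptions for illustration, not a definitive API reference:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._

// Hypothetical source data: `id` will be the primary key,
// `data` the partition column.
val data: DataFrame = Seq((1, "a"), (2, "b")).toDF("id", "data")

data.write.format("arctic")
  .partitionBy("data")               // partition key
  .option("primary.keys", "'id'")    // primary key column(s)
  .save("arctic_catalog.db.sample")
```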
