Spark SQL

Spark SQL provides several Data Manipulation Language (DML) actions for interacting with Hudi tables. These operations allow you to insert, update, merge, and delete data in your Hudi tables. Let’s explore them one by one.

Please refer to SQL DDL for creating Hudi tables using SQL.

Insert Into

You can use the INSERT INTO statement to add data to a Hudi table using Spark SQL. Here are some examples:

  INSERT INTO <table>
  SELECT <columns> FROM <source>;

As of 0.14.0, hoodie.sql.bulk.insert.enable and hoodie.sql.insert.mode are deprecated; users are expected to use hoodie.spark.sql.insert.into.operation instead. To manage duplicates with INSERT INTO, please check out the insert dup policy config.
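As a sketch of managing duplicates (the config name and its values are based on the 0.14.0 insert dup policy config; verify the exact values against that reference):

```sql
-- Sketch: control how duplicates are handled on INSERT INTO (assumed 0.14.0 config;
-- valid values are believed to be 'none', 'drop', and 'fail')
SET hoodie.datasource.insert.dup.policy = 'drop';
-- Subsequent inserts drop records whose record key already exists in the table
INSERT INTO hudi_mor_tbl SELECT 1, 'a1', 20, 1000;
```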

Examples:

  -- Insert into a copy-on-write (COW) Hudi table
  INSERT INTO hudi_cow_nonpcf_tbl SELECT 1, 'a1', 20;
  -- Insert into a merge-on-read (MOR) Hudi table
  INSERT INTO hudi_mor_tbl SELECT 1, 'a1', 20, 1000;
  -- Insert into a COW Hudi table with a static partition
  INSERT INTO hudi_cow_pt_tbl PARTITION(dt = '2021-12-09', hh='11') SELECT 2, 'a2', 1000;
  -- Insert into a COW Hudi table with a dynamic partition
  INSERT INTO hudi_cow_pt_tbl PARTITION(dt, hh) SELECT 1 AS id, 'a1' AS name, 1000 AS ts, '2021-12-09' AS dt, '10' AS hh;

Mapping to write operations

Hudi offers flexibility in choosing the underlying write operation of an INSERT INTO statement using the hoodie.spark.sql.insert.into.operation configuration. Possible values are “bulk_insert” (large inserts), “insert” (with small-file management), and “upsert” (with deduplication/merging). If no preCombine field is set, “insert” is chosen as the default; for a table with a preCombine field set, “upsert” is chosen as the default operation.
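For instance, the default choice can be overridden for a session. A sketch, reusing the table from the examples above:

```sql
-- Sketch: force bulk_insert for a large initial load (session-level override)
SET hoodie.spark.sql.insert.into.operation = 'bulk_insert';
INSERT INTO hudi_mor_tbl SELECT 1, 'a1', 20, 1000;
```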

Insert Overwrite

The INSERT OVERWRITE statement is used to replace existing data in a Hudi table.

  INSERT OVERWRITE <table>
  SELECT <columns> FROM <source>;

All existing partitions affected by the INSERT OVERWRITE statement will be replaced with the source data. Here are some examples:

  -- Overwrite a non-partitioned table
  INSERT OVERWRITE hudi_mor_tbl SELECT 99, 'a99', 20.0, 900;
  INSERT OVERWRITE hudi_cow_nonpcf_tbl SELECT 99, 'a99', 20.0;
  -- Overwrite a partitioned table with dynamic partition
  INSERT OVERWRITE TABLE hudi_cow_pt_tbl SELECT 10, 'a10', 1100, '2021-12-09', '10';
  -- Overwrite a partitioned table with static partition
  INSERT OVERWRITE hudi_cow_pt_tbl PARTITION(dt = '2021-12-09', hh='12') SELECT 13, 'a13', 1100;

Update

You can use the UPDATE statement to modify existing data in a Hudi table directly.

  UPDATE tableIdentifier SET column = EXPRESSION (, column = EXPRESSION) [ WHERE boolExpression ]

Here are some examples:

  -- Update data in a Hudi table
  UPDATE hudi_mor_tbl SET price = price * 2, ts = 1111 WHERE id = 1;
  -- Update data in a partitioned Hudi table
  UPDATE hudi_cow_pt_tbl SET name = 'a1_1', ts = 1001 WHERE id = 1;
  -- Update using a non-PK field
  UPDATE hudi_cow_pt_tbl SET ts = 1001 WHERE name = 'a1';

The UPDATE operation requires the specification of a preCombineField.
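For example, a table like hudi_mor_tbl used above would need to declare a preCombine field before UPDATE statements can be issued against it. A minimal sketch (the schema is an assumption inferred from the examples):

```sql
-- Sketch: UPDATE requires the table to declare a preCombineField
CREATE TABLE hudi_mor_tbl (id INT, name STRING, price DOUBLE, ts BIGINT)
USING hudi
TBLPROPERTIES (type = 'mor', primaryKey = 'id', preCombineField = 'ts');
```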

Merge Into

The MERGE INTO statement allows you to perform more complex updates and merges against source data. The MERGE INTO statement is similar to the UPDATE statement, but it allows you to specify different actions for matched and unmatched records.

  MERGE INTO tableIdentifier AS target_alias
  USING (sub_query | tableIdentifier) AS source_alias
  ON <merge_condition>
  [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
  [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

  <merge_condition> = A boolean equality condition
  <matched_action> =
    DELETE |
    UPDATE SET * |
    UPDATE SET column1 = expression1 [, column2 = expression2 ...]
  <not_matched_action> =
    INSERT * |
    INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

WHEN NOT MATCHED clauses specify the action to perform if the values do not match. There are two kinds of INSERT clauses:

  1. INSERT * clauses require that the source table has the same columns as those in the target table.
  2. INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...]) clauses do not require specifying all the columns of the target table. Unspecified target columns are filled with NULL.

Examples:

  -- source table using hudi, for testing merging into a non-partitioned table
  create table merge_source (id int, name string, price double, ts bigint) using hudi
  tblproperties (primaryKey = 'id', preCombineField = 'ts');
  insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

  merge into hudi_mor_tbl as target
  using merge_source as source
  on target.id = source.id
  when matched then update set *
  when not matched then insert *
  ;

  -- source table using parquet, for testing merging into a partitioned table
  create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
  insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');

  merge into hudi_cow_pt_tbl as target
  using (
    select id, name, '1000' as ts, flag, dt, hh from merge_source2
  ) source
  on target.id = source.id
  when matched and flag != 'delete' then
    update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
  when matched and flag = 'delete' then delete
  when not matched then
    insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
  ;

Key requirements

For a Hudi table with user-configured primary keys, the join condition in MERGE INTO is expected to contain the primary keys of the table. For a table where Hudi auto-generates primary keys, the join condition in MERGE INTO can be on any arbitrary data columns.
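A sketch of the second case (hudi_keyless_tbl and updates_src are hypothetical names for a table created without a primaryKey, so Hudi auto-generates record keys, and a source of changes):

```sql
-- Sketch: with auto-generated keys, the join condition may use any data column
MERGE INTO hudi_keyless_tbl AS target
USING updates_src AS source
ON target.name = source.name
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```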

Delete From

You can remove data from a Hudi table using the DELETE FROM statement.

  DELETE FROM tableIdentifier [ WHERE boolExpression ]

Examples:

  -- Delete data from a Hudi table
  DELETE FROM hudi_cow_nonpcf_tbl WHERE uuid = 1;
  -- Delete data from a MOR Hudi table based on a condition
  DELETE FROM hudi_mor_tbl WHERE id % 2 = 0;
  -- Delete data using a non-primary-key field
  DELETE FROM hudi_cow_pt_tbl WHERE name = 'a1';

Data Skipping and Indexing

DML operations can be sped up by using column statistics for data skipping and by using indexes to reduce the amount of data scanned. For example, the following speeds up a DELETE operation on a Hudi table by using the record level index.

  SET hoodie.enable.data.skipping=true;
  SET hoodie.metadata.record.index.enable=true;
  SET hoodie.metadata.enable=true;

  DELETE FROM hudi_table WHERE uuid = 'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa';

These DML operations give you powerful tools for managing your tables using Spark SQL. You can control the behavior of these operations using various configuration options, as explained in the documentation.

Flink SQL

Flink SQL provides several Data Manipulation Language (DML) actions for interacting with Hudi tables. These operations allow you to insert, update, and delete data from your Hudi tables. Let’s explore them one by one.

Insert Into

You can utilize the INSERT INTO statement to incorporate data into a Hudi table using Flink SQL. Here are a few illustrative examples:

  INSERT INTO <table>
  SELECT <columns> FROM <source>;

Examples:

  -- Insert into a Hudi table
  INSERT INTO hudi_table SELECT 1, 'a1', 20;

If write.operation is ‘upsert’, the INSERT INTO statement will not only insert new records but also update existing rows with the same record key.

  -- Insert into a Hudi table in upsert mode
  INSERT INTO hudi_table /*+ OPTIONS('write.operation'='upsert') */ SELECT 1, 'a1', 20;

Update

With Flink SQL, you can use the UPDATE command to modify data in a Hudi table. Here are a few illustrative examples:

  UPDATE tableIdentifier SET column = EXPRESSION (, column = EXPRESSION) [ WHERE boolExpression ]

Example:

  UPDATE hudi_table SET price = price * 2, ts = 1111 WHERE id = 1;

Key requirements

The UPDATE query only works in batch execution mode.
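A sketch of switching the session to batch mode before issuing the update (execution.runtime-mode is standard Flink configuration; the table is from the example above):

```sql
-- Run in batch mode; streaming mode does not support UPDATE
SET 'execution.runtime-mode' = 'batch';
UPDATE hudi_table SET price = price * 2, ts = 1111 WHERE id = 1;
```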

Delete From

With Flink SQL, you can use the DELETE command to remove rows from a Hudi table. Here are a few illustrative examples:

  DELETE FROM tableIdentifier [ WHERE boolExpression ]

Example:

  DELETE FROM hudi_table WHERE price < 100;

Key requirements

The DELETE query only works in batch execution mode.

Setting Writer/Reader Configs

With Flink SQL, you can additionally set writer/reader configs along with the query.

  INSERT INTO hudi_table /*+ OPTIONS('${hoodie.config.key1}'='${hoodie.config.value1}') */

Example:

  INSERT INTO hudi_table /*+ OPTIONS('hoodie.keep.max.commits'='30') */

The hudi-flink module defines the Flink SQL connector for Hudi as both source and sink. There are a number of options available for the sink table:

  • path (required, no default): Base path for the target hoodie table. The path will be created if it does not exist; otherwise a Hudi table is expected to have been initialized there.
  • table.type (default: COPY_ON_WRITE): Type of table to write: COPY_ON_WRITE or MERGE_ON_READ.
  • write.operation (default: upsert): The write operation this write should perform (insert and upsert are supported).
  • write.precombine.field (default: ts): Field used for precombining before the actual write. When two records have the same key value, the one with the largest precombine value is picked, determined by Object.compareTo(..).
  • write.payload.class (default: OverwriteWithLatestAvroPayload.class): Payload class to use. Override this if you want to roll your own merge logic when upserting/inserting; doing so renders any value set for the precombine field option ineffective.
  • write.insert.drop.duplicates (default: false): Whether to drop duplicates upon insert. By default, insert accepts duplicates to gain extra performance.
  • write.ignore.failed (default: true): Whether to ignore non-exception errors (e.g. writestatus errors) within a checkpoint batch. Defaults to true, in favor of streaming progress over data integrity.
  • hoodie.datasource.write.recordkey.field (default: uuid): Record key field, used as the recordKey component of HoodieKey. The actual value is obtained by invoking .toString() on the field value. Nested fields can be specified using dot notation, e.g. a.b.c.
  • hoodie.datasource.write.keygenerator.class (default: SimpleAvroKeyGenerator.class): Key generator class that extracts the key from the incoming record.
  • write.tasks (default: 4): Parallelism of the tasks that do the actual write.
  • write.batch.size.MB (default: 128): Batch buffer size in MB before flushing data to the underlying filesystem.
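Putting a few of these options together, a sketch of a sink definition (the schema and path are assumptions for illustration):

```sql
-- Sketch: a Hudi sink combining several of the options above
CREATE TABLE hudi_sink (
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_sink',                     -- hypothetical path
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'upsert',
  'write.precombine.field' = 'ts',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'write.tasks' = '2'
);
```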

If the table type is MERGE_ON_READ, you can also specify the asynchronous compaction strategy through options:

  • compaction.async.enabled (default: true): Async compaction, enabled by default for MOR tables.
  • compaction.trigger.strategy (default: num_commits): Strategy to trigger compaction. Options are ‘num_commits’ (trigger when N delta commits are reached), ‘time_elapsed’ (trigger when more than N seconds have elapsed since the last compaction), ‘num_and_time’ (trigger when both NUM_COMMITS and TIME_ELAPSED are satisfied), and ‘num_or_time’ (trigger when either NUM_COMMITS or TIME_ELAPSED is satisfied).
  • compaction.delta_commits (default: 5): Max delta commits needed to trigger compaction.
  • compaction.delta_seconds (default: 3600): Max delta seconds needed to trigger compaction (default 1 hour).
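For example, a MOR sink that compacts on elapsed time rather than commit count might look like this (a sketch; the schema and path are assumptions):

```sql
-- Sketch: time-based async compaction for a MOR table
CREATE TABLE hudi_mor_sink (
  uuid VARCHAR(20),
  name VARCHAR(10),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi',
  'path' = 'file:///tmp/hudi_mor_sink',             -- hypothetical path
  'table.type' = 'MERGE_ON_READ',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'time_elapsed',   -- compact on elapsed time
  'compaction.delta_seconds' = '1800'               -- after 30 minutes of deltas
);
```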

You can write the data using the SQL INSERT INTO statements:

  INSERT INTO hudi_table select ... from ...;

Note: INSERT OVERWRITE is not supported yet but already on the roadmap.

Non-Blocking Concurrency Control (Experimental)

Hudi Flink supports a new non-blocking concurrency control mode, where multiple writer tasks can be executed concurrently without blocking each other. One can read more about this mode in the concurrency control docs. Let us see it in action here.

In the example below, two streaming ingestion pipelines concurrently update the same table. One pipeline is responsible for the compaction and cleaning table services, while the other handles only data ingestion.

In order to commit the dataset, checkpointing needs to be enabled. Here is an example configuration for flink-conf.yaml:

  # set the checkpoint interval to 30 seconds
  execution.checkpointing.interval: 30000
  state.backend: rocksdb
  -- This is a datagen source that can generate records continuously
  CREATE TABLE sourceT (
    uuid varchar(20),
    name varchar(10),
    age int,
    ts timestamp(3),
    `partition` as 'par1'
  ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '200'
  );

  -- pipeline1: by default, enable the compaction and cleaning services
  CREATE TABLE t1(
    uuid varchar(20),
    name varchar(10),
    age int,
    ts timestamp(3),
    `partition` varchar(20)
  ) WITH (
    'connector' = 'hudi',
    'path' = '${work_path}/hudi-demo/t1',
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
    'write.tasks' = '2'
  );

  -- pipeline2: disable the compaction and cleaning services manually
  CREATE TABLE t1_2(
    uuid varchar(20),
    name varchar(10),
    age int,
    ts timestamp(3),
    `partition` varchar(20)
  ) WITH (
    'connector' = 'hudi',
    'path' = '${work_path}/hudi-demo/t1',
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'hoodie.write.concurrency.mode' = 'NON_BLOCKING_CONCURRENCY_CONTROL',
    'write.tasks' = '2',
    'compaction.schedule.enabled' = 'false',
    'compaction.async.enabled' = 'false',
    'clean.async.enabled' = 'false'
  );

  -- submit the pipelines
  insert into t1 select * from sourceT;
  insert into t1_2 select * from sourceT;

  select * from t1 limit 20;

As you can see from the above example, two pipelines with multiple tasks concurrently write to the same table. To use the new concurrency mode, all you need to do is set hoodie.write.concurrency.mode to NON_BLOCKING_CONCURRENCY_CONTROL. The write.tasks option specifies the number of write tasks used for writing to the table. The compaction.schedule.enabled, compaction.async.enabled, and clean.async.enabled options disable the compaction and cleaning services for the second pipeline, ensuring those table services are not executed twice for the same table.

Consistent hashing index (Experimental)

The Consistent Hashing Index was introduced in the 0.13.0 release. In comparison to the static hashing index (Bucket Index), the consistent hashing index offers dynamic scalability of data buckets for the writer. You can find the RFC for the design of this feature. In the 0.13.X releases, the Consistent Hashing Index is supported only for the Spark engine; since release 0.14.0, it is also supported for the Flink engine.

To utilize this feature, configure the option index.type as BUCKET and set hoodie.index.bucket.engine to CONSISTENT_HASHING. When enabling the consistent hashing index, it’s important to enable clustering scheduling within the writer. During this process, the writer will perform dual writes for both the old and new data buckets while the clustering is pending. Although the dual write does not impact correctness, it is strongly recommended to execute clustering as quickly as possible.

In the example below, we create a datagen source and do streaming ingestion into a Hudi table with a consistent bucket index. In order to commit the dataset, checkpointing needs to be enabled. Here is an example configuration for flink-conf.yaml:

  # set the checkpoint interval to 30 seconds
  execution.checkpointing.interval: 30000
  state.backend: rocksdb
  -- This is a datagen source that can generate records continuously
  CREATE TABLE sourceT (
    uuid varchar(20),
    name varchar(10),
    age int,
    ts timestamp(3),
    `partition` as 'par1'
  ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '200'
  );

  -- Create the hudi table with a consistent bucket index
  CREATE TABLE t1(
    uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
    name VARCHAR(10),
    age INT,
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
  )
  PARTITIONED BY (`partition`)
  WITH (
    'connector'='hudi',
    'path' = '${work_path}/hudi-demo/hudiT',
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'clustering.schedule.enabled'='true',
    'hoodie.index.bucket.engine'='CONSISTENT_HASHING',
    'hoodie.clustering.plan.strategy.class'='org.apache.hudi.client.clustering.plan.strategy.FlinkConsistentBucketClusteringPlanStrategy',
    'hoodie.clustering.execution.strategy.class'='org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy',
    'hoodie.bucket.index.num.buckets'='8',
    'hoodie.bucket.index.max.num.buckets'='128',
    'hoodie.bucket.index.min.num.buckets'='8',
    'hoodie.bucket.index.split.threshold'='1.5',
    'write.tasks'='2'
  );

  -- submit the pipelines
  insert into t1 select * from sourceT;

  select * from t1 limit 20;

The Consistent Hashing Index is supported for the Flink engine since release 0.14.0. As of 0.14.0, it has some limitations:

  • This index is supported only for MOR tables. This limitation also exists when using the Spark engine.
  • It does not work with the metadata table enabled. This limitation also exists when using the Spark engine.
  • The consistent hashing index does not work with bulk-insert using the Flink engine yet; please use the simple bucket index or the Spark engine for bulk-insert pipelines.
  • The resize plan generated by the Flink engine only supports merging small file groups; file splitting is not supported yet.
  • The resize plan should be executed through an offline Spark job; the Flink engine does not support executing the resize plan yet.