Spark DataSource API

The hudi-spark module offers the DataSource API to write a Spark DataFrame into a Hudi table.

There are a number of options available:

HoodieWriteConfig:

TABLE_NAME

DataSourceWriteOptions:

RECORDKEY_FIELD: Primary key field(s). Record keys uniquely identify a record/row within each partition. If one wants to have a global uniqueness, there are two options. You could either make the dataset non-partitioned, or, you can leverage Global indexes to ensure record keys are unique irrespective of the partition path. Record keys can either be a single column or refer to multiple columns. KEYGENERATOR_CLASS_OPT_KEY property should be set accordingly based on whether it is a simple or complex key. For eg: "col1" for simple field, "col1,col2,col3,etc" for complex field. Nested fields can be specified using the dot notation eg: a.b.c.
Default value: "uuid"

PARTITIONPATH_FIELD: Columns to be used for partitioning the table. To prevent partitioning, provide empty string as value eg: "". Specify partitioning/no partitioning using KEYGENERATOR_CLASS_OPT_KEY. If partition path needs to be url encoded, you can set URL_ENCODE_PARTITIONING_OPT_KEY. If synchronizing to hive, also specify using HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY.
Default value: "partitionpath"

PRECOMBINE_FIELD: When two records within the same batch have the same key value, the record with the largest value from the field specified will be choosen. If you are using default payload of OverwriteWithLatestAvroPayload for HoodieRecordPayload (WRITE_PAYLOAD_CLASS), an incoming record will always takes precendence compared to the one in storage ignoring this PRECOMBINE_FIELD_OPT_KEY.
Default value: "ts"

OPERATION: The write operations to use.
Available values:
"upsert" (default), "bulk_insert", "insert", "delete"

TABLE_TYPE: The type of table to write to. Note: After the initial creation of a table, this value must stay consistent when writing to (updating) the table using the Spark SaveMode.Append mode.
Available values:
COW_TABLE_TYPE_OPT_VAL (default), MOR_TABLE_TYPE_OPT_VAL

KEYGENERATOR_CLASS_NAME: Refer to Key Generation section below.

Example: Upsert a DataFrame, specifying the necessary field names for recordKey => _row_key, partitionPath => partition, and precombineKey => timestamp

  1. inputDF.write()
  2. .format("hudi")
  3. .options(clientOpts) //Where clientOpts is of type Map[String, String]. clientOpts can include any other options necessary.
  4. .option("hoodie.datasource.write.recordkey.field", "_row_key")
  5. .option("hoodie.datasource.write.partitionpath.field", "partition")
  6. .option("hoodie.datasource.write.precombine.field"(), "timestamp")
  7. .option("hoodie.table.name", tableName)
  8. .mode(SaveMode.Append)
  9. .save(basePath);

Scala

Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.

  1. // spark-shell
  2. val inserts = convertToStringList(dataGen.generateInserts(10))
  3. val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  4. df.write.format("hudi").
  5. options(getQuickstartWriteConfigs).
  6. option("hoodie.datasource.write.precombine.field", "ts").
  7. option("hoodie.datasource.write.recordkey.field", "uuid").
  8. option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  9. option("hoodie.table.name", tableName).
  10. mode(Overwrite).
  11. save(basePath)

mode(Overwrite) overwrites and recreates the table if it already exists. You can check the data generated under /tmp/hudi_trips_cow/<region>/<country>/<city>/. We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation : upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations which could be faster. To know more, refer to Write operations

Python

Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.

  1. # pyspark
  2. inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
  3. df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  4. hudi_options = {
  5. 'hoodie.table.name': tableName,
  6. 'hoodie.datasource.write.recordkey.field': 'uuid',
  7. 'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  8. 'hoodie.datasource.write.table.name': tableName,
  9. 'hoodie.datasource.write.operation': 'upsert',
  10. 'hoodie.datasource.write.precombine.field': 'ts',
  11. 'hoodie.upsert.shuffle.parallelism': 2,
  12. 'hoodie.insert.shuffle.parallelism': 2
  13. }
  14. df.write.format("hudi").
  15. options(**hudi_options).
  16. mode("overwrite").
  17. save(basePath)

mode(Overwrite) overwrites and recreates the table if it already exists. You can check the data generated under /tmp/hudi_trips_cow/<region>/<country>/<city>/. We provided a record key (uuid in schema), partition field (region/country/city) and combine logic (ts in schema) to ensure trip records are unique within each partition. For more info, refer to Modeling data stored in Hudi and for info on ways to ingest data into Hudi, refer to Writing Hudi Tables. Here we are using the default write operation : upsert. If you have a workload without updates, you can also issue insert or bulk_insert operations which could be faster. To know more, refer to Write operations

SparkSQL

  1. insert into h0 select 1, 'a1', 20;
  2. -- insert static partition
  3. insert into h_p0 partition(dt = '2021-01-02') select 1, 'a1';
  4. -- insert dynamic partition
  5. insert into h_p0 select 1, 'a1', dt;
  6. -- insert dynamic partition
  7. insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
  8. -- insert overwrite table
  9. insert overwrite table h0 select 1, 'a1', 20;
  10. -- insert overwrite table with static partition
  11. insert overwrite h_p0 partition(dt = '2021-01-02') select 1, 'a1';
  12. -- insert overwrite table with dynamic partition
  13. insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;

NOTICE

  • Insert mode : Hudi supports two insert modes when inserting data to a table with primary key(we call it pk-table as followed):
    Using strict mode, insert statement will keep the primary key uniqueness constraint for COW table which do not allow duplicate records. If a record already exists during insert, a HoodieDuplicateKeyException will be thrown for COW table. For MOR table, updates are allowed to existing record.
    Using non-strict mode, hudi uses the same code path used by insert operation in spark data source for the pk-table.
    One can set the insert mode by using the config: hoodie.sql.insert.mode

  • Bulk Insert : By default, hudi uses the normal insert operation for insert statements. Users can set hoodie.sql.bulk.insert.enable to true to enable the bulk insert for insert statement.

Checkout https://hudi.apache.org/blog/2021/02/13/hudi-key-generators for various key generator options, like Timestamp based, complex, custom, NonPartitioned Key gen, etc.

Insert Overwrite Table

Generate some new trips, overwrite the table logically at the Hudi metadata level. The Hudi cleaner will eventually clean up the previous table snapshot’s file groups. This can be faster than deleting the older table and recreating in Overwrite mode.

Scala

  1. // spark-shell
  2. spark.
  3. read.format("hudi").
  4. load(basePath).
  5. select("uuid","partitionpath").
  6. show(10, false)
  7. val inserts = convertToStringList(dataGen.generateInserts(10))
  8. val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
  9. df.write.format("hudi").
  10. options(getQuickstartWriteConfigs).
  11. option("hoodie.datasource.write.operation","insert_overwrite_table").
  12. option("hoodie.datasource.write.precombine.field", "ts").
  13. option("hoodie.datasource.write.recordkey.field", "uuid").
  14. option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  15. option("hoodie.table.name", tableName).
  16. mode(Append).
  17. save(basePath)
  18. // Should have different keys now, from query before.
  19. spark.
  20. read.format("hudi").
  21. load(basePath).
  22. select("uuid","partitionpath").
  23. show(10, false)

SparkSQL

The insert overwrite non-partitioned table sql statement will convert to the insert_overwrite_table operation. e.g.

  1. insert overwrite table h0 select 1, 'a1', 20;

Insert Overwrite

Generate some new trips, overwrite the all the partitions that are present in the input. This operation can be faster than upsert for batch ETL jobs, that are recomputing entire target partitions at once (as opposed to incrementally updating the target tables). This is because, we are able to bypass indexing, precombining and other repartitioning steps in the upsert write path completely.

Scala

  1. // spark-shell
  2. spark.
  3. read.format("hudi").
  4. load(basePath).
  5. select("uuid","partitionpath").
  6. sort("partitionpath","uuid").
  7. show(100, false)
  8. val inserts = convertToStringList(dataGen.generateInserts(10))
  9. val df = spark.
  10. read.json(spark.sparkContext.parallelize(inserts, 2)).
  11. filter("partitionpath = 'americas/united_states/san_francisco'")
  12. df.write.format("hudi").
  13. options(getQuickstartWriteConfigs).
  14. option("hoodie.datasource.write.operation","insert_overwrite").
  15. option("hoodie.datasource.write.precombine.field", "ts").
  16. option("hoodie.datasource.write.recordkey.field", "uuid").
  17. option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  18. option("hoodie.table.name", tableName).
  19. mode(Append).
  20. save(basePath)
  21. // Should have different keys now for San Francisco alone, from query before.
  22. spark.
  23. read.format("hudi").
  24. load(basePath).
  25. select("uuid","partitionpath").
  26. sort("partitionpath","uuid").
  27. show(100, false)

SparkSQL

The insert overwrite partitioned table sql statement will convert to the insert_overwrite operation. e.g.

  1. insert overwrite table h_p1 select 2 as id, 'a2', '2021-01-03' as dt, '19' as hh;

Deletes

Hudi supports implementing two types of deletes on data stored in Hudi tables, by enabling the user to specify a different record payload implementation. For more info refer to Delete support in Hudi.

  • Soft Deletes : Retain the record key and just null out the values for all the other fields. This can be achieved by ensuring the appropriate fields are nullable in the table schema and simply upserting the table after setting these fields to null. Note that soft deletes are always persisted in storage and never removed, but all values are set to nulls. So for GDPR or other compliance reasons, users should consider doing hard deletes if record key and partition path contain PII.

For example:

  1. // fetch two records for soft deletes
  2. val softDeleteDs = spark.sql("select * from hudi_trips_snapshot").limit(2)
  3. // prepare the soft deletes by ensuring the appropriate fields are nullified
  4. val nullifyColumns = softDeleteDs.schema.fields.
  5. map(field => (field.name, field.dataType.typeName)).
  6. filter(pair => (!HoodieRecord.HOODIE_META_COLUMNS.contains(pair._1)
  7. && !Array("ts", "uuid", "partitionpath").contains(pair._1)))
  8. val softDeleteDf = nullifyColumns.
  9. foldLeft(softDeleteDs.drop(HoodieRecord.HOODIE_META_COLUMNS: _*))(
  10. (ds, col) => ds.withColumn(col._1, lit(null).cast(col._2)))
  11. // simply upsert the table after setting these fields to null
  12. softDeleteDf.write.format("hudi").
  13. options(getQuickstartWriteConfigs).
  14. option("hoodie.datasource.write.operation", "upsert").
  15. option("hoodie.datasource.write.precombine.field", "ts").
  16. option("hoodie.datasource.write.recordkey.field", "uuid").
  17. option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  18. option("hoodie.table.name", tableName).
  19. mode(Append).
  20. save(basePath)
  • Hard Deletes : A stronger form of deletion is to physically remove any trace of the record from the table. This can be achieved in 3 different ways.
  1. Using Datasource, set "hoodie.datasource.write.operation" to "delete". This will remove all the records in the DataSet being submitted.

Example, first read in a dataset:

  1. val roViewDF = spark.
  2. read.
  3. format("org.apache.hudi").
  4. load(basePath + "/*/*/*/*")
  5. roViewDF.createOrReplaceTempView("hudi_ro_table")
  6. spark.sql("select count(*) from hudi_ro_table").show() // should return 10 (number of records inserted above)
  7. val riderValue = spark.sql("select distinct rider from hudi_ro_table").show()
  8. // copy the value displayed to be used in next step

Now write a query of which records you would like to delete:

  1. val df = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")

Lastly, execute the deletion of these records:

  1. val deletes = dataGen.generateDeletes(df.collectAsList())
  2. val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2));
  3. df.write.format("org.apache.hudi").
  4. options(getQuickstartWriteConfigs).
  5. option("hoodie.datasource.write.operation","delete").
  6. option("hoodie.datasource.write.precombine.field", "ts").
  7. option("hoodie.datasource.write.recordkey.field", "uuid").
  8. option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  9. option("hoodie.table.name", tableName).
  10. mode(Append).
  11. save(basePath);
  1. Using DataSource, set PAYLOAD_CLASS_OPT_KEY to "org.apache.hudi.EmptyHoodieRecordPayload". This will remove all the records in the DataSet being submitted.

This example will remove all the records from the table that exist in the DataSet deleteDF:

  1. deleteDF // dataframe containing just records to be deleted
  2. .write().format("org.apache.hudi")
  3. .option(...) // Add HUDI options like record-key, partition-path and others as needed for your setup
  4. // specify record_key, partition_key, precombine_fieldkey & usual params
  5. .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.EmptyHoodieRecordPayload")
  1. Using DataSource or DeltaStreamer, add a column named _hoodie_is_deleted to DataSet. The value of this column must be set to true for all the records to be deleted and either false or left null for any records which are to be upserted.

Let’s say the original schema is:

  1. {
  2. "type":"record",
  3. "name":"example_tbl",
  4. "fields":[{
  5. "name": "uuid",
  6. "type": "String"
  7. }, {
  8. "name": "ts",
  9. "type": "string"
  10. }, {
  11. "name": "partitionPath",
  12. "type": "string"
  13. }, {
  14. "name": "rank",
  15. "type": "long"
  16. }
  17. ]}

Make sure you add _hoodie_is_deleted column:

  1. {
  2. "type":"record",
  3. "name":"example_tbl",
  4. "fields":[{
  5. "name": "uuid",
  6. "type": "String"
  7. }, {
  8. "name": "ts",
  9. "type": "string"
  10. }, {
  11. "name": "partitionPath",
  12. "type": "string"
  13. }, {
  14. "name": "rank",
  15. "type": "long"
  16. }, {
  17. "name" : "_hoodie_is_deleted",
  18. "type" : "boolean",
  19. "default" : false
  20. }
  21. ]}

Then any record you want to delete you can mark _hoodie_is_deleted as true:

  1. {"ts": 0.0, "uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", "rank": 1045, "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}

Concurrency Control

Following is an example of how to use optimistic_concurrency_control via Spark DataSource API.

Read more in-depth details about concurrency control in the concurrency control concepts section.

  1. inputDF.write.format("hudi")
  2. .options(getQuickstartWriteConfigs)
  3. .option("hoodie.datasource.write.precombine.field", "ts")
  4. .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  5. .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  6. .option("hoodie.write.lock.zookeeper.url", "zookeeper")
  7. .option("hoodie.write.lock.zookeeper.port", "2181")
  8. .option("hoodie.write.lock.zookeeper.lock_key", "test_table")
  9. .option("hoodie.write.lock.zookeeper.base_path", "/test")
  10. .option("hoodie.datasource.write.recordkey.field", "uuid")
  11. .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
  12. .option("hoodie.table.name", tableName)
  13. .mode(Overwrite)
  14. .save(basePath)

Java Client

We can use plain java to write to hudi tables. To use Java client we can refere here