This page introduces the Flink-Hudi integration and shows how Flink brings the power of streaming to Hudi.

Setup

Hudi      Supported Flink version
0.15.x    1.14.x, 1.15.x, 1.16.x, 1.17.x, 1.18.x
0.14.x    1.13.x, 1.14.x, 1.15.x, 1.16.x, 1.17.x
0.13.x    1.13.x, 1.14.x, 1.15.x, 1.16.x
0.12.x    1.13.x, 1.14.x, 1.15.x
0.11.x    1.13.x, 1.14.x
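
As a quick sanity check before picking a bundle, the compatibility table above can be encoded as a lookup. The following is a minimal sketch, not part of Hudi: the class name `HudiFlinkCompat` and its methods are hypothetical, and the data is copied from the table.

```java
import java.util.List;
import java.util.Map;

/** Sketch: the compatibility table from this page as a lookup. Not part of Hudi. */
final class HudiFlinkCompat {
    // Keys are Hudi release lines, values are Flink minor versions, copied from the table above.
    static final Map<String, List<String>> SUPPORTED = Map.of(
        "0.15", List.of("1.14", "1.15", "1.16", "1.17", "1.18"),
        "0.14", List.of("1.13", "1.14", "1.15", "1.16", "1.17"),
        "0.13", List.of("1.13", "1.14", "1.15", "1.16"),
        "0.12", List.of("1.13", "1.14", "1.15"),
        "0.11", List.of("1.13", "1.14"));

    /** Reduces "0.15.0" to "0.15" and "1.17.2" to "1.17". */
    static String minor(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected major.minor[.patch]: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** True when the table lists the Flink minor version for the Hudi release line. */
    static boolean isSupported(String hudiVersion, String flinkVersion) {
        return SUPPORTED.getOrDefault(minor(hudiVersion), List.of())
                        .contains(minor(flinkVersion));
    }

    public static void main(String[] args) {
        System.out.println(isSupported("0.15.0", "1.17.0")); // true
        System.out.println(isSupported("0.15.0", "1.13.6")); // false
    }
}
```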

Hudi works with Flink 1.13 (up to the Hudi 0.14.x release), Flink 1.14, Flink 1.15, Flink 1.16, Flink 1.17, and Flink 1.18. You can follow the instructions here for setting up Flink. Then, start a standalone Flink cluster within a Hadoop environment. For a local setup, download the Hadoop binaries and set HADOOP_HOME.

    # HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
    export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
    # Start the Flink standalone cluster
    ./bin/start-cluster.sh

Please note the following:

  • We suggest Hadoop 2.9.x or later, because the filesystem implementations for some object stores are only available from that version onward
  • The flink-parquet and flink-avro formats are already packaged into the hudi-flink-bundle jar

Flink SQL

We use the Flink SQL Client because it is a good quick start tool for SQL users.

Hudi provides a packaged bundle jar for Flink, which should be loaded into the Flink SQL Client when it starts up. You can build the jar manually under the path hudi-source-dir/packaging/hudi-flink-bundle (see Build Flink Bundle Jar), or download it from the Apache Official Repository.

Now start the SQL CLI:

    # For Flink versions: 1.13 - 1.18
    export FLINK_VERSION=1.17
    export HUDI_VERSION=0.15.0
    wget https://repo1.maven.org/maven2/org/apache/hudi/hudi-flink${FLINK_VERSION}-bundle/${HUDI_VERSION}/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar -P $FLINK_HOME/lib/
    ./bin/sql-client.sh embedded -j lib/hudi-flink${FLINK_VERSION}-bundle-${HUDI_VERSION}.jar shell
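
The download URL in the commands above follows a fixed pattern built from the Flink minor version and the Hudi version. If you script the setup, that pattern could be captured as below. This is only a sketch: `HudiBundleLocator` is a hypothetical helper, and the pattern is taken from the wget command rather than from any Hudi API.

```java
/** Sketch: derives the bundle jar name and URL used in the wget command above. */
final class HudiBundleLocator {
    // Repository prefix copied from the wget command on this page.
    static final String REPO = "https://repo1.maven.org/maven2/org/apache/hudi";

    /** For example "hudi-flink1.17-bundle". */
    static String artifactId(String flinkMinor) {
        return "hudi-flink" + flinkMinor + "-bundle";
    }

    /** For example "hudi-flink1.17-bundle-0.15.0.jar". */
    static String jarName(String flinkMinor, String hudiVersion) {
        return artifactId(flinkMinor) + "-" + hudiVersion + ".jar";
    }

    /** Full download URL in standard Maven repository layout. */
    static String url(String flinkMinor, String hudiVersion) {
        return REPO + "/" + artifactId(flinkMinor) + "/" + hudiVersion + "/"
            + jarName(flinkMinor, hudiVersion);
    }

    public static void main(String[] args) {
        System.out.println(url("1.17", "0.15.0"));
    }
}
```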

Set up the table name and base path, then operate on the table using SQL for the rest of this guide. Note that the SQL CLI executes SQL one statement at a time.

DataStream API

Hudi works with Flink 1.13 (up to the Hudi 0.14.x release), Flink 1.14, Flink 1.15, Flink 1.16, Flink 1.17, and Flink 1.18. Please add the desired dependency to your project:

    <!-- For Flink versions 1.13 - 1.18 -->
    <properties>
        <flink.version>1.17.0</flink.version>
        <flink.binary.version>1.17</flink.binary.version>
        <hudi.version>0.15.0</hudi.version>
    </properties>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink${flink.binary.version}-bundle</artifactId>
        <version>${hudi.version}</version>
    </dependency>

Create Table

First, let’s create a Hudi table. Here, we use a partitioned table for illustration, but Hudi also supports non-partitioned tables.

Flink SQL

Here is an example of creating a Flink Hudi table.

    -- sets the result mode to tableau to show the results directly in the CLI
    set sql-client.execution.result-mode = tableau;

    -- IF EXISTS avoids an error when the table has not been created yet
    DROP TABLE IF EXISTS hudi_table;
    CREATE TABLE hudi_table(
        ts BIGINT,
        uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
        rider VARCHAR(20),
        driver VARCHAR(20),
        fare DOUBLE,
        city VARCHAR(20)
    )
    PARTITIONED BY (`city`)
    WITH (
        'connector' = 'hudi',
        'path' = 'file:///tmp/hudi_table',
        'table.type' = 'MERGE_ON_READ'
    );

DataStream API

    // Java
    // The first commit auto-initializes the table if it does not exist at the specified base path.

Insert Data

Flink SQL

Insert data into the Hudi table using SQL VALUES.

    -- insert data using values
    INSERT INTO hudi_table
    VALUES
    (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
    (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
    (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
    (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
    (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
    (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
    (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
    (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

DataStream API

Add a streaming source to Flink and load the data into the Hudi table. Since this is the first write, it also auto-creates the table.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.util.HoodiePipeline;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String targetTable = "hudi_table";
    String basePath = "file:///tmp/hudi_table";

    // Table options: base path, table type, and precombine field
    Map<String, String> options = new HashMap<>();
    options.put("path", basePath);
    options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
    options.put("precombine.field", "ts");

    // Plug in your own source here
    DataStream<RowData> dataStream = env.addSource(...);

    // Declare the table schema, primary key, and partition field
    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("`partition` VARCHAR(20)")
        .pk("uuid")
        .partition("partition")
        .options(options);

    // The second parameter indicates whether the input data stream is bounded
    builder.sink(dataStream, false);
    env.execute("Api_Sink");

Refer to the full quickstart example here.

Query Data

Flink SQL

    -- query from the Hudi table
    select * from hudi_table;

DataStream API

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.util.HoodiePipeline;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String targetTable = "hudi_table";
    String basePath = "file:///tmp/hudi_table";

    Map<String, String> options = new HashMap<>();
    options.put("path", basePath);
    options.put("table.type", HoodieTableType.MERGE_ON_READ.name());
    options.put("read.streaming.enabled", "true"); // enables the streaming read
    options.put("read.start-commit", "20210316134557"); // specifies the start commit instant time

    // Declare the table schema, primary key, and partition field
    HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
        .column("uuid VARCHAR(20)")
        .column("name VARCHAR(10)")
        .column("age INT")
        .column("ts TIMESTAMP(3)")
        .column("`partition` VARCHAR(20)")
        .pk("uuid")
        .partition("partition")
        .options(options);

    // Build the source stream and print each record as it arrives
    DataStream<RowData> rowDataDataStream = builder.source(env);
    rowDataDataStream.print();
    env.execute("Api_Source");

Refer to the full streaming reader example here.

This statement queries the snapshot view of the dataset. Refer to Table types and queries for more information on all supported table types and query types.

Update Data

This is similar to inserting new data.

Flink SQL

Hudi tables can be updated either by inserting records with the same primary key or by using a standard UPDATE statement, as shown below.

    -- UPDATE queries only work in batch execution mode
    SET 'execution.runtime-mode' = 'batch';
    UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';

The UPDATE statement is supported since Flink 1.17, so only a Hudi Flink bundle compiled with Flink 1.17+ provides this functionality. Only batch queries on a Hudi table with a primary key work correctly.

DataStream API

Add a streaming source to Flink and load the data into the Hudi table using the DataStream API as above. When new rows with the same primary key arrive in the stream, the existing rows are updated. In the insert example, an incoming row with an existing record id updates the stored row.

Refer to the update example here.

Querying the data again will now show updated records. Each write operation generates a new commit denoted by the timestamp.
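
To make the update-by-key behavior concrete, here is a toy model of an upsert keyed on the primary key. It is a conceptual sketch only, not Hudi's implementation: `UpsertSketch` and `Ride` are hypothetical names, and the rule that a record with an older `ts` loses is an assumption based on the `precombine.field` option used in the insert example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Conceptual model of upsert-by-key; not Hudi's implementation. */
final class UpsertSketch {
    /** Hypothetical record type with a key, an ordering field, and a payload. */
    static final class Ride {
        final String uuid;
        final long ts;
        final double fare;

        Ride(String uuid, long ts, double fare) {
            this.uuid = uuid;
            this.ts = ts;
            this.fare = fare;
        }
    }

    /** Keeps the incoming record unless the stored one has a strictly larger ts. */
    static void upsert(Map<String, Ride> table, Ride incoming) {
        table.merge(incoming.uuid, incoming,
            (stored, fresh) -> fresh.ts >= stored.ts ? fresh : stored);
    }

    public static void main(String[] args) {
        Map<String, Ride> table = new LinkedHashMap<>();
        upsert(table, new Ride("334e26e9", 1L, 19.10));
        upsert(table, new Ride("334e26e9", 2L, 25.0)); // same key, newer ts: replaces the row
        upsert(table, new Ride("334e26e9", 1L, 99.0)); // same key, older ts: ignored
        System.out.println(table.size() + " row, fare=" + table.get("334e26e9").fare);
        // prints "1 row, fare=25.0"
    }
}
```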

Delete Data

Flink SQL

Row-level Delete

When consuming data in a streaming query, the Hudi Flink source can also accept change logs from the upstream data source. If the RowKind is set per row, it can then apply UPDATE and DELETE operations at the row level. This lets you sync a near-real-time snapshot on Hudi for all kinds of RDBMS.
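
The idea of applying per-row change kinds to a keyed snapshot can be sketched without any Flink dependency. In the example below, `Kind` is a local stand-in that mirrors the value names of Flink's `RowKind`; `ChangelogApplySketch` and `Change` are hypothetical, and the sample rows are made up for illustration. It shows the concept, not Hudi's write path.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: applying a change log to a keyed snapshot. Not Hudi's write path. */
final class ChangelogApplySketch {
    /** Local stand-in mirroring the names of Flink's RowKind values. */
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Hypothetical change record: a kind, a primary key, and one payload column. */
    static final class Change {
        final Kind kind;
        final String key;
        final String value;

        Change(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    static void apply(Map<String, String> snapshot, Change c) {
        switch (c.kind) {
            case INSERT:
            case UPDATE_AFTER:
                snapshot.put(c.key, c.value); // new or updated row replaces any old one
                break;
            case DELETE:
                snapshot.remove(c.key);       // row-level delete by primary key
                break;
            case UPDATE_BEFORE:
                // The old image carries no new state for a keyed table, so it is skipped here.
                break;
        }
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new LinkedHashMap<>();
        apply(snapshot, new Change(Kind.INSERT, "id1", "Danny"));
        apply(snapshot, new Change(Kind.INSERT, "id2", "Stephen"));
        apply(snapshot, new Change(Kind.UPDATE_BEFORE, "id1", "Danny"));
        apply(snapshot, new Change(Kind.UPDATE_AFTER, "id1", "Daniel"));
        apply(snapshot, new Change(Kind.DELETE, "id2", "Stephen"));
        System.out.println(snapshot); // prints {id1=Daniel}
    }
}
```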

Batch Delete

    -- delete all the records with age greater than 23
    -- NOTE: only works for batch sql queries
    SET 'execution.runtime-mode' = 'batch';
    DELETE FROM t1 WHERE age > 23;

The DELETE statement is supported since Flink 1.17, so only a Hudi Flink bundle compiled with Flink 1.17+ provides this functionality. Only batch queries on a Hudi table with a primary key work correctly.

DataStream API

First create a Flink Hudi table and insert data into it using the DataStream API, as in the insert example above. When new rows with the same primary key and a RowKind of DELETE arrive in the stream, the matching rows are deleted.

Refer to the delete example here.

Streaming Query

Hudi Flink also provides the capability to obtain a stream of records that changed since a given commit timestamp. This is achieved using Hudi’s streaming query and providing a start time from which changes need to be streamed. We do not need to specify an end time if we want all changes after the given commit (as is the common case).

    CREATE TABLE t1(
        uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
        name VARCHAR(10),
        age INT,
        ts TIMESTAMP(3),
        `partition` VARCHAR(20)
    )
    PARTITIONED BY (`partition`)
    WITH (
        'connector' = 'hudi',
        'path' = '${path}',
        'table.type' = 'MERGE_ON_READ',
        'read.streaming.enabled' = 'true', -- enables the streaming read
        'read.start-commit' = '20210316134557', -- specifies the start commit instant time
        'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
    );

    -- Then query the table in stream mode
    select * from t1;
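
The sample `read.start-commit` value `20210316134557` reads as a timestamp in `yyyyMMddHHmmss` form. Assuming that pattern (it is inferred from the sample value, and the time zone is left to the caller), a start commit for a given point in time could be produced as in the sketch below. `StartCommit` is a hypothetical helper, not a Hudi class.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Sketch: converts between a date-time and the instant string used for read.start-commit. */
final class StartCommit {
    // Assumption: pattern inferred from the sample value 20210316134557 on this page.
    static final DateTimeFormatter INSTANT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    /** Formats a date-time as a 14-digit instant string. */
    static String format(LocalDateTime time) {
        return time.format(INSTANT);
    }

    /** Parses a 14-digit instant string back into a date-time. */
    static LocalDateTime parse(String instant) {
        return LocalDateTime.parse(instant, INSTANT);
    }

    public static void main(String[] args) {
        System.out.println(format(LocalDateTime.of(2021, 3, 16, 13, 45, 57))); // prints 20210316134557
        System.out.println(parse("20210316134557"));                           // prints 2021-03-16T13:45:57
    }
}
```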

Change Data Capture Query

Hudi Flink also provides the capability to obtain a stream of records with Change Data Capture. CDC queries are useful for applications that need to obtain all the changes, along with before/after images of records.

    set sql-client.execution.result-mode = tableau;

    CREATE TABLE hudi_table(
        ts BIGINT,
        uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
        rider VARCHAR(20),
        driver VARCHAR(20),
        fare DOUBLE,
        city VARCHAR(20)
    )
    PARTITIONED BY (`city`)
    WITH (
        'connector' = 'hudi',
        'path' = 'file:///tmp/hudi_table',
        'table.type' = 'COPY_ON_WRITE',
        'cdc.enabled' = 'true' -- enables the CDC log
    );

    -- insert data using values
    INSERT INTO hudi_table
    VALUES
    (1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
    (1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70,'san_francisco'),
    (1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90,'san_francisco'),
    (1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
    (1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
    (1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40,'sao_paulo'),
    (1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06,'chennai'),
    (1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');

    SET 'execution.runtime-mode' = 'batch';
    UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';

    -- Query the table in stream mode in another shell to see change logs
    SET 'execution.runtime-mode' = 'streaming';
    select * from hudi_table/*+ OPTIONS('read.streaming.enabled'='true')*/;

This returns all changes that happened after the read.start-commit commit. The unique thing about this feature is that it lets you author streaming pipelines on a streaming or batch data source.
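
To illustrate what "before/after images" means, the toy sketch below derives change records by comparing two snapshots of a keyed table. This is not how Hudi produces its CDC stream; `CdcDiffSketch` is hypothetical, the keys are shortened versions of the sample uuids, and the `+I`, `-U`, `+U`, `-D` prefixes follow Flink's short names for row kinds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Sketch: derives change records with before/after images from two snapshots. */
final class CdcDiffSketch {
    /** Emits one line per change; "-U" and "+U" carry the before and after image of an update. */
    static List<String> diff(Map<String, Double> before, Map<String, Double> after) {
        List<String> changes = new ArrayList<>();
        TreeSet<String> keys = new TreeSet<>(before.keySet()); // sorted for stable output
        keys.addAll(after.keySet());
        for (String key : keys) {
            Double oldFare = before.get(key);
            Double newFare = after.get(key);
            if (oldFare == null) {
                changes.add("+I " + key + " " + newFare);
            } else if (newFare == null) {
                changes.add("-D " + key + " " + oldFare);
            } else if (!oldFare.equals(newFare)) {
                changes.add("-U " + key + " " + oldFare);
                changes.add("+U " + key + " " + newFare);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<String, Double> before = Map.of("334e26e9", 19.10, "e96c4396", 27.70);
        Map<String, Double> after = Map.of("334e26e9", 25.0, "e96c4396", 27.70);
        diff(before, after).forEach(System.out::println);
        // prints:
        // -U 334e26e9 19.1
        // +U 334e26e9 25.0
    }
}
```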

Where To Go From Here?

If you are relatively new to Apache Hudi, it is important to be familiar with a few core concepts:

See more in the “Concepts” section of the docs.

Take a look at recent blog posts that go in depth on certain topics or use cases.

Hudi tables can be queried from query engines like Hive, Spark, Flink, Presto, and many more. We have put together a demo video that showcases all of this on a Docker-based setup with all dependent systems running locally. We recommend you replicate the same setup and run the demo yourself by following the steps here to get a taste for it. Also, if you are looking for ways to migrate your existing data to Hudi, refer to the migration guide.