Apache Iceberg sink connector

Supported Iceberg Version

  • 1.4.2

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Description

Sink connector for Apache Iceberg. It supports CDC mode, automatic table creation, and table schema evolution.

Supported DataSource Info

| Datasource | Dependent | Maven    |
|------------|-----------|----------|
| Iceberg    | hive-exec | Download |
| Iceberg    | libfb303  | Download |

Database Dependency

In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is set to provided. Therefore, if you use the Flink engine, you may first need to add the following Jar packages to the /lib directory. If you use the Spark engine and it is already integrated with Hadoop, you do not need to add them:

  1. hive-exec-xxx.jar
  2. libfb303-xxx.jar

Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.
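For the Flink engine, copying the two Jar packages into place might look like the sketch below. The version numbers and the FLINK_HOME fallback are placeholders, not defaults shipped with the connector; substitute the versions matching your Hive distribution. The script only echoes the copy commands so you can review them first:

```shell
# Placeholder versions -- substitute the ones matching your Hive build.
HIVE_VERSION=3.1.3
FB303_VERSION=0.9.3
FLINK_LIB="${FLINK_HOME:-/opt/flink}/lib"

# Print the copy commands rather than running them, for review.
for jar in "hive-exec-${HIVE_VERSION}.jar" "libfb303-${FB303_VERSION}.jar"; do
  echo "cp ${jar} ${FLINK_LIB}/"
done
```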

Data Type Mapping

| SeaTunnel Data Type | Iceberg Data Type |
|---------------------|-------------------|
| BOOLEAN             | BOOLEAN           |
| INT                 | INTEGER           |
| BIGINT              | LONG              |
| FLOAT               | FLOAT             |
| DOUBLE              | DOUBLE            |
| DATE                | DATE              |
| TIME                | TIME              |
| TIMESTAMP           | TIMESTAMP         |
| STRING              | STRING            |
| BYTES               | FIXED / BINARY    |
| DECIMAL             | DECIMAL           |
| ROW                 | STRUCT            |
| ARRAY               | LIST              |
| MAP                 | MAP               |
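As an illustration of the mapping above, a hypothetical FakeSource schema (the field names are made up for this sketch) would produce the Iceberg column types noted in the comments when the sink auto-creates the table:

```hocon
source {
  FakeSource {
    result_table_name = "fake"
    schema = {
      fields {
        id         = "bigint"         # -> Iceberg LONG
        f_bool     = "boolean"        # -> Iceberg BOOLEAN
        f_double   = "double"         # -> Iceberg DOUBLE
        f_decimal  = "decimal(10, 2)" # -> Iceberg DECIMAL
        f_string   = "string"         # -> Iceberg STRING
        f_datetime = "timestamp"      # -> Iceberg TIMESTAMP
      }
    }
  }
}
```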

Sink Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| catalog_name | string | yes | default | User-specified catalog name. |
| namespace | string | yes | default | The Iceberg database name in the backend catalog. |
| table | string | yes | - | The Iceberg table name in the backend catalog. |
| iceberg.catalog.config | map | yes | - | Properties for initializing the Iceberg catalog; for reference see https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java |
| hadoop.config | map | no | - | Properties passed through to the Hadoop configuration. |
| iceberg.hadoop-conf-path | string | no | - | The path from which to load the 'core-site.xml', 'hdfs-site.xml', and 'hive-site.xml' files. |
| case_sensitive | boolean | no | false | If data columns were selected via schema [config], controls whether the match to the schema is done case sensitively. |
| iceberg.table.write-props | map | no | - | Properties passed through to Iceberg writer initialization; these take precedence. Examples include 'write.format.default' and 'write.target-file-size-bytes'. The full list of parameters can be found at https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/TableProperties.java |
| iceberg.table.auto-create-props | map | no | - | Configuration applied by Iceberg during automatic table creation. |
| iceberg.table.schema-evolution-enabled | boolean | no | false | Setting to true enables Iceberg tables to support schema evolution during the synchronization process. |
| iceberg.table.primary-keys | string | no | - | Default comma-separated list of columns that identify a row in the table (primary key). |
| iceberg.table.partition-keys | string | no | - | Default comma-separated list of partition fields to use when creating the table. |
| iceberg.table.upsert-mode-enabled | boolean | no | false | Set to true to enable upsert mode. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode; please refer to schema_save_mode below. |
| data_save_mode | Enum | no | APPEND_DATA | The data save mode; please refer to data_save_mode below. |
| iceberg.table.commit-branch | string | no | - | Default branch for commits. |
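When the sink runs against a secured or customized Hadoop cluster, the `iceberg.hadoop-conf-path` and `hadoop.config` options from the table above can be combined. The sketch below shows one plausible arrangement; the `/etc/hadoop/conf` directory and the `fs.defaultFS` value are placeholders for your environment, not defaults:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    # Directory to load 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' from
    iceberg.hadoop-conf-path = "/etc/hadoop/conf"
    # Extra properties merged into the Hadoop configuration
    hadoop.config = {
      "fs.defaultFS" = "hdfs://your_cluster"
    }
  }
}
```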

Task Example

Simple:

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MySQL-CDC {
    result_table_name = "customers_mysql_cdc_iceberg"
    server-id = 5652
    username = "st_user"
    password = "seatunnel"
    table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  }
}

transform {
}

sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "file:///tmp/seatunnel/iceberg/hadoop-sink/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}
```

Hive Catalog:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}
```

Hadoop Catalog:

```hocon
sink {
  Iceberg {
    catalog_name = "seatunnel_test"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "seatunnel_namespace"
    table = "iceberg_sink_table"
    iceberg.table.write-props = {
      write.format.default = "parquet"
      write.target-file-size-bytes = 536870912
    }
    iceberg.table.primary-keys = "id"
    iceberg.table.partition-keys = "f_datetime"
    iceberg.table.upsert-mode-enabled = true
    iceberg.table.schema-evolution-enabled = true
    case_sensitive = true
  }
}
```

Changelog

2.3.4-SNAPSHOT 2024-01-18

  • Add Iceberg Sink Connector

next version