Apache Iceberg source connector

Supported Iceberg Version

  • 1.4.2

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Key features

  • batch
  • stream
  • column projection

Description

Source connector for Apache Iceberg. It supports both batch and stream modes.

Supported DataSource Info

Datasource  Dependent   Maven
Iceberg     hive-exec   Download
Iceberg     libfb303    Download

Database Dependency

In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is set to provided. So if you use the Flink engine, you may first need to add the following Jar packages to the /lib directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.

  1. hive-exec-xxx.jar
  2. libfb303-xxx.jar

Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.

Data Type Mapping

Iceberg Data Type  SeaTunnel Data Type
BOOLEAN            BOOLEAN
INTEGER            INT
LONG               BIGINT
FLOAT              FLOAT
DOUBLE             DOUBLE
DATE               DATE
TIME               TIME
TIMESTAMP          TIMESTAMP
STRING             STRING
FIXED              BYTES
BINARY             BYTES
DECIMAL            DECIMAL
STRUCT             ROW
LIST               ARRAY
MAP                MAP
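
For the composite mappings, here is a sketch of how they might surface in a schema config; the field names are illustrative, and declaring a STRUCT column as a nested field block follows SeaTunnel's general schema syntax:

  schema {
    fields {
      c_decimal = "decimal(19, 9)"   # Iceberg DECIMAL
      c_list    = "array<int>"       # Iceberg LIST
      c_map     = "map<string, int>" # Iceberg MAP
      # Iceberg STRUCT maps to ROW, declared as a nested block
      c_struct = {
        nested_id   = "int"
        nested_name = "string"
      }
    }
  }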

Source Options

  • catalog_name (string, required, default: -): User-specified catalog name.
  • namespace (string, required, default: -): The Iceberg database name in the backend catalog.
  • table (string, required, default: -): The Iceberg table name in the backend catalog.
  • iceberg.catalog.config (map, required, default: -): Specify the properties for initializing the Iceberg catalog; the available properties are listed in this file: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java
  • hadoop.config (map, optional, default: -): Properties passed through to the Hadoop configuration.
  • iceberg.hadoop-conf-path (string, optional, default: -): The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files.
  • schema (config, optional, default: -): Use projection to select data columns and column order.
  • case_sensitive (boolean, optional, default: false): If data columns were selected via the schema config, controls whether the match to the schema is done case-sensitively.
  • start_snapshot_timestamp (long, optional, default: -): Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in millis since the Unix epoch.
  • start_snapshot_id (long, optional, default: -): Instructs this scan to look for changes starting from a particular snapshot (exclusive).
  • end_snapshot_id (long, optional, default: -): Instructs this scan to look for changes up to a particular snapshot (inclusive).
  • use_snapshot_id (long, optional, default: -): Instructs this scan to use the given snapshot id; a sketch appears under Task Example below.
  • use_snapshot_timestamp (long, optional, default: -): Instructs this scan to use the most recent snapshot as of the given timestamp, in millis since the Unix epoch.
  • stream_scan_strategy (enum, optional, default: FROM_LATEST_SNAPSHOT): Starting strategy for stream-mode execution; see the sketch after this list. The optional values are:
      - TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan, then switch to incremental mode.
      - FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.
      - FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.
      - FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive.
      - FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive.
  • common-options (optional, default: -): Source plugin common parameters; please refer to Source Common Options for details.
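
A minimal stream-mode sketch using the options above; the warehouse path, database, and table names are placeholders, and TABLE_SCAN_THEN_INCREMENTAL is chosen only for illustration:

  env {
    parallelism = 2
    job.mode = "STREAMING"
  }
  source {
    Iceberg {
      catalog_name = "seatunnel"
      iceberg.catalog.config = {
        type = "hadoop"
        warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
      }
      namespace = "your_iceberg_database"
      table = "your_iceberg_table"
      # scan the existing table first, then keep reading new snapshots incrementally
      stream_scan_strategy = "TABLE_SCAN_THEN_INCREMENTAL"
    }
  }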

Task Example

Simple:

  env {
    parallelism = 2
    job.mode = "BATCH"
  }
  source {
    Iceberg {
      schema {
        fields {
          f2 = "boolean"
          f1 = "bigint"
          f3 = "int"
          f4 = "bigint"
          f5 = "float"
          f6 = "double"
          f7 = "date"
          f9 = "timestamp"
          f10 = "timestamp"
          f11 = "string"
          f12 = "bytes"
          f13 = "bytes"
          f14 = "decimal(19,9)"
          f15 = "array<int>"
          f16 = "map<string, int>"
        }
      }
      catalog_name = "seatunnel"
      iceberg.catalog.config = {
        type = "hadoop"
        warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
      }
      namespace = "database1"
      table = "source"
      result_table_name = "iceberg"
    }
  }
  transform {
  }
  sink {
    Console {
      source_table_name = "iceberg"
    }
  }

Hive Catalog:

  source {
    Iceberg {
      catalog_name = "seatunnel"
      iceberg.catalog.config = {
        type = "hive"
        uri = "thrift://localhost:9083"
        warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
      }
      namespace = "your_iceberg_database"
      table = "your_iceberg_table"
    }
  }
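
If the Hive catalog needs the cluster's configuration files, iceberg.hadoop-conf-path can point at the directory containing them; a sketch, assuming a hypothetical /etc/hadoop/conf directory:

  source {
    Iceberg {
      catalog_name = "seatunnel"
      iceberg.catalog.config = {
        type = "hive"
        uri = "thrift://localhost:9083"
        warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
      }
      # assumed directory holding core-site.xml, hdfs-site.xml and hive-site.xml
      iceberg.hadoop-conf-path = "/etc/hadoop/conf"
      namespace = "your_iceberg_database"
      table = "your_iceberg_table"
    }
  }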

Column Projection:

  source {
    Iceberg {
      catalog_name = "seatunnel"
      iceberg.catalog.config = {
        type = "hadoop"
        warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
      }
      namespace = "your_iceberg_database"
      table = "your_iceberg_table"
      schema {
        fields {
          f2 = "boolean"
          f1 = "bigint"
          f3 = "int"
          f4 = "bigint"
        }
      }
    }
  }
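
Reading a fixed snapshot in batch mode only requires use_snapshot_id (or use_snapshot_timestamp); a minimal sketch, where the snapshot id is a hypothetical placeholder:

  source {
    Iceberg {
      catalog_name = "seatunnel"
      iceberg.catalog.config = {
        type = "hadoop"
        warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
      }
      namespace = "your_iceberg_database"
      table = "your_iceberg_table"
      # hypothetical snapshot id; look up real ids in the table's snapshot history
      use_snapshot_id = 5301254300434495678
    }
  }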

Changelog

2.2.0-beta 2022-09-26

  • Add Iceberg Source Connector

next version

  • [Feature] Support Hadoop3.x (3046)
  • [improve][api] Refactoring schema parse (4157)