# Apache Iceberg source connector
## Support Iceberg Version
- 1.4.2
## Support Those Engines

- Spark
- Flink
- SeaTunnel Zeta
## Key features
- batch
- stream
- exactly-once
- column projection
- parallelism
- support user-defined split
- data format
- parquet
- orc
- avro
- iceberg catalog
    - hadoop (2.7.1, 2.7.5, 3.1.3)
    - hive (2.3.9, 3.1.2)
## Description

Source connector for Apache Iceberg. It supports both batch and stream modes.
## Supported DataSource Info
| Datasource | Dependent | Maven |
|---|---|---|
| Iceberg | hive-exec | Download |
| Iceberg | libfb303 | Download |
## Database Dependency

In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is set to provided. So if you use the Flink engine, you may first need to add the following jar packages to the /lib directory; if you use the Spark engine and it is integrated with Hadoop, you do not need to add them:

```
hive-exec-xxx.jar
libfb303-xxx.jar
```

Some versions of the hive-exec package do not include libfb303-xxx.jar, in which case you also need to import that jar manually.
## Data Type Mapping
| Iceberg Data type | SeaTunnel Data type |
|---|---|
| BOOLEAN | BOOLEAN |
| INTEGER | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| STRING | STRING |
| FIXED | BYTES |
| BINARY | BYTES |
| DECIMAL | DECIMAL |
| STRUCT | ROW |
| LIST | ARRAY |
| MAP | MAP |
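As a quick illustration of this mapping, a hypothetical Iceberg table with LONG, DECIMAL, BINARY, LIST, and MAP columns could be declared with a SeaTunnel `schema` block like the following sketch (field names are placeholders, not part of any real table):

```hocon
schema {
  fields {
    id    = "bigint"           # Iceberg LONG    -> SeaTunnel BIGINT
    price = "decimal(19,9)"    # Iceberg DECIMAL -> SeaTunnel DECIMAL
    raw   = "bytes"            # Iceberg BINARY  -> SeaTunnel BYTES
    tags  = "array<int>"       # Iceberg LIST    -> SeaTunnel ARRAY
    attrs = "map<string, int>" # Iceberg MAP     -> SeaTunnel MAP
  }
}
```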
## Source Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| catalog_name | string | yes | - | User-specified catalog name. |
| namespace | string | yes | - | The iceberg database name in the backend catalog. |
| table | string | yes | - | The iceberg table name in the backend catalog. |
| iceberg.catalog.config | map | yes | - | Specify the properties for initializing the Iceberg catalog, which can be referenced in this file: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java |
| hadoop.config | map | no | - | Properties passed through to the Hadoop configuration |
| iceberg.hadoop-conf-path | string | no | - | The specified loading paths for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files. |
| schema | config | no | - | Use projection to select data columns and columns order. |
| case_sensitive | boolean | no | false | If data columns were selected via the schema config, controls whether the match to the schema is done with case sensitivity. |
| start_snapshot_timestamp | long | no | - | Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in millis since the Unix epoch. |
| start_snapshot_id | long | no | - | Instructs this scan to look for changes starting from a particular snapshot (exclusive). |
| end_snapshot_id | long | no | - | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
| use_snapshot_id | long | no | - | Instructs this scan to use the given snapshot ID. |
| use_snapshot_timestamp | long | no | - | Instructs this scan to use the most recent snapshot as of the given time, in millis since the Unix epoch. |
| stream_scan_strategy | enum | no | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution; defaults to FROM_LATEST_SNAPSHOT if no value is specified. The optional values are: TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode; FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot inclusive; FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot inclusive; FROM_SNAPSHOT_ID: start incremental mode from a snapshot with a specific id inclusive; FROM_SNAPSHOT_TIMESTAMP: start incremental mode from a snapshot with a specific timestamp inclusive. See the streaming sketch after this table. |
| common-options | | no | - | Source plugin common parameters, please refer to Source Common Options for details. |
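As a sketch of how the incremental options combine in stream mode (the warehouse path, namespace, table, and snapshot id below are placeholders), a streaming read that starts from a specific snapshot might look like:

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
}

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    # Start incremental mode from this snapshot (placeholder id), inclusive
    stream_scan_strategy = "FROM_SNAPSHOT_ID"
    start_snapshot_id = 123456789
  }
}
```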
## Task Example
### Simple:

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Iceberg {
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
        f5 = "float"
        f6 = "double"
        f7 = "date"
        f9 = "timestamp"
        f10 = "timestamp"
        f11 = "string"
        f12 = "bytes"
        f13 = "bytes"
        f14 = "decimal(19,9)"
        f15 = "array<int>"
        f16 = "map<string, int>"
      }
    }
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    result_table_name = "iceberg"
  }
}

transform {
}

sink {
  Console {
    source_table_name = "iceberg"
  }
}
```
### Hive Catalog:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster//tmp/seatunnel/iceberg/"
    }
    catalog_type = "hive"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```
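If the Hive catalog needs your cluster configuration files, the `iceberg.hadoop-conf-path` option from the table above can point at the directory containing `core-site.xml`, `hdfs-site.xml`, and `hive-site.xml`. A minimal sketch, assuming a placeholder path:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    # Placeholder: directory holding core-site.xml / hdfs-site.xml / hive-site.xml
    iceberg.hadoop-conf-path = "/path/to/hadoop/conf"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```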
### Column Projection:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
```
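When a schema is used for projection, the `case_sensitive` option from the Source Options table controls whether field names must match the Iceberg column names exactly in case. A minimal sketch, reusing the placeholder catalog above:

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    # Require exact-case matches between schema fields and Iceberg columns
    case_sensitive = true
    schema {
      fields {
        f1 = "bigint"
        f2 = "boolean"
      }
    }
  }
}
```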
## Changelog

### 2.2.0-beta 2022-09-26
- Add Iceberg Source Connector
