Catalog Configuration
A catalog is created and named by executing the following query (replace `<catalog_name>` with your catalog name and `<config_key>`=`<config_value>` with the catalog implementation config):

```sql
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  '<config_key>'='<config_value>'
);
```
The following properties can be set globally and are not limited to a specific catalog implementation:
| Property | Required | Values | Description |
|---|---|---|---|
| type | ✔️ | iceberg | Must be iceberg. |
| catalog-type | | hive, hadoop, rest, glue, jdbc or nessie | The underlying Iceberg catalog implementation: HiveCatalog, HadoopCatalog, RESTCatalog, GlueCatalog, JdbcCatalog, NessieCatalog, or left unset if using a custom catalog implementation via catalog-impl. |
| catalog-impl | | | The fully-qualified class name of a custom catalog implementation. Must be set if catalog-type is unset. |
| property-version | | | Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is 1. |
| cache-enabled | | true or false | Whether to enable the catalog cache. The default value is true. |
| cache.expiration-interval-ms | | | How long catalog entries are locally cached, in milliseconds. Negative values like -1 disable expiration; 0 is not allowed. The default value is -1. |
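
As a sketch of how these global properties combine with a catalog implementation, the statement below registers a custom catalog and tunes the cache; the catalog name, class name, and values are illustrative placeholders only:

```sql
-- Illustrative only: 'com.my.custom.CatalogImpl' stands in for a real custom
-- catalog class; the cache settings use the global properties described above.
CREATE CATALOG my_custom_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'property-version'='1',
  'cache-enabled'='true',
  'cache.expiration-interval-ms'='60000'
);
```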
The following properties can be set if using the Hive catalog:
| Property | Required | Values | Description |
|---|---|---|---|
| uri | ✔️ | | The Hive metastore's thrift URI. |
| clients | | | The Hive metastore client pool size. The default value is 2. |
| warehouse | | | The Hive warehouse location. Users should specify this path if they neither set hive-conf-dir to a location containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath. |
| hive-conf-dir | | | Path to a directory containing a hive-site.xml configuration file which will be used to provide custom Hive configuration values. The value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the hive-site.xml on the classpath) will be overwritten with the warehouse value if both hive-conf-dir and warehouse are set when creating the Iceberg catalog. |
| hadoop-conf-dir | | | Path to a directory containing core-site.xml and hdfs-site.xml configuration files which will be used to provide custom Hadoop configuration values. |
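
A minimal sketch of a Hive-backed catalog; the metastore URI, client pool size, and warehouse path are example values to adapt to your environment:

```sql
-- Example values: point 'uri' at your Hive metastore and 'warehouse' at your storage path.
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);
```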
The following properties can be set if using the Hadoop catalog:
| Property | Required | Values | Description |
|---|---|---|---|
| warehouse | ✔️ | | The HDFS directory to store metadata files and data files. |
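
A sketch of a Hadoop catalog pointing at an example warehouse directory; the path is a placeholder:

```sql
-- The warehouse path is an example value; it should point at your HDFS (or compatible) directory.
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/warehouse/path',
  'property-version'='1'
);
```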
The following properties can be set if using the REST catalog:
| Property | Required | Values | Description |
|---|---|---|---|
| uri | ✔️ | | The URL to the REST Catalog. |
| credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. |
| token | | | A token which will be used to interact with the server. |
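
A sketch of a REST catalog, assuming a REST catalog server reachable at an example URL; the credential is optional and shown only to illustrate where it goes:

```sql
-- The URI and credential are placeholders for a real REST catalog endpoint.
CREATE CATALOG rest_catalog WITH (
  'type'='iceberg',
  'catalog-type'='rest',
  'uri'='https://localhost/',
  'credential'='<client_id>:<client_secret>'
);
```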
Runtime configuration
Read options
Flink read options are passed when configuring the Flink IcebergSource:
```java
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofMillis(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build()
```
For Flink SQL, read options can be passed in via SQL hints like this:
```sql
SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */
...
```
Options can be passed in via the Flink configuration, which will be applied to the current session. Note that not all options support this mode.

```java
env.getConfig()
    .getConfiguration()
    .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST_OPTION, 1000L);
...
```
The read option has the highest priority, followed by the Flink configuration and then the table property.
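
For instance, assuming connector.iceberg.monitor-interval has been set to 60s in the Flink configuration, the hint below still takes effect because read options have the higher priority; the table name is a placeholder:

```sql
-- The hint value (10s) overrides connector.iceberg.monitor-interval from the Flink configuration.
SELECT * FROM sample_table /*+ OPTIONS('monitor-interval'='10s') */;
```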
| Read option | Flink configuration | Table property | Default | Description |
|---|---|---|---|---|
| snapshot-id | N/A | N/A | null | For time travel in batch mode. Read data from the specified snapshot-id. |
| case-sensitive | connector.iceberg.case-sensitive | N/A | false | If true, match column name in a case sensitive way. |
| as-of-timestamp | N/A | N/A | null | For time travel in batch mode. Read data from the most recent snapshot as of the given time in milliseconds. |
| starting-strategy | connector.iceberg.starting-strategy | N/A | INCREMENTAL_FROM_LATEST_SNAPSHOT | Starting strategy for streaming execution; applies only to the FLIP-27 IcebergSource.<br>TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode; the incremental mode starts from the current snapshot, exclusive.<br>INCREMENTAL_FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot, inclusive; if the table is empty, all future append snapshots should be discovered.<br>INCREMENTAL_FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot, inclusive; if the table is empty, all future append snapshots should be discovered.<br>INCREMENTAL_FROM_SNAPSHOT_ID: start incremental mode from a snapshot with a specific id, inclusive.<br>INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP: start incremental mode from a snapshot with a specific timestamp, inclusive; if the timestamp is between two snapshots, it should start from the snapshot after the timestamp. |
| start-snapshot-timestamp | N/A | N/A | null | Start to read data from the most recent snapshot as of the given time in milliseconds. |
| start-snapshot-id | N/A | N/A | null | Start to read data from the specified snapshot-id. |
| end-snapshot-id | N/A | N/A | The latest snapshot id | Specifies the end snapshot. |
| branch | N/A | N/A | main | Specifies the branch to read from in batch mode |
| tag | N/A | N/A | null | Specifies the tag to read from in batch mode |
| start-tag | N/A | N/A | null | Specifies the starting tag to read from for incremental reads |
| end-tag | N/A | N/A | null | Specifies the ending tag to read from for incremental reads |
| split-size | connector.iceberg.split-size | read.split.target-size | 128 MB | Target size when combining input splits. |
| split-lookback | connector.iceberg.split-lookback | read.split.planning-lookback | 10 | Number of bins to consider when combining input splits. |
| split-file-open-cost | connector.iceberg.split-file-open-cost | read.split.open-file-cost | 4 MB | The estimated cost to open a file, used as a minimum weight when combining splits. |
| streaming | connector.iceberg.streaming | N/A | false | Sets whether the current task runs in streaming or batch mode. |
| monitor-interval | connector.iceberg.monitor-interval | N/A | 60s | Monitor interval to discover splits from new snapshots. Applicable only for streaming read. |
| include-column-stats | connector.iceberg.include-column-stats | N/A | false | Create a new scan from this that loads the column stats with each data file. Column stats include: value count, null value count, lower bounds, and upper bounds. |
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
| limit | connector.iceberg.limit | N/A | -1 | Limits the number of output rows; -1 means no limit. |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failures. |
| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the splitAssignerFactory will be overridden with OrderedSplitAssignerFactory. |
| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. |
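
As an illustration of how several of these options are used together in SQL, the hints below run a batch time-travel read and a streaming read; the table name and snapshot id are placeholders:

```sql
-- Batch time travel: read the table as of a specific snapshot (placeholder id).
SELECT * FROM sample_table /*+ OPTIONS('snapshot-id'='3821550127947089987') */;

-- Streaming read: poll for new snapshots every 30 seconds.
SELECT * FROM sample_table /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;
```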
Write options
Flink write options are passed when configuring the FlinkSink, like this:
```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```
For Flink SQL, write options can be passed in via SQL hints like this:
```sql
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
```
| Flink option | Default | Description |
|---|---|---|
| write-format | Table write.format.default | File format to use for this write operation; parquet, avro, or orc |
| target-file-size-bytes | As per table property | Overrides this table’s write.target-file-size-bytes |
| upsert-enabled | Table write.upsert.enabled | Overrides this table’s write.upsert.enabled |
| overwrite-enabled | false | Overwrite the table's data; overwrite mode shouldn't be enabled when configuring an UPSERT data stream. |
| distribution-mode | Table write.distribution-mode | Overrides this table’s write.distribution-mode |
| compression-codec | Table write.(fileformat).compression-codec | Overrides this table’s compression codec for this write |
| compression-level | Table write.(fileformat).compression-level | Overrides this table’s compression level for Parquet and Avro tables for this write |
| compression-strategy | Table write.orc.compression-strategy | Overrides this table’s compression strategy for ORC tables for this write |
| write-parallelism | Upstream operator parallelism | Overrides the writer parallelism |
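
As a sketch combining several of these write options in SQL, the statement below picks a file format, compression codec, and target file size for a single insert; the table names and values are placeholders:

```sql
-- Write Parquet files with zstd compression and a 256 MB target file size for this insert only.
INSERT INTO sample_table /*+ OPTIONS('write-format'='parquet',
                                     'compression-codec'='zstd',
                                     'target-file-size-bytes'='268435456') */
SELECT * FROM source_table;
```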
