| Property                                  | Description                                                             |
|-------------------------------------------|-------------------------------------------------------------------------|
| `iceberg.table.<table name>.route-regex`  | The regex used to match a record's route field to a table               |
| `iceberg.control.topic`                   | Name of the control topic, default is `control-iceberg`                 |
| `iceberg.control.group-id-prefix`         | Prefix for the control consumer group, default is `cg-control`          |
| `iceberg.control.commit.interval-ms`      | Commit interval in ms, default is 300,000 (5 min)                       |
| `iceberg.control.commit.timeout-ms`       | Commit timeout in ms, default is 30,000 (30 sec)                        |
| `iceberg.control.commit.threads`          | Number of threads to use for commits, default is (cores * 2)            |
| `iceberg.catalog`                         | Name of the catalog, default is `iceberg`                               |
| `iceberg.catalog.*`                       | Properties passed through to Iceberg catalog initialization             |
| `iceberg.hadoop-conf-dir`                 | If specified, Hadoop config files in this directory will be loaded      |
| `iceberg.hadoop.*`                        | Properties passed through to the Hadoop configuration                   |
| `iceberg.kafka.*`                         | Properties passed through to control topic Kafka client initialization  |
If `iceberg.tables.dynamic-enabled` is `false` (the default), then you must specify `iceberg.tables`. If `iceberg.tables.dynamic-enabled` is `true`, then you must specify `iceberg.tables.route-field`, which names the record field that will contain the name of the table.
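For example (the table and field names here are illustrative, taken from the full examples later in this page), static routing might look like:

```
"iceberg.tables": "default.events"
```

while dynamic routing might look like:

```
"iceberg.tables.dynamic-enabled": "true",
"iceberg.tables.route-field": "db_table"
```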
## Kafka configuration

By default the connector will attempt to use the Kafka client config from the worker properties when connecting to the control topic. If that config cannot be read for some reason, Kafka client settings can be set explicitly using `iceberg.kafka.*` properties.
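For example, to point the control-topic client at a secured cluster, standard Kafka client properties can be passed through with the prefix; the values below are illustrative:

```
"iceberg.kafka.bootstrap.servers": "broker1:9092,broker2:9092",
"iceberg.kafka.security.protocol": "SASL_SSL",
"iceberg.kafka.sasl.mechanism": "PLAIN"
```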
Messages should be converted to a struct or map using the appropriate Kafka Connect converter.
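For example, if your topic carries schemaless JSON (an assumption about your message format), the standard Kafka Connect `JsonConverter` could be configured like this:

```
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
```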
## Catalog configuration

The `iceberg.catalog.*` properties are required for connecting to the Iceberg catalog. The core catalog types are included in the default distribution: REST, Glue, DynamoDB, Hadoop, Nessie, JDBC, and Hive. JDBC drivers are not included in the default distribution, so you will need to include those yourself if you use a JDBC catalog. When using a Hive catalog, you can use the distribution that includes the Hive metastore client; otherwise you will need to include that yourself.
To set the catalog type, set `iceberg.catalog.type` to `rest`, `hive`, or `hadoop`. For other catalog types, instead set `iceberg.catalog.catalog-impl` to the name of the catalog class.
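As a sketch, a JDBC catalog, which has no shorthand type, might be configured through `catalog-impl` like this; the JDBC URI and warehouse location are placeholders, and the appropriate JDBC driver must be on the classpath as noted above:

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.jdbc.JdbcCatalog",
"iceberg.catalog.uri": "jdbc:postgresql://postgres:5432/iceberg",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse"
```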
### REST example

```
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>",
```
### Hive example

NOTE: Use the distribution that includes the HMS client (or include the HMS client yourself). Use `S3FileIO` when using S3 for storage (the default is `HadoopFileIO` with `HiveCatalog`).

```
"iceberg.catalog.type": "hive",
"iceberg.catalog.uri": "thrift://hive:9083",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.client.region": "us-east-1",
"iceberg.catalog.s3.access-key-id": "<AWS access>",
"iceberg.catalog.s3.secret-access-key": "<AWS secret>",
```
### Glue example

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```
### Nessie example

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
"iceberg.catalog.uri": "http://localhost:19120/api/v2",
"iceberg.catalog.ref": "main",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```
### Notes

Depending on your setup, you may also need to set `iceberg.catalog.s3.endpoint`, `iceberg.catalog.s3.staging-dir`, or `iceberg.catalog.s3.path-style-access`. See the Iceberg docs for full details on configuring catalogs.
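As a sketch, an S3-compatible object store such as MinIO typically needs the endpoint and path-style settings; the values below are placeholders:

```
"iceberg.catalog.s3.endpoint": "http://minio:9000",
"iceberg.catalog.s3.path-style-access": "true"
```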
## Azure ADLS configuration example

When using ADLS, Azure requires passing `AZURE_CLIENT_ID`, `AZURE_TENANT_ID`, and `AZURE_CLIENT_SECRET` to its Java SDK. If you're running Kafka Connect in a container, be sure to inject those values as environment variables (a sketch follows below). See the Azure Identity Client library for Java for more information.
An example of these would be:
```
AZURE_CLIENT_ID=e564f687-7b89-4b48-80b8-111111111111
AZURE_TENANT_ID=95f2f365-f5b7-44b1-88a1-111111111111
AZURE_CLIENT_SECRET="XXX"
```
Here, the CLIENT_ID is the Application ID of a registered application under App Registrations, the TENANT_ID is from your Azure Tenant Properties, and the CLIENT_SECRET is created within the "Certificates & Secrets" section, under "Manage", after choosing your specific App Registration. You might have to choose "Client secrets" in the middle panel and "+ New client secret" to generate one. Be sure to set this variable to the secret's Value and not its Id.

It's also important that the App Registration is granted the Role Assignment "Storage Blob Data Contributor" in your Storage Account's Access Control (IAM), or it won't be able to write new files there.
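As an illustration only, a Docker Compose service for a Connect worker might inject the variables like this (the image and all values are placeholders, not a recommendation):

```yaml
services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    environment:
      # The Azure Identity SDK reads these three variables at startup
      AZURE_CLIENT_ID: "e564f687-7b89-4b48-80b8-111111111111"
      AZURE_TENANT_ID: "95f2f365-f5b7-44b1-88a1-111111111111"
      AZURE_CLIENT_SECRET: "XXX"
```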
Then, within the Connector’s configuration, you’ll want to include the following:
"iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://catalog:8181", "iceberg.catalog.warehouse": "abfss://storage-container-name@storageaccount.dfs.core.windows.net/warehouse", "iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO", "iceberg.catalog.include-credentials": "true"
Here, `storage-container-name` is the container name within your Azure Storage Account, and `/warehouse` is the location within that container where your Apache Iceberg files will be written by default (or where tables will be auto-created if `iceberg.tables.auto-create-enabled=true`). The `include-credentials` parameter passes the Azure Java client credentials along. This configures the Iceberg sink connector to connect to the REST catalog implementation at `iceberg.catalog.uri` to obtain the required connection string for the ADLSv2 client.
## Google GCS configuration example

By default, Application Default Credentials (ADC) will be used to connect to GCS. Details on how ADC works can be found in the Google Cloud documentation.
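For example, one common ADC setup (an assumption about your environment) is to point the SDK at a service account key file via the standard environment variable:

```bash
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json
```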
"iceberg.catalog.type": "rest", "iceberg.catalog.uri": "https://catalog:8181", "iceberg.catalog.warehouse": "gs://bucket-name/warehouse", "iceberg.catalog.io-impl": "org.apache.iceberg.google.gcs.GCSFileIO"
## Hadoop configuration

When using HDFS or Hive, the sink will initialize the Hadoop configuration. First, config files from the classpath are loaded. Next, if `iceberg.hadoop-conf-dir` is specified, config files are loaded from that location. Finally, any `iceberg.hadoop.*` properties from the sink config are applied. When merging these, the order of precedence is sink config > config dir > classpath.
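For example, the following sketch (paths and values are illustrative) loads site files from a mounted config directory and then overrides one property inline; per the precedence above, the inline property wins:

```
"iceberg.hadoop-conf-dir": "/etc/hadoop/conf",
"iceberg.hadoop.fs.defaultFS": "hdfs://namenode:8020"
```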
## Examples

### Initial setup

#### Source topic

This assumes the source topic already exists and is named `events`.
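If it does not exist yet, it can be created the same way as the control topic below, for example (the partition count is illustrative):

```bash
bin/kafka-topics \
  --command-config command-config.props \
  --bootstrap-server ${CONNECT_BOOTSTRAP_SERVERS} \
  --create \
  --topic events \
  --partitions 3
```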
#### Control topic

If your Kafka cluster has `auto.create.topics.enable` set to `true` (the default), then the control topic will be automatically created. If not, then you will need to create the topic first. The default topic name is `control-iceberg`:

```bash
bin/kafka-topics \
  --command-config command-config.props \
  --bootstrap-server ${CONNECT_BOOTSTRAP_SERVERS} \
  --create \
  --topic control-iceberg \
  --partitions 1
```
NOTE: Clusters running on Confluent Cloud have `auto.create.topics.enable` set to `false` by default.
#### Iceberg catalog configuration

Configuration properties with the prefix `iceberg.catalog.` will be passed to Iceberg catalog initialization. See the Iceberg docs for details on how to configure a particular catalog.
### Single destination table

This example writes all incoming records to a single table.
#### Create the destination table

```sql
CREATE TABLE default.events (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts))
```
#### Connector config

This example config connects to an Iceberg REST catalog.

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}
```
### Multi-table fan-out, static routing

This example writes records with `type` set to `list` to the table `default.events_list`, and writes records with `type` set to `create` to the table `default.events_create`. Other records will be skipped.
#### Create two destination tables

```sql
CREATE TABLE default.events_list (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));

CREATE TABLE default.events_create (
    id STRING,
    type STRING,
    ts TIMESTAMP,
    payload STRING)
PARTITIONED BY (hours(ts));
```
#### Connector config

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables": "default.events_list,default.events_create",
    "iceberg.tables.route-field": "type",
    "iceberg.table.default.events_list.route-regex": "list",
    "iceberg.table.default.events_create.route-regex": "create",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}
```
### Multi-table fan-out, dynamic routing

This example writes to tables whose names are taken from the value in the `db_table` field. If a table with that name does not exist, then the record will be skipped. For example, if the record's `db_table` field is set to `default.events_list`, then the record is written to the `default.events_list` table.
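For example, a record shaped like the following (field values are illustrative) would be routed to `default.events_list`:

```json
{
  "id": "1",
  "type": "list",
  "ts": "2024-01-01T00:00:00Z",
  "payload": "...",
  "db_table": "default.events_list"
}
```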
#### Create two destination tables

See above for creating two tables.
#### Connector config

```json
{
  "name": "events-sink",
  "config": {
    "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "2",
    "topics": "events",
    "iceberg.tables.dynamic-enabled": "true",
    "iceberg.tables.route-field": "db_table",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "https://localhost",
    "iceberg.catalog.credential": "<credential>",
    "iceberg.catalog.warehouse": "<warehouse name>"
  }
}
```