Kudu sink connector
Supported Kudu Versions
- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0
Supported Engines
Spark
Flink
SeaTunnel Zeta
Key Features
Data Type Mapping
| SeaTunnel Data Type | Kudu Data Type |
|---|---|
| BOOLEAN | BOOL |
| INT | INT8, INT16, INT32 |
| BIGINT | INT64 |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| TIMESTAMP | UNIXTIME_MICROS |
| BYTES | BINARY |
Sink Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| kudu_masters | String | Yes | - | Kudu master addresses, separated by ',', such as '192.168.88.110:7051'. |
| table_name | String | Yes | - | The name of the Kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu client worker count. Defaults to twice the number of available CPU cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds. |
| enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
| kerberos_principal | String | No | - | Kerberos principal, such as 'xx@xx.COM'. |
| kerberos_keytab | String | No | - | Kerberos keytab file. Note that all Zeta nodes must have this file. |
| kerberos_krb5conf | String | No | - | Kerberos krb5 conf file. Note that all Zeta nodes must have this file. |
| save_mode | String | No | - | Storage mode. Supports overwrite and append. |
| session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu flush mode. Default AUTO_FLUSH_SYNC. |
| batch_size | Int | No | 1024 | Maximum number of buffered records (including all append, upsert, and delete records). Once this number is exceeded, the data is flushed. |
| buffer_flush_interval | Int | No | 10000 | Flush interval in milliseconds. Once this time has elapsed, asynchronous threads flush the data. |
| ignore_not_found | Bool | No | false | If true, ignore all rows that are not found. |
| ignore_not_duplicate | Bool | No | false | If true, ignore all duplicate rows. |
| common-options | | No | - | Sink plugin common parameters. Please refer to Sink Common Options for details. |
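
As an illustration of how the options above combine, here is a minimal sketch of a sink block that tunes buffering, timeouts, and error tolerance. It uses only option names from the table. The master hostnames and all numeric values are placeholders chosen for illustration, not recommended settings.

```hocon
sink {
  kudu {
    # Comma-separated master addresses (hypothetical hostnames).
    kudu_masters = "kudu-master-1:7051,kudu-master-2:7051"
    table_name = "kudu_sink_table"

    # Buffering: flush after 2048 buffered records,
    # or when the 5000 ms flush interval elapses.
    batch_size = 2048
    buffer_flush_interval = 5000

    # Client timeouts in milliseconds (illustrative values).
    client_default_operation_timeout_ms = 60000
    client_default_admin_operation_timeout_ms = 60000

    # Tolerate missing rows and duplicate rows instead of failing.
    ignore_not_found = true
    ignore_not_duplicate = true
  }
}
```

Options left out of the block, such as session_flush_mode, keep the defaults listed in the table.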
Task Example
Simple:
The following example reads CDC-style rows (insert, update, and delete) from a FakeSource registered as "kudu" and writes them to the Kudu table "kudu_sink_table".

    env {
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      FakeSource {
        result_table_name = "kudu"
        schema = {
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [3, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_BEFORE
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_AFTER
            fields = [1, true, 2, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = DELETE
            fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      }
    }

    sink {
      kudu {
        source_table_name = "kudu"
        kudu_masters = "kudu-master-cdc:7051"
        table_name = "kudu_sink_table"
        enable_kerberos = true
        kerberos_principal = "xx@xx.COM"
        kerberos_keytab = "xx.keytab"
      }
    }

Multiple Table

    env {
      # You can set engine configuration here
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      FakeSource {
        tables_configs = [
          {
            schema = {
              table = "kudu_sink_1"
              fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
              }
            }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
              }
            ]
          },
          {
            schema = {
              table = "kudu_sink_2"
              fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
              }
            }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
              }
            ]
          }
        ]
      }
    }

    sink {
      kudu {
        kudu_masters = "kudu-master-multiple:7051"
      }
    }

Changelog
2.2.0-beta 2022-09-26
- Add Kudu Sink Connector
2.3.0-beta 2022-10-20
- [Improve] Kudu Sink Connector Support to upsert row (2881)
Next Version
- Change plugin name from KuduSink to Kudu (3432)
