Kudu sink connector

Supported Kudu Versions

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Key Features

Data Type Mapping

SeaTunnel Data Type | Kudu Data Type
BOOLEAN | BOOL
INT | INT8, INT16, INT32
BIGINT | INT64
DECIMAL | DECIMAL
FLOAT | FLOAT
DOUBLE | DOUBLE
STRING | STRING
TIMESTAMP | UNIXTIME_MICROS
BYTES | BINARY
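As an illustration of the mapping above, the sketch below declares one upstream field per SeaTunnel type in a FakeSource-style schema block. Each comment names the Kudu column type that the field corresponds to according to the table. The field names (c_bool, c_int and so on) are hypothetical, and the target Kudu table is assumed to already define columns of the matching types.

```hocon
schema = {
  fields {
    # Hypothetical field names; comments give the Kudu type from the mapping table
    c_bool      = boolean            # -> BOOL
    c_int       = int                # -> INT8 / INT16 / INT32
    c_bigint    = bigint             # -> INT64
    c_decimal   = "decimal(16, 1)"   # -> DECIMAL
    c_float     = float              # -> FLOAT
    c_double    = double             # -> DOUBLE
    c_string    = string             # -> STRING
    c_timestamp = timestamp          # -> UNIXTIME_MICROS
    c_bytes     = bytes              # -> BINARY
  }
}
```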

Sink Options

Name | Type | Required | Default | Description
kudu_masters | String | Yes | - | Kudu master addresses, separated by ‘,’, such as ‘192.168.88.110:7051’.
table_name | String | Yes | - | The name of the Kudu table.
client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu client worker count. Defaults to twice the number of CPU cores.
client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds.
client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds.
enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication.
kerberos_principal | String | No | - | Kerberos principal.
kerberos_keytab | String | No | - | Path to the Kerberos keytab file. The file must exist on all Zeta nodes.
kerberos_krb5conf | String | No | - | Path to the Kerberos krb5 configuration file. The file must exist on all Zeta nodes.
save_mode | String | No | - | Save mode; supports overwrite and append.
session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu session flush mode.
batch_size | Int | No | 1024 | Maximum number of buffered records (appends, upserts and deletes combined); once this number is exceeded, the data is flushed.
buffer_flush_interval | Int | No | 10000 | Flush interval in milliseconds; once this time has elapsed, an asynchronous thread flushes the buffered data.
ignore_not_found | Bool | No | false | If true, ignore errors for rows that are not found.
ignore_not_duplicate | Bool | No | false | If true, ignore errors for duplicate rows.
common-options | - | No | - | Sink plugin common parameters; see Sink Common Options for details.
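A sink block that combines several of the options above might look like the following sketch. The master host names and all tuning values are hypothetical. It also assumes that session_flush_mode accepts the names of Kudu's session flush modes (AUTO_FLUSH_SYNC, AUTO_FLUSH_BACKGROUND, MANUAL_FLUSH); only AUTO_FLUSH_SYNC is confirmed by the table.

```hocon
sink {
  kudu {
    # Several masters, comma-separated (hypothetical hosts)
    kudu_masters = "kudu-master-1:7051,kudu-master-2:7051,kudu-master-3:7051"
    table_name   = "kudu_sink_table"

    # Client tuning (illustrative values)
    client_worker_count                       = 8
    client_default_operation_timeout_ms       = 60000
    client_default_admin_operation_timeout_ms = 60000

    # Assumed to accept Kudu flush-mode names; only AUTO_FLUSH_SYNC is documented above
    session_flush_mode    = "AUTO_FLUSH_BACKGROUND"
    # Per the option descriptions: flush after more than 2048 buffered records,
    # or once 5000 ms have elapsed
    batch_size            = 2048
    buffer_flush_interval = 5000

    # Ignore errors for not-found rows and duplicate rows
    ignore_not_found     = true
    ignore_not_duplicate = true
  }
}
```

Options left out of the block keep the defaults listed in the table.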

Task Example

Simple:

The following example uses a FakeSource whose output is registered as “kudu” and writes its CDC rows (inserts, an update and a delete) to the Kudu table “kudu_sink_table”.

    env {
      parallelism = 1
      job.mode = "BATCH"
    }
    source {
      FakeSource {
        result_table_name = "kudu"
        schema = {
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [3, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_BEFORE
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_AFTER
            fields = [1, true, 2, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          },
          {
            kind = DELETE
            fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      }
    }
    sink {
      kudu {
        source_table_name = "kudu"
        kudu_masters = "kudu-master-cdc:7051"
        table_name = "kudu_sink_table"
        enable_kerberos = true
        kerberos_principal = "xx@xx.COM"
        kerberos_keytab = "xx.keytab"
      }
    }
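The simple example enables Kerberos with only a principal and a keytab. A variant that also points the sink at an explicit krb5 configuration file could look like the sketch below. The file paths are hypothetical; as the options table notes, the keytab and krb5 files must exist on all Zeta nodes.

```hocon
sink {
  kudu {
    source_table_name = "kudu"
    kudu_masters      = "kudu-master-cdc:7051"
    table_name        = "kudu_sink_table"

    # Kerberos authentication; both files must be present on every Zeta node
    enable_kerberos    = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab    = "/etc/security/keytabs/xx.keytab"  # hypothetical path
    kerberos_krb5conf  = "/etc/krb5.conf"                   # hypothetical path
  }
}
```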

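Multiple Table:

The following example uses a FakeSource that emits two tables, kudu_sink_1 and kudu_sink_2. The sink sets only kudu_masters; table_name is omitted, and each upstream table is written to the Kudu table with the same name.
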
    env {
      # You can set engine configuration here
      parallelism = 1
      job.mode = "BATCH"
    }
    source {
      FakeSource {
        tables_configs = [
          {
            schema = {
              table = "kudu_sink_1"
              fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
              }
            }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
              }
            ]
          },
          {
            schema = {
              table = "kudu_sink_2"
              fields {
                id = int
                val_bool = boolean
                val_int8 = tinyint
                val_int16 = smallint
                val_int32 = int
                val_int64 = bigint
                val_float = float
                val_double = double
                val_decimal = "decimal(16, 1)"
                val_string = string
                val_unixtime_micros = timestamp
              }
            }
            rows = [
              {
                kind = INSERT
                fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
              }
            ]
          }
        ]
      }
    }
    sink {
      kudu {
        kudu_masters = "kudu-master-multiple:7051"
      }
    }

Changelog

2.2.0-beta 2022-09-26

  • Add Kudu Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Kudu Sink Connector supports upserting rows (2881)

Next Version

  • Change plugin name from KuduSink to Kudu (3432)