Doris sink connector

Supported Doris Versions

  • Exactly-once and CDC are supported in Doris version 1.1.x and above
  • The Array data type is supported in Doris version 1.2.x and above
  • The Map data type will be supported in Doris version 2.x

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

  • exactly-once
  • cdc

Description

Used to send data to Doris. Both streaming and batch mode are supported. Internally, the Doris sink connector caches data and imports it into Doris in batches via stream load.

Sink Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| fenodes | String | Yes | - | Doris cluster fenodes address, in the format "fe_ip:fe_http_port, ..." |
| query-port | int | No | 9030 | Doris fenodes query_port |
| username | String | Yes | - | Doris username |
| password | String | Yes | - | Doris password |
| database | String | Yes | - | The database name of the Doris table; use ${database_name} to represent the upstream database name |
| table | String | Yes | - | The table name of the Doris table; use ${table_name} to represent the upstream table name |
| table.identifier | String | Yes | - | The name of the Doris table. Deprecated since version 2.3.5; please use database and table instead |
| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel |
| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc). See the Doris stream load documentation for details on two-phase commit |
| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires the Doris table to have the batch delete feature enabled (enabled by default since version 0.15) and only supports the Unique model. See the Doris batch delete documentation for details |
| sink.check-interval | int | No | 10000 | The interval, in milliseconds, at which to check for exceptions while loading |
| sink.max-retries | int | No | 3 | The maximum number of retries if writing records to the database fails |
| sink.buffer-size | int | No | 256 * 1024 | The buffer size, in bytes, used to cache data for stream load |
| sink.buffer-count | int | No | 3 | The number of buffers used to cache data for stream load |
| doris.batch.size | int | No | 1024 | The number of rows written to Doris per HTTP request; when the row count reaches this size or a checkpoint is executed, the cached data is written to the server |
| needs_unsupported_type_casting | boolean | No | false | Whether to enable casting of unsupported types, such as Decimal64 to Double |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode; refer to schema_save_mode below |
| data_save_mode | Enum | No | APPEND_DATA | The data save mode; refer to data_save_mode below |
| save_mode_create_template | string | No | see below | See below |
| custom_sql | String | No | - | When data_save_mode is set to CUSTOM_PROCESSING, fill in the custom_sql parameter with an executable SQL statement; the SQL is executed before the synchronization task starts |
| doris.config | map | Yes | - | Parameters passed through to stream load. Used to support operations such as insert, delete, and update when SQL is automatically generated, and to specify the import data format |
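
As a reference, a minimal sink configuration exercising the options above might look like the sketch below. The fenodes address and label prefix are illustrative, the buffer and batch values simply restate the defaults, and the ${database_name} and ${table_name} placeholders are filled in from the upstream table:

    sink {
      Doris {
        fenodes = "fe_host:8030"               # assumed FE address, format "fe_ip:fe_http_port"
        username = root
        password = ""
        database = "${database_name}"          # resolved from the upstream database name
        table = "${table_name}"                # resolved from the upstream table name
        sink.label-prefix = "seatunnel-doris"  # must be globally unique when 2pc is enabled
        sink.enable-2pc = "true"
        sink.buffer-size = 262144              # default: 256 * 1024 bytes
        sink.buffer-count = 3
        doris.batch.size = 1024
        doris.config {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }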

schema_save_mode[Enum]

Before the synchronization task starts, select how to handle an existing table schema on the target side.
Option introduction:
RECREATE_SCHEMA: create the table when it does not exist; drop and recreate it when it already exists
CREATE_SCHEMA_WHEN_NOT_EXIST: create the table when it does not exist; skip creation when it already exists
ERROR_WHEN_SCHEMA_NOT_EXIST: report an error when the table does not exist

data_save_mode[Enum]

Before the synchronization task starts, select how to handle existing data on the target side; a configuration sketch follows the option list.
Option introduction:
DROP_DATA: preserve the table structure and delete the data
APPEND_DATA: preserve the table structure and the existing data
CUSTOM_PROCESSING: user-defined processing via custom_sql
ERROR_WHEN_DATA_EXISTS: report an error when data already exists
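
As an illustration, the sketch below combines both save modes; the connection values and the custom_sql statement are assumptions for the example:

    sink {
      Doris {
        fenodes = "fe_host:8030"   # assumed FE address
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.label-prefix = "save-mode-demo"
        # create the table when it is missing, skip when it exists
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        # run a user-defined statement before the synchronization task starts
        data_save_mode = "CUSTOM_PROCESSING"
        custom_sql = "TRUNCATE TABLE test.e2e_table_sink"   # hypothetical cleanup SQL
        doris.config {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }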

save_mode_create_template

We use a template to automatically create Doris tables. The connector generates the corresponding table creation statement based on the upstream data types and schema, and the default template can be modified as needed.

Default template:

    CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
    ${rowtype_fields}
    ) ENGINE=OLAP
    UNIQUE KEY (${rowtype_primary_key})
    DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES (
        "replication_allocation" = "tag.location.default: 1",
        "in_memory" = "false",
        "storage_format" = "V2",
        "disable_auto_compaction" = "false"
    )

If custom fields are filled in the template, for example adding an id field:

    CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
    (
        id,
        ${rowtype_fields}
    ) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
    DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES
    (
        "replication_num" = "1"
    );

The connector will automatically obtain the corresponding types from the upstream schema to fill in the template, and remove the id field from rowtype_fields. You can use this method to customize field types and attributes.

You can use the following placeholders; a usage sketch follows the list:

  • database: Used to get the database in the upstream schema
  • table_name: Used to get the table name in the upstream schema
  • rowtype_fields: Used to get all the fields in the upstream schema, which are automatically mapped to Doris field definitions
  • rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list)
  • rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list)
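
As a usage sketch, a customized template can be set through save_mode_create_template in the sink configuration. The example below passes a template based on the simplified one above as a HOCON multi-line string; all connection values are illustrative:

    sink {
      Doris {
        fenodes = "fe_host:8030"   # assumed FE address
        username = root
        password = ""
        database = "${database_name}"
        table = "${table_name}"
        sink.label-prefix = "template-demo"
        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
        # placeholders are replaced per upstream table when the DDL is generated
        save_mode_create_template = """
        CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
        ${rowtype_fields}
        ) ENGINE=OLAP
        UNIQUE KEY (${rowtype_primary_key})
        DISTRIBUTED BY HASH (${rowtype_primary_key})
        PROPERTIES (
        "replication_num" = "1"
        )
        """
        doris.config {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }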

Data Type Mapping

| Doris Data Type | SeaTunnel Data Type |
|-----------------|---------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT, TINYINT |
| INT | INT, SMALLINT, TINYINT |
| BIGINT | BIGINT, INT, SMALLINT, TINYINT |
| LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE, FLOAT |
| DECIMAL | DECIMAL, DOUBLE, FLOAT |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |
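
If the upstream schema produces a type outside this mapping (for example, a Decimal64 value that should be written to a Double column), the needs_unsupported_type_casting option described in the sink options can be enabled. A minimal sketch with illustrative connection values:

    sink {
      Doris {
        fenodes = "fe_host:8030"   # assumed FE address
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.label-prefix = "type-cast-demo"
        # cast otherwise-unsupported types, e.g. Decimal64 to Double
        needs_unsupported_type_casting = "true"
        doris.config {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }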

Supported Import Data Formats

The supported formats include CSV and JSON

Task Example

Simple:

The following example writes multiple data types to Doris; the user needs to create the corresponding table downstream in advance.

    env {
      parallelism = 1
      job.mode = "BATCH"
      checkpoint.interval = 10000
    }

    source {
      FakeSource {
        row.num = 10
        map.size = 10
        array.size = 10
        bytes.length = 10
        string.length = 10
        schema = {
          fields {
            c_map = "map<string, array<int>>"
            c_array = "array<int>"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_decimal = "decimal(16, 1)"
            c_null = "null"
            c_bytes = bytes
            c_date = date
            c_timestamp = timestamp
          }
        }
      }
    }

    sink {
      Doris {
        fenodes = "doris_cdc_e2e:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.label-prefix = "test-cdc"
        sink.enable-2pc = "true"
        sink.enable-delete = "true"
        doris.config {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }

CDC (Change Data Capture) Event:

This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to the Doris sink. FakeSource simulates CDC data with a schema that includes a score field (int type). A database named test and a table named e2e_table_sink need to be created in Doris in advance.

    env {
      parallelism = 1
      job.mode = "BATCH"
      checkpoint.interval = 10000
    }

    source {
      FakeSource {
        schema = {
          fields {
            pk_id = bigint
            name = string
            score = int
            sex = boolean
            number = tinyint
            height = float
            sight = double
            create_time = date
            update_time = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
          },
          {
            kind = INSERT
            fields = [3, "C", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_BEFORE
            fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
          },
          {
            kind = UPDATE_AFTER
            fields = [1, "A_1", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
          },
          {
            kind = DELETE
            fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
          }
        ]
      }
    }

    sink {
      Doris {
        fenodes = "doris_cdc_e2e:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.label-prefix = "test-cdc"
        sink.enable-2pc = "true"
        sink.enable-delete = "true"
        doris.config {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }

Use JSON format to import data

    sink {
      Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"
        doris.config = {
          format = "json"
          read_json_by_line = "true"
        }
      }
    }

Use CSV format to import data

    sink {
      Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_csv"
        doris.config = {
          format = "csv"
          column_separator = ","
        }
      }
    }

Changelog

2.3.0-beta 2022-10-20

  • Add Doris Sink Connector

Next version

  • [Improve] Change Doris Config Prefix 3856

  • [Improve] Refactor some Doris Sink code as well as support 2pc and cdc 4235

PR 4235 introduces changes incompatible with PR 3856. Please refer to PR 4235 to use the new Doris connector.