SelectDB Cloud sink connector

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Used to send data to SelectDB Cloud. Supports both streaming and batch mode. Internally, the SelectDB Cloud sink connector caches data in batches, uploads the cached batch, and then commits a CopyInto SQL statement to load the data into the table.

Supported DataSource Info

Version Supported

  • The supported SelectDB Cloud version is >= 2.2.x

Sink Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| load-url | String | Yes | - | SelectDB Cloud warehouse HTTP address, in the format `warehouse_ip:http_port` |
| jdbc-url | String | Yes | - | SelectDB Cloud warehouse JDBC address, in the format `warehouse_ip:mysql_port` |
| cluster-name | String | Yes | - | SelectDB Cloud cluster name |
| username | String | Yes | - | SelectDB Cloud user username |
| password | String | Yes | - | SelectDB Cloud user password |
| sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2PC); enabled by default to ensure exactly-once semantics. SelectDB uses cache files to load data, and when the data volume is large, cached data may become invalid (the default expiration time is 1 hour). If you encounter large-scale data loss on write, set `sink.enable-2pc` to `false`. |
| table.identifier | String | Yes | - | The name of the SelectDB Cloud table, in the format `database.table` |
| sink.enable-delete | bool | No | false | Whether to enable deletion. This option requires the batch delete feature to be enabled on the SelectDB Cloud table, and only supports the Unique model. |
| sink.max-retries | int | No | 3 | The maximum number of retries when writing records to the database fails |
| sink.buffer-size | int | No | 10 * 1024 * 1024 (10MB) | The buffer size used to cache data for stream load |
| sink.buffer-count | int | No | 10000 | The number of records buffered to cache data for stream load |
| selectdb.config | map | Yes | - | Used to configure the import, e.g. the file format, and to support operations such as insert, delete, and update when SQL is generated automatically |
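
For large batch loads where cache files might expire before commit, the 2PC and buffer options above can be combined. The following is an illustrative sketch, not a recommended setting; the buffer and retry values are assumptions you should tune for your workload:

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    # Disable two-phase commit to avoid cache-file expiration on very large loads,
    # at the cost of exactly-once semantics.
    sink.enable-2pc = false
    # Illustrative tuning: 20MB buffer (20 * 1024 * 1024), up to 50000 cached records.
    sink.buffer-size = 20971520
    sink.buffer-count = 50000
    sink.max-retries = 5
    selectdb.config {
      file.type = "json"
    }
  }
}
```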

Data Type Mapping

| SelectDB Cloud Data Type | SeaTunnel Data Type |
|--------------------------|---------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT, TINYINT |
| INT | INT, SMALLINT, TINYINT |
| BIGINT | BIGINT, INT, SMALLINT, TINYINT |
| LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE, FLOAT |
| DECIMAL | DECIMAL, DOUBLE, FLOAT |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| ARRAY | ARRAY |
| MAP | MAP |
| JSON | STRING |
| HLL | Not supported yet |
| BITMAP | Not supported yet |
| QUANTILE_STATE | Not supported yet |
| STRUCT | Not supported yet |

Supported import data formats

The supported formats include CSV and JSON.

Task Example

Simple:

The following example describes writing multiple data types to SelectDB Cloud; users need to create the corresponding table downstream in advance.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
```

Use JSON format to import data

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
```

Use CSV format to import data

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "csv"
      file.column_separator = ","
      file.line_delimiter = "\n"
    }
  }
}
```
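
Where upstream deletions need to propagate to SelectDB Cloud, the `sink.enable-delete` option described above can be added. This is a sketch assuming a Unique model table with the batch delete feature already enabled:

```hocon
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    # Propagate upstream DELETE rows; requires a Unique model table
    # with the batch delete feature enabled on the SelectDB Cloud side.
    sink.enable-delete = true
    selectdb.config {
      file.type = "json"
    }
  }
}
```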