SelectDB Cloud sink connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Key Features
Description
Used to send data to SelectDB Cloud. Both streaming and batch modes are supported. Internally, the SelectDB Cloud sink connector caches data in batches, uploads each batch, and then commits a CopyInto SQL statement to load the data into the table.
Supported DataSource Info
Version Supported
- Supported SelectDB Cloud version: >= 2.2.x
Sink Options
Name | Type | Required | Default | Description |
---|---|---|---|---|
load-url | String | Yes | - | SelectDB Cloud warehouse HTTP address, in the format warehouse_ip:http_port |
jdbc-url | String | Yes | - | SelectDB Cloud warehouse JDBC address, in the format warehouse_ip:mysql_port |
cluster-name | String | Yes | - | SelectDB Cloud cluster name |
username | String | Yes | - | SelectDB Cloud username |
password | String | Yes | - | SelectDB Cloud user password |
sink.enable-2pc | bool | No | true | Whether to enable two-phase commit (2PC) to ensure exactly-once semantics. SelectDB loads data through cache files, and when the amount of data is large, cached data may expire before it is committed (the default expiration time is 1 hour). If you encounter substantial data loss when writing large amounts of data, set sink.enable-2pc to false. |
table.identifier | String | Yes | - | The name of SelectDB Cloud table, the format is database.table |
sink.enable-delete | bool | No | false | Whether to enable deletion. This option requires the batch delete function to be enabled on the SelectDB Cloud table, and only supports the Unique model. |
sink.max-retries | int | No | 3 | Maximum number of retries if writing records to the database fails. |
sink.buffer-size | int | No | 10 * 1024 * 1024 (10MB) | The buffer size used to cache data for stream load. |
sink.buffer-count | int | No | 10000 | The buffer count used to cache data for stream load. |
selectdb.config | map | Yes | - | Used to support operations such as insert, delete, and update when SQL is generated automatically, and to specify the import data format. |
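To illustrate how the optional settings in the table fit together, here is a minimal sketch of a sink block that disables two-phase commit and sets the retry and buffer options explicitly. The connection values are placeholders, the tuning numbers are arbitrary illustrations rather than recommendations, and the buffer size is assumed to be expressed in bytes.

```hocon
sink {
  SelectDBCloud {
    # Required connection options (placeholder values)
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"

    # Optional settings; the values below are illustrative assumptions
    sink.enable-2pc = false       # turn off 2PC, e.g. when cached data expires before commit
    sink.max-retries = 5          # default is 3
    sink.buffer-size = 10485760   # 10 * 1024 * 1024, assumed to be in bytes
    sink.buffer-count = 10000     # default

    selectdb.config {
      file.type = "json"
    }
  }
}
```

Only option names listed in the table are used here; any option left out falls back to the default shown in the table.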
Data Type Mapping
SelectDB Cloud Data type | SeaTunnel Data type |
---|---|
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT, TINYINT |
INT | INT, SMALLINT, TINYINT |
BIGINT | BIGINT, INT, SMALLINT, TINYINT |
LARGEINT | BIGINT, INT, SMALLINT, TINYINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE, FLOAT |
DECIMAL | DECIMAL, DOUBLE, FLOAT |
DATE | DATE |
DATETIME | TIMESTAMP |
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
ARRAY | ARRAY |
MAP | MAP |
JSON | STRING |
HLL | Not supported yet |
BITMAP | Not supported yet |
QUANTILE_STATE | Not supported yet |
STRUCT | Not supported yet |
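Read in the other direction, the mapping tells you which SelectDB Cloud column types a given SeaTunnel field can be written to. The sketch below annotates a SeaTunnel schema with the compatible target column types taken from the table; the field names are hypothetical and chosen only for illustration.

```hocon
schema = {
  fields {
    # field = SeaTunnel type        # compatible SelectDB Cloud column types
    flag     = boolean              # BOOLEAN
    tiny     = tinyint              # TINYINT, SMALLINT, INT, BIGINT, LARGEINT
    small    = smallint             # SMALLINT, INT, BIGINT, LARGEINT
    num      = int                  # INT, BIGINT, LARGEINT
    big      = bigint               # BIGINT, LARGEINT
    ratio    = float                # FLOAT, DOUBLE, DECIMAL
    score    = double               # DOUBLE, DECIMAL
    amount   = "decimal(16, 1)"     # DECIMAL
    day      = date                 # DATE
    ts       = timestamp            # DATETIME
    name     = string               # CHAR, VARCHAR, STRING, JSON
    tags     = "array<int>"         # ARRAY
    attrs    = "map<string, int>"   # MAP
  }
}
```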
Supported import data formats
The supported formats include CSV and JSON.
Task Example
Simple:
The following example writes multiple data types to SelectDB Cloud. Users need to create the corresponding table downstream before running the job.
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
Use JSON format to import data
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
Use CSV format to import data
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "csv"
      file.column_separator = ","
      file.line_delimiter = "\n"
    }
  }
}
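Streaming mode

Since the connector supports streaming as well as batch mode, a streaming job might look like the minimal sketch below. It assumes that the only change needed on the job side is setting job.mode to "STREAMING"; the checkpoint interval, the two-field schema, and the connection values are placeholders, and sink.enable-2pc is left at its default.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"        # assumption: streaming jobs are selected via job.mode
  checkpoint.interval = 10000   # illustrative value
}
source {
  FakeSource {
    schema = {
      fields {
        # Hypothetical fields; the target table must have matching columns
        c_int = int
        c_string = string
      }
    }
  }
}
sink {
  SelectDBCloud {
    load-url = "warehouse_ip:http_port"
    jdbc-url = "warehouse_ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test.test"
    username = "admin"
    password = "******"
    selectdb.config {
      file.type = "json"
    }
  }
}
```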