Doris source connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key features

Description

Reads data from Doris. The Doris source sends a SQL query to the FE (frontend), which parses it into an execution plan and sends the plan to the BE (backend) nodes. The BE nodes then return the data directly.

Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|---|---|---|---|---|
| Doris | Only Doris 2.0 or later is supported. | - | - | - |

Database Dependency

Download the dependency listed under 'Maven' and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory.

Data Type Mapping

| Doris data type | SeaTunnel data type |
|---|---|
| INT | INT |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| BIGINT | BIGINT |
| LARGEINT | STRING |
| BOOLEAN | BOOLEAN |
| DECIMAL | DECIMAL((the column's specified size) + 1, (the column's number of digits to the right of the decimal point)) |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| CHAR, VARCHAR, STRING, TEXT | STRING |
| DATE | DATE |
| DATETIME, DATETIME(p) | TIMESTAMP |
| ARRAY | ARRAY |

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| fenodes | string | yes | - | FE address, in the format "fe_host:fe_http_port" |
| username | string | yes | - | Doris username |
| password | string | yes | - | Doris password |
| database | string | yes | - | Name of the Doris database |
| table | string | yes | - | Name of the Doris table |
| doris.read.field | string | no | - | Comma-separated list of the Doris table columns to read |
| query-port | string | no | 9030 | Doris query port |
| doris.filter.query | string | no | - | Filter condition applied in Doris, in the format "field = value". Example: doris.filter.query = "F_ID > 2" |
| doris.batch.size | int | no | 1024 | Maximum number of rows fetched from a Doris BE in a single read |
| doris.request.query.timeout.s | int | no | 3600 | Timeout for a Doris data scan, in seconds |
| doris.exec.mem.limit | long | no | 2147483648 | Maximum memory a single BE scan request may use, in bytes. The default is 2 GB (2147483648). |
| doris.request.retries | int | no | 3 | Number of retries when sending requests to the Doris FE |
| doris.request.read.timeout.ms | int | no | 30000 | |
| doris.request.connect.timeout.ms | int | no | 30000 | |

Tips

Modifying the advanced parameters arbitrarily is not recommended.
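
As a reference point, the following sketch spells out several of the optional tuning parameters from the Source Options table at their documented defaults, so the job should behave the same with or without these lines. The host, database, and table names are placeholders borrowed from the task examples, and the choice of which options to list is only illustrative.

```hocon
source {
  Doris {
    # Required connection settings (placeholder values).
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"

    # Optional tuning parameters, written out at their documented defaults.
    doris.batch.size = 1024               # rows per read from a BE
    doris.request.query.timeout.s = 3600  # scan timeout, in seconds
    doris.exec.mem.limit = 2147483648     # per-scan memory limit (2 GB)
    doris.request.retries = 3             # retries for FE requests
  }
}
```

Leaving these lines out gives the same values, so they are worth adding only when a specific limit needs to change.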

Task Example

This is an example of reading a Doris table and writing to Console.

  env {
    parallelism = 2
    job.mode = "BATCH"
  }
  source {
    Doris {
      fenodes = "doris_e2e:8030"
      username = root
      password = ""
      database = "e2e_source"
      table = "doris_e2e_table"
    }
  }
  transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
  }
  sink {
    Console {}
  }

Use the 'doris.read.field' parameter to select which columns of the Doris table to read.

  env {
    parallelism = 2
    job.mode = "BATCH"
  }
  source {
    Doris {
      fenodes = "doris_e2e:8030"
      username = root
      password = ""
      database = "e2e_source"
      table = "doris_e2e_table"
      doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT"
    }
  }
  transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
  }
  sink {
    Console {}
  }

Use 'doris.filter.query' to filter the data. The parameter value is passed directly to Doris.

  env {
    parallelism = 2
    job.mode = "BATCH"
  }
  source {
    Doris {
      fenodes = "doris_e2e:8030"
      username = root
      password = ""
      database = "e2e_source"
      table = "doris_e2e_table"
      doris.filter.query = "F_ID > 2"
    }
  }
  transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
  }
  sink {
    Console {}
  }
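
Column selection and filtering are shown separately above. A minimal sketch of using both in one source block might look like the following. That the two options can be combined is an assumption, not something this page confirms. The column names are taken from the earlier example.

```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    # Assumption: projection and filter may be set together.
    doris.read.field = "F_ID,F_INT"
    doris.filter.query = "F_ID > 2"
  }
}
sink {
  Console {}
}
```

If the combination is supported, the Console sink would be expected to print only the two selected columns for rows matching the filter.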