IoTDB source connector

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Description

Read external data source data through IoTDB.

There is a conflict of thrift version between IoTDB and Spark.Therefore, you need to execute rm -f $SPARK_HOME/jars/libthrift* and cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/ to resolve it.

Using Dependency

  1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/plugins/.

For SeaTunnel Zeta Engine

  1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/lib/.

Key features

supports query SQL and can achieve projection effect.

Supported DataSource Info

Datasource Supported Versions Url
IoTDB >= 0.13.0 localhost:6667

Data Type Mapping

IotDB Data Type SeaTunnel Data Type
BOOLEAN BOOLEAN
INT32 TINYINT
INT32 SMALLINT
INT32 INT
INT64 BIGINT
FLOAT FLOAT
DOUBLE DOUBLE
TEXT STRING

Source Options

Name Type Required Default Value Description
node_urls string yes - IoTDB cluster address, the format is "host1:port" or "host1:port,host2:port"
username string yes - IoTDB user username
password string yes - IoTDB user password
sql string yes - execute sql statement
schema config yes - the data schema
fetch_size int no - the fetch_size of the IoTDB when you select
lower_bound long no - the lower_bound of the IoTDB when you select
upper_bound long no - the upper_bound of the IoTDB when you select
num_partitions int no - the num_partitions of the IoTDB when you select
thrift_default_buffer_size int no - the thrift_default_buffer_size of the IoTDB when you select
thrift_max_frame_size int no - the thrift max frame size
enable_cache_leader boolean no - enable_cache_leader of the IoTDB when you select
version string no - SQL semantic version used by the client, The possible values are: V_0_12, V_0_13
common-options no -

split partitions

we can split the partitions of the IoTDB and we used time column split

num_partitions [int]

split num

upper_bound [long]

upper bound of the time column

lower_bound [long]

lower bound of the time column

  1. split the time range into numPartitions parts
  2. if numPartitions is 1, use the whole time range
  3. if numPartitions < (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
  4. eg: lower_bound = 1, upper_bound = 10, numPartitions = 2
  5. sql = "select * from test where age > 0 and age < 10"
  6. split result
  7. split 1: select * from test where (time >= 1 and time < 6) and ( age > 0 and age < 10 )
  8. split 2: select * from test where (time >= 6 and time < 11) and ( age > 0 and age < 10 )

common options

Source plugin common parameters, please refer to Source Common Options for details

Examples

  1. env {
  2. parallelism = 2
  3. job.mode = "BATCH"
  4. }
  5. source {
  6. IoTDB {
  7. node_urls = "localhost:6667"
  8. username = "root"
  9. password = "root"
  10. sql = "SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device"
  11. schema {
  12. fields {
  13. ts = timestamp
  14. device_name = string
  15. temperature = float
  16. moisture = bigint
  17. c_int = int
  18. c_bigint = bigint
  19. c_float = float
  20. c_double = double
  21. c_string = string
  22. c_boolean = boolean
  23. }
  24. }
  25. }
  26. }
  27. sink {
  28. Console {
  29. }
  30. }

Upstream IoTDB data format is the following:

  1. IoTDB> SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device;
  2. +------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
  3. | Time| Device| temperature| moisture| c_int| c_bigint| c_float| c_double| c_string| c_boolean|
  4. +------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
  5. |2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1| 21474836470| 1.0f| 1.0d| abc| true|
  6. |2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 2| 21474836470| 2.0f| 2.0d| abc| true|
  7. |2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 3| 21474836470| 3.0f| 3.0d| abc| true|
  8. +------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+

Loaded to SeaTunnelRow data format is the following:

ts device_name temperature moisture c_int c_bigint c_float c_double c_string c_boolean
1664035200001 root.test_group.device_a 36.1 100 1 21474836470 1.0f 1.0d abc true
1664035200001 root.test_group.device_b 36.2 101 2 21474836470 2.0f 2.0d abc true
1664035200001 root.test_group.device_c 36.3 102 3 21474836470 3.0f 3.0d abc true

Changelog

2.2.0-beta 2022-09-26

  • Add IoTDB Source Connector

2.3.0-beta 2022-10-20

  • [Improve] Improve IoTDB Source Connector (2917)
    • Support extract timestamp、device、measurement from SeaTunnelRow
    • Support TINYINT、SMALLINT
    • Support flush cache to database before prepareCommit