InfluxDB source connector

Description

Reads data from InfluxDB.

Key features

Supports SQL queries, so column projection can be expressed directly in the query.

Options

name                type    required  default value
url                 string  yes       -
sql                 string  yes       -
schema              config  yes       -
database            string  yes       -
username            string  no        -
password            string  no        -
lower_bound         long    no        -
upper_bound         long    no        -
partition_num       int     no        -
split_column        string  no        -
epoch               string  no        n
connect_timeout_ms  long    no        15000
query_timeout_sec   int     no        3
common-options      config  no        -

url [string]

The URL used to connect to InfluxDB, e.g.

    http://influxdb-host:8086

sql [string]

The SQL query used to fetch data, e.g.

    select name,age from test

schema [config]

fields [config]

The schema of the upstream data, e.g.

    schema {
        fields {
            name = string
            age = int
        }
    }

database [string]

The InfluxDB database to read from.

username [string]

The username used to authenticate with InfluxDB.

password [string]

The password used to authenticate with InfluxDB.

split_column [string]

The column used to split the query into partitions.

Tips:

  • InfluxDB tags cannot be used as the split column because tag values are always strings.
  • The InfluxDB time column cannot be used as the split column because it cannot take part in arithmetic.
  • Currently, split_column supports only integer columns; float, string, date, and other types are not supported.

upper_bound [long]

Upper bound of the split_column column.

lower_bound [long]

Lower bound of the split_column column.

    split the $split_column range into $partition_num parts
    if partition_num is 1, use the whole `split_column` range
    if partition_num > (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions

    e.g. lower_bound = 1, upper_bound = 10, partition_num = 2
    sql = "select * from test where age > 0 and age < 10"

    split result:
    split 1: select * from test where ($split_column >= 1 and $split_column < 6) and ( age > 0 and age < 10 )
    split 2: select * from test where ($split_column >= 6 and $split_column < 11) and ( age > 0 and age < 10 )
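
The splitting rules above can be sketched in a few lines of Python. This is only an illustration of the arithmetic, not the connector's actual implementation: the function names `split_ranges` and `build_split_queries` are hypothetical, the ceiling-division sizing is an assumption chosen to reproduce the example above, and the column name `value` is borrowed from the Examples section.

```python
from typing import List, Tuple


def split_ranges(lower_bound: int, upper_bound: int, partition_num: int) -> List[Tuple[int, int]]:
    """Split the inclusive range [lower_bound, upper_bound] into half-open [start, end) ranges.

    Assumption: sizes are computed with ceiling division, so the last
    range may extend past upper_bound (as in "< 11" in the example).
    """
    span = upper_bound - lower_bound + 1      # number of integer values covered
    parts = max(1, min(partition_num, span))  # never more parts than values
    size = -(-span // parts)                  # ceiling division
    ranges = []
    start = lower_bound
    while start <= upper_bound:
        ranges.append((start, start + size))
        start += size
    return ranges


def build_split_queries(sql: str, split_column: str, ranges: List[Tuple[int, int]]) -> List[str]:
    """Prepend a range condition on split_column to the query's WHERE clause.

    Assumption: the query contains at most one lowercase " where ".
    """
    head, sep, tail = sql.partition(" where ")
    queries = []
    for start, end in ranges:
        cond = f"({split_column} >= {start} and {split_column} < {end})"
        if sep:
            # Keep the user's own filter, wrapped in parentheses.
            queries.append(f"{head} where {cond} and ( {tail} )")
        else:
            queries.append(f"{head} where {cond}")
    return queries


if __name__ == "__main__":
    ranges = split_ranges(1, 10, 2)
    for query in build_split_queries("select * from test where age > 0 and age < 10", "value", ranges):
        print(query)
```

With lower_bound = 1, upper_bound = 10 and partition_num = 2, the sketch yields the ranges [1, 6) and [6, 11), matching the split result shown above.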

partition_num [int]

The number of partitions the split_column range is divided into.

Tips: Ensure that upper_bound minus lower_bound is evenly divisible by partition_num; otherwise the query results will overlap.

epoch [string]

The time precision of the returned timestamps.

  • Optional values: H, m, s, MS, u, n
  • default value: n
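
To make the effect of epoch concrete, the following sketch converts a returned integer timestamp into a UTC datetime. It assumes the codes mean hours (H), minutes (m), seconds (s), milliseconds (MS), microseconds (u) and nanoseconds (n), which follows InfluxDB's usual precision naming but is not stated on this page. The helper `epoch_to_datetime` is hypothetical and not part of the connector.

```python
from datetime import datetime, timezone

# Nanoseconds per unit, keyed by the lower-cased precision code.
# Assumption: H/m/s/MS/u/n mean hour/minute/second/millisecond/microsecond/nanosecond.
NANOS_PER_UNIT = {
    "h": 3_600_000_000_000,
    "m": 60_000_000_000,
    "s": 1_000_000_000,
    "ms": 1_000_000,
    "u": 1_000,
    "n": 1,
}


def epoch_to_datetime(value: int, epoch: str = "n") -> datetime:
    """Interpret an integer timestamp at the given precision as a UTC datetime."""
    nanos = value * NANOS_PER_UNIT[epoch.lower()]
    seconds, remainder = divmod(nanos, 1_000_000_000)
    # datetime only stores microseconds, so sub-microsecond digits are dropped.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=remainder // 1000)


if __name__ == "__main__":
    print(epoch_to_datetime(1664150400, "s"))      # second precision
    print(epoch_to_datetime(1664150400000000000))  # default nanosecond precision
```

Because nanosecond values exceed the 32-bit integer range, a time field read with the default precision needs a 64-bit type, as in the BIGINT used in the examples on this page.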

query_timeout_sec [int]

The timeout for InfluxDB queries, in seconds.

connect_timeout_ms [long]

The timeout for connecting to InfluxDB, in milliseconds.

common options

Common parameters of source plugins. Refer to Source Common Options for details.

Examples

Example of parallel, multi-partition scanning

    source {
        InfluxDB {
            url = "http://influxdb-host:8086"
            sql = "select label, value, rt, time from test"
            database = "test"
            upper_bound = 100
            lower_bound = 1
            partition_num = 4
            split_column = "value"
            schema {
                fields {
                    label = STRING
                    value = INT
                    rt = STRING
                    time = BIGINT
                }
            }
        }
    }

Example of not using partition scan

    source {
        InfluxDB {
            url = "http://influxdb-host:8086"
            sql = "select label, value, rt, time from test"
            database = "test"
            schema {
                fields {
                    label = STRING
                    value = INT
                    rt = STRING
                    time = BIGINT
                }
            }
        }
    }

Changelog

2.2.0-beta 2022-09-26

  • Add InfluxDB Source Connector