Apache Doris Connector

Apache Doris is a high-performance, and real-time analytical database, which could support high-concurrent point query scenarios. Apache StreamPark encapsulates DoirsSink for writing data to Doris in real-time, based on its stream loads.

Write with Apache StreamPark™

DorisSink only supports JSON format (single-layer) writing currently, such as: {"id":1,"name":"streampark"} The example of the running program is Java, as follows:

Configuration list

  1. doris.sink:
  2. fenodes: 127.0.0.1:8030 # doris fe http url
  3. database: test # doris database
  4. table: test_tbl # doris table
  5. user: root
  6. password: 123456
  7. batchSize: 100 # doris sink batch size per streamload
  8. intervalMs: 3000 # doris sink the time interval of each streamload
  9. maxRetries: 1 # stream load retries
  10. streamLoad: # doris streamload own parameters
  11. format: json
  12. strip_outer_array: true
  13. max_filter_ratio: 1

Write data to Doris

Java

  1. package org.apache.streampark.test.flink.java.datastream;
  2. import org.apache.streampark.flink.core.StreamEnvConfig;
  3. import org.apache.streampark.flink.core.java.sink.doris.DorisSink;
  4. import org.apache.streampark.flink.core.java.source.KafkaSource;
  5. import org.apache.streampark.flink.core.scala.StreamingContext;
  6. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.streaming.api.datastream.DataStream;
  9. public class DorisJavaApp {
  10. public static void main(String[] args) {
  11. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  12. StreamingContext context = new StreamingContext(envConfig);
  13. DataStream<String> source = new KafkaSource<String>(context)
  14. .getDataStream()
  15. .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
  16. .returns(String.class);
  17. new DorisSink<String>(context).sink(source);
  18. context.start();
  19. }
  20. }

It is recommended to set batchSize to insert data in batches to improve performance. At the same time, to improve real-time performance, intervalMs is supported for batch writing. The number of streamload retries can be increased by setting maxRetries.