ClickHouse is a columnar database management system (DBMS) for online analytical processing (OLAP). Apache Flink currently does not provide an official connector for writing to or reading from ClickHouse. Based on the access methods that ClickHouse supports, the HTTP client and the JDBC driver, StreamPark encapsulates a ClickHouseSink for writing data to ClickHouse in real time.

ClickHouse writes do not support transactions. Writing data with JDBC provides AT_LEAST_ONCE semantics. When the HTTP client is used to write asynchronously, failed writes are retried several times; data that still cannot be written is backed up to an external component (Apache Kafka, MySQL, Apache Hadoop HDFS, or Apache HBase), from which it can be restored manually to achieve eventual data consistency.

JDBC synchronous write

ClickHouse provides a JDBC driver, so the ClickHouse JDBC driver package needs to be imported first:

<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1</version>
</dependency>

Write in the normal way

The conventional way to create a ClickHouse JDBC connection:

Java

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ClickHouseUtil {

    private static Connection connection;

    public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        connection = DriverManager.getConnection(address);
        return connection;
    }

    public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
        return getConn(host, port, "default");
    }

    public static Connection getConn() throws SQLException, ClassNotFoundException {
        return getConn("node-01", 8123);
    }

    public void close() throws SQLException {
        connection.close();
    }
}
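With this helper, data is written through a plain JDBC Statement. Below is a minimal Scala sketch of such a write; the host and the test.orders table are placeholders borrowed from the examples further down this page.

import java.sql.Connection

object JdbcInsertSketch extends App {
  // obtain a connection through the ClickHouseUtil helper shown above
  val conn: Connection = ClickHouseUtil.getConn("127.0.0.1", 8123, "test")
  val stmt = conn.createStatement()
  try {
    // single-row insert; every call pays the full round-trip latency of ClickHouse
    stmt.executeUpdate("INSERT INTO test.orders (userId, siteId) VALUES (1, 2)")
  } finally {
    stmt.close()
    conn.close()
  }
}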

Splicing the various parameters into the request URL like this is cumbersome and hard-coded, which makes it very inflexible.

Write with Apache StreamPark™

To write ClickHouse data with StreamPark, you only need to define the configuration file in the specified format and then write the code. The configuration and code are as follows: the ClickHouse JDBC configuration in StreamPark is given in the configuration list, and the sample program is written in Scala.

configuration list

clickhouse:
  sink:
    jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
    socketTimeout: 3000000
    database: test
    user: $user
    password: $password
    targetTable: orders(userId,siteId)
    batch:
      size: 1000
      delaytime: 6000000

Write to ClickHouse

Scala

import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.ClickHouseSink
import org.apache.flink.api.scala._

object ClickHouseSinkApp extends FlinkStreaming {

  override def handle(): Unit = {

    val createTable =
      """
        |create TABLE test.orders(
        |userId UInt16,
        |orderId UInt16,
        |siteId UInt8,
        |cityId UInt8,
        |orderStatus UInt8,
        |price Float64,
        |quantity UInt8,
        |timestamp UInt16
        |)ENGINE = TinyLog;
        |""".stripMargin

    val source = context.addSource(new TestSource)

    ClickHouseSink().syncSink[TestEntity](source)(x => {
      s"(${x.userId},${x.siteId})"
    }).setParallelism(1)
  }
}

class Order(val marketId: String, val timestamp: String) extends Serializable

ClickHouse supports balanced writing across multiple nodes; you only need to configure the writable nodes in the JDBC URL. Since ClickHouse has relatively high latency for single inserts, it is recommended to set batch.size so that data is inserted in batches, which improves performance. To keep writes timely at the same time, batches are flushed either when the configured data volume is reached or after the configured interval. In the ClickHouseSink implementation, if the last batch contains fewer records than the batch size, the remaining data is inserted when the connection is closed.
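A minimal Scala sketch of this batching idea (not StreamPark's actual implementation): records are buffered, flushed once they reach the configured batch size, and any remainder smaller than the batch size is flushed when the writer is closed. The time-based flush driven by batch.delaytime is omitted for brevity.

import java.sql.Connection
import scala.collection.mutable.ArrayBuffer

class BatchedWriterSketch(conn: Connection, table: String, batchSize: Int) extends AutoCloseable {

  private val buffer = ArrayBuffer.empty[String]

  // buffer one "(v1,v2,...)" values tuple and flush when the batch is full
  def write(values: String): Unit = {
    buffer += values
    if (buffer.size >= batchSize) flush()
  }

  private def flush(): Unit = {
    if (buffer.nonEmpty) {
      val stmt = conn.createStatement()
      try stmt.executeUpdate(s"INSERT INTO $table VALUES ${buffer.mkString(",")}")
      finally stmt.close()
      buffer.clear()
    }
  }

  // the last partial batch (fewer records than batchSize) is inserted when the writer closes
  override def close(): Unit = flush()
}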

HTTP async write

When the amount of data is small, JDBC can be used to connect and write data. In real production scenarios, asynchronous HTTP writes are used more often because they are more efficient and faster.

Write in the normal way

A ClickHouse INSERT must submit data through the POST method. The general operation looks like this:

$ echo 'INSERT INTO t VALUES (1),(2),(3)' | curl 'http://localhost:8123/' --data-binary @-

The operation above is fairly simple, and of course Java can also be used to write in this way. StreamPark adds many capabilities on top of the HTTP POST write path: encapsulation, caching, asynchronous writing, failure retry, and backing up data to external components (Apache Kafka, MySQL, HDFS, Apache HBase) once the retry threshold is reached. All of these only require defining the configuration file in the prescribed format and writing the code.

Write to ClickHouse

The ClickHouse configuration in StreamPark is given in the configuration list below, and the sample program is written in Scala. asynchttpclient is used as the asynchronous HTTP client for writing, so the asynchttpclient jar needs to be imported first:

<!-- ClickHouse async write requires asynchttpclient -->
<dependency>
    <groupId>org.asynchttpclient</groupId>
    <artifactId>async-http-client</artifactId>
    <optional>true</optional>
</dependency>
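For orientation, here is a minimal sketch of an asynchronous HTTP insert using asynchttpclient directly, independent of StreamPark; the host and the test.orders table are placeholders taken from the examples on this page. StreamPark's ClickHouseSink layers buffering, retries, and failover on top of this kind of primitive.

import org.asynchttpclient.Dsl

object AsyncHttpInsertSketch extends App {
  val client = Dsl.asyncHttpClient()
  try {
    // the whole INSERT statement is sent as the POST body, just like the curl example above
    val future = client.preparePost("http://127.0.0.1:8123/")
      .setBody("INSERT INTO test.orders (userId, siteId) VALUES (1, 2)")
      .execute()                 // returns immediately; the request runs on asynchttpclient's I/O threads
      .toCompletableFuture
    // the caller could keep producing data while the write is in flight;
    // here we simply block at the end of the demo and check the response code
    val response = future.join()
    println(s"ClickHouse responded with status ${response.getStatusCode}")
  } finally client.close()
}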

Asynchronous write configuration and failure recovery component configuration

clickhouse:
  sink:
    hosts: 127.0.0.1:8123,192.168.1.2:8123
    socketTimeout: 3000000
    database: test
    user: $user
    password: $password
    targetTable: test.orders(userId,siteId)
    batch:
      size: 1
      delaytime: 60000
    threshold:
      bufferSize: 10
      # number of concurrent asynchronous writers
      numWriters: 4
      # cache queue size
      queueCapacity: 100
      delayTime: 10
      requestTimeout: 600
      retries: 1
      # success response code
      successCode: 200
    failover:
      table: chfailover
      # component to which the data is backed up after the maximum number of failed writes is reached
      storage: kafka # kafka|mysql|hbase|hdfs
      mysql:
        driverClassName: com.mysql.cj.jdbc.Driver
        jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
        username: $user
        password: $pass
      kafka:
        bootstrap.servers: localhost:9092
        topic: test1
        group.id: user_01
        auto.offset.reset: latest
      hbase:
        zookeeper.quorum: localhost
        zookeeper.property.clientPort: 2181
      hdfs:
        path: /data/chfailover
        namenode: hdfs://localhost:8020
        user: hdfs

Write to ClickHouse

Scala

import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.ClickHouseSink
import org.apache.flink.api.scala._

object ClickHouseSinkApp extends FlinkStreaming {

  override def handle(): Unit = {

    val createTable =
      """
        |create TABLE test.orders(
        |userId UInt16,
        |orderId UInt16,
        |siteId UInt8,
        |cityId UInt8,
        |orderStatus UInt8,
        |price Float64,
        |quantity UInt8,
        |timestamp UInt16
        |)ENGINE = TinyLog;
        |""".stripMargin

    println(createTable)

    val source = context.addSource(new TestSource)

    // asynchronous write
    ClickHouseSink().sink[TestEntity](source)(x => {
      s"(${x.userId},${x.siteId})"
    }).setParallelism(1)
  }
}

class Order(val marketId: String, val timestamp: String) extends Serializable

Because ClickHouse has high latency for single inserts, frequent writes of small amounts of data would force the ClickHouse server to merge partitions too often. It is recommended to use the asynchronous submission method and to set reasonable thresholds to improve performance.

When an asynchronous write fails, the sink re-adds the data to the cache queue, which may cause the same window of data to be written in two batches. It is recommended to fully test the stability of ClickHouse in scenarios with strict real-time requirements.

After asynchronous writing reaches the maximum number of retries, the data is backed up to the external component, and the connection to that component is initialized only at that point. It is recommended to make sure the failover component is available in advance.
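If Kafka is configured as the failover storage (as in the configuration above), the backed-up data can later be inspected and replayed manually. The Scala sketch below only drains the topic and prints the records; the topic name is taken from the sample configuration, and the exact record format written by the failover storage should be verified before replaying anything into ClickHouse.

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

object FailoverTopicInspector extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", "chfailover-inspector")
  props.put("auto.offset.reset", "earliest") // read the backed-up records from the beginning
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(Collections.singletonList("test1")) // topic from the failover configuration above
  try {
    val it = consumer.poll(Duration.ofSeconds(5)).iterator()
    while (it.hasNext) {
      // print the failed batches; re-insert them into ClickHouse manually after checking their format
      println(it.next().value())
    }
  } finally consumer.close()
}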

Other configuration

All other configurations must comply with the ClickHouseDataSource connection pool configuration. For the specific configurable items and the role of each parameter, please refer to the official ClickHouse JDBC documentation.