ClickHouse 是一个用于联机分析(OLAP)的列式数据库管理系统,主要面向 OLAP 场景。目前 Apache Flink 官方未提供写入 读取 ClickHouse 数据的连接器。Apache StreamPark 基于 ClickHouse 支持的访问形式 HTTP 客户端JDBC 驱动封装了 ClickHouseSink 用于向 ClickHouse 实时写入数据。

ClickHouse 写入不支持事务,使用 JDBC 向其中写入数据可提供至少一次的处理语义。使用 HTTP 客户端异步写入,对异步写入重试多次失败的数据会写入外部组件,最终通过人为介入来恢复数据,实现最终数据一致。

JDBC 同步写入

ClickHouse 提供了 JDBC 驱动,需要先导入 ClickHouse 的 JDBC 驱动包:

  1. <dependency>
  2. <groupId>ru.yandex.clickhouse</groupId>
  3. <artifactId>clickhouse-jdbc</artifactId>
  4. <version>0.3.1</version>
  5. </dependency>

常规方式写入

常规方式下创建clickhouse jdbc连接的方式如下:

Java

  1. import java.sql.Connection;
  2. import java.sql.DriverManager;
  3. import java.sql.SQLException;
  4. public class ClickHouseUtil {
  5. private static Connection connection;
  6. public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
  7. Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
  8. String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
  9. connection = DriverManager.getConnection(address);
  10. return connection;
  11. }
  12. public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {
  13. return getConn(host,port,"default");
  14. }
  15. public static Connection getConn() throws SQLException, ClassNotFoundException {
  16. return getConn("node-01",8123);
  17. }
  18. public void close() throws SQLException {
  19. connection.close();
  20. }
  21. }

以上将各项参数拼接为请求 url 的方式较繁琐,并且是硬编码的方式写死的,非常的不灵敏.

Apache StreamPark™ 方式写入

StreamPark接入 clickhouse的数据, 只需要按照规定的格式定义好配置文件然后编写代码即可,配置和代码如下在StreamParkclickhose jdbc 约定的配置见配置列表,运行程序样例为scala,如下:

配置信息

  1. clickhouse:
  2. sink:
  3. #写入节点地址
  4. jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
  5. socketTimeout: 3000000
  6. database: test
  7. user: $user
  8. password: $password
  9. #写入结果表及对应的字段,全部可不指定字段
  10. targetTable: orders(userId,siteId)
  11. batch:
  12. size: 1000
  13. delaytime: 6000000

写入clickhouse

Scala

  1. import org.apache.streampark.flink.core.scala.FlinkStreaming
  2. import org.apache.streampark.flink.core.scala.sink.ClickHouseSink
  3. import org.apache.flink.api.scala._
  4. object ClickHouseSinkApp extends FlinkStreaming {
  5. override def handle(): Unit = {
  6. //要写出的表结构(在clickhosue中已经存在)
  7. val createTable =
  8. """
  9. |create TABLE test.orders(
  10. |userId UInt16,
  11. |orderId UInt16,
  12. |siteId UInt8,
  13. |cityId UInt8,
  14. |orderStatus UInt8,
  15. |price Float64,
  16. |quantity UInt8,
  17. |timestamp UInt16
  18. |)ENGINE = TinyLog;
  19. |""".stripMargin
  20. // 1) 接入数据源
  21. val source = context.addSource(new TestSource)
  22. // 2) 写出数据
  23. ClickHouseSink().syncSink[TestEntity](source)(x => {
  24. s"(${x.userId},${x.siteId})"
  25. }).setParallelism(1)
  26. }
  27. }
  28. class Order(val marketId: String, val timestamp: String) extends Serializable

clickhouse 可支持多个节点均衡写入,只需在jdbcUrl配置可写入的节点即可

由于ClickHouse单次插入的延迟比较高,建议设置 batch.size 来批量插入数据提高性能,同时为了提高实时性, 支持按照数据量或者间隔时间 batch.delaytime 来批次写入

在ClickHouseSink的实现中,若最后一批数据的数目不足BatchSize,则会在关闭连接时候插入剩余数据

HTTP 异步写入

jdbc的方式连接写入数据,在数据量较小的情况下可以采用,而在实际生产中更多的是采用async http的方式更高效的,更快速的写入

常规方式写入

clickhouse INSERT 必须通过POST方法来插入数据 常规操作如下:

  1. $ echo 'INSERT INTO t VALUES (1),(2),(3)' | curl 'http://localhost:8123/' --data-binary @-

上述方式操作较简陋,当然也可以使用java 代码来进行写入, StreamPark 对 http post 写入方式进行封装增强,增加缓存、异步写入、失败重试、达到重试阈值后数据备份至外部组件(kafka,mysql,hdfs,hbase) 等功能,以上功能只需要按照规定的格式定义好配置文件然后编写代码即可,配置和代码如下

Apache StreamPark™ 方式写入

StreamParkclickhose jdbc 约定的配置见配置列表,运行程序样例为scala,如下:

这里采用asynchttpclient作为http异步客户端来进行写入,先导入 asynchttpclient 的jar

  1. <!--clickhouse async need asynchttpclient -->
  2. <dependency>
  3. <groupId>org.asynchttpclient</groupId>
  4. <artifactId>async-http-client</artifactId>
  5. <optional>true</optional>
  6. </dependency>

异步写入配置及失败恢复组件配置

  1. clickhouse:
  2. sink:
  3. hosts: 127.0.0.1:8123,192.168.1.2:8123
  4. socketTimeout: 3000000
  5. database: test
  6. user: $user
  7. password: $password
  8. targetTable: test.orders(userId,siteId)
  9. batch:
  10. size: 1
  11. delaytime: 60000
  12. threshold:
  13. bufferSize: 10
  14. # 异步写入的并发数
  15. numWriters: 4
  16. # 缓存队列大小
  17. queueCapacity: 100
  18. delayTime: 10
  19. requestTimeout: 600
  20. retries: 1
  21. # 成功响应码
  22. successCode: 200
  23. failover:
  24. table: chfailover
  25. # 达到失败最大写入次数后,数据备份的组件
  26. storage: kafka #kafka|mysql|hbase|hdfs
  27. mysql:
  28. driverClassName: com.mysql.cj.jdbc.Driver
  29. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  30. username: $user
  31. password: $pass
  32. kafka:
  33. bootstrap.servers: localhost:9092
  34. topic: test1
  35. group.id: user_01
  36. auto.offset.reset: latest
  37. hbase:
  38. zookeeper.quorum: localhost
  39. zookeeper.property.clientPort: 2181
  40. hdfs:
  41. path: /data/chfailover
  42. namenode: hdfs://localhost:8020
  43. user: hdfs

写入clickhouse

Scala

  1. import org.apache.streampark.flink.core.scala.FlinkStreaming
  2. import org.apache.streampark.flink.core.scala.sink.ClickHouseSink
  3. import org.apache.flink.api.scala._
  4. object ClickHouseSinkApp extends FlinkStreaming {
  5. override def handle(): Unit = {
  6. val createTable =
  7. """
  8. |create TABLE test.orders(
  9. |userId UInt16,
  10. |orderId UInt16,
  11. |siteId UInt8,
  12. |cityId UInt8,
  13. |orderStatus UInt8,
  14. |price Float64,
  15. |quantity UInt8,
  16. |timestamp UInt16
  17. |)ENGINE = TinyLog;
  18. |""".stripMargin
  19. println(createTable)
  20. val source = context.addSource(new TestSource)
  21. // 异步写入
  22. ClickHouseSink().sink[TestEntity](source)(x => {
  23. s"(${x.userId},${x.siteId})"
  24. }).setParallelism(1)
  25. }
  26. }
  27. class Order(val marketId: String, val timestamp: String) extends Serializable

由于ClickHouse单次插入的延迟比较高,小数据量频繁写入会造成clickhouse server 频繁排序合并分区,建议使用异步提交方式,设置合理阈值提高性能

由于ClickHouse 异步写入失败会重新将数据添加至缓存队列,可能造成同一窗口数据分两批次写入,实时性要求高的场景建议全面测试clickhouse的稳定性

异步写入数据达到重试最大值后,会将数据备份至外部组件,在此时才会初始化组件连接,建议确保 failover 组件的可用性

其他配置

其他的所有的配置都必须遵守 ClickHouseDataSource 连接池的配置,具体可配置项和各个参数的作用请参考clickhouse-jdbc官网文档.