ClickHouse 是一个用于联机分析(OLAP)的列式数据库管理系统,主要面向 OLAP 场景。目前 Apache Flink 官方未提供写入
读取 ClickHouse 数据的连接器。Apache StreamPark 基于 ClickHouse 支持的访问形式 HTTP 客户端、JDBC 驱动封装了 ClickHouseSink 用于向 ClickHouse 实时写入数据。
ClickHouse 写入不支持事务,使用 JDBC 向其中写入数据可提供至少一次的处理语义。使用 HTTP 客户端异步写入,对异步写入重试多次失败的数据会写入外部组件,最终通过人为介入来恢复数据,实现最终数据一致。
JDBC 同步写入
ClickHouse 提供了 JDBC 驱动,需要先导入 ClickHouse 的 JDBC 驱动包:
<dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.3.1</version></dependency>
常规方式写入
常规方式下创建clickhouse jdbc连接的方式如下:
Java
import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;public class ClickHouseUtil {private static Connection connection;public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;connection = DriverManager.getConnection(address);return connection;}public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException {return getConn(host,port,"default");}public static Connection getConn() throws SQLException, ClassNotFoundException {return getConn("node-01",8123);}public void close() throws SQLException {connection.close();}}
以上将各项参数拼接为请求 url 的方式较繁琐,并且是硬编码的方式写死的,非常的不灵敏.
Apache StreamPark™ 方式写入
用StreamPark接入 clickhouse的数据, 只需要按照规定的格式定义好配置文件然后编写代码即可,配置和代码如下在StreamPark中clickhose jdbc 约定的配置见配置列表,运行程序样例为scala,如下:
配置信息
clickhouse:sink:#写入节点地址jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123socketTimeout: 3000000database: testuser: $userpassword: $password#写入结果表及对应的字段,全部可不指定字段targetTable: orders(userId,siteId)batch:size: 1000delaytime: 6000000
写入clickhouse
Scala
import org.apache.streampark.flink.core.scala.FlinkStreamingimport org.apache.streampark.flink.core.scala.sink.ClickHouseSinkimport org.apache.flink.api.scala._object ClickHouseSinkApp extends FlinkStreaming {override def handle(): Unit = {//要写出的表结构(在clickhosue中已经存在)val createTable ="""|create TABLE test.orders(|userId UInt16,|orderId UInt16,|siteId UInt8,|cityId UInt8,|orderStatus UInt8,|price Float64,|quantity UInt8,|timestamp UInt16|)ENGINE = TinyLog;|""".stripMargin// 1) 接入数据源val source = context.addSource(new TestSource)// 2) 写出数据ClickHouseSink().syncSink[TestEntity](source)(x => {s"(${x.userId},${x.siteId})"}).setParallelism(1)}}class Order(val marketId: String, val timestamp: String) extends Serializable
clickhouse 可支持多个节点均衡写入,只需在jdbcUrl配置可写入的节点即可
由于ClickHouse单次插入的延迟比较高,建议设置 batch.size 来批量插入数据提高性能,同时为了提高实时性, 支持按照数据量或者间隔时间 batch.delaytime 来批次写入
在ClickHouseSink的实现中,若最后一批数据的数目不足BatchSize,则会在关闭连接时候插入剩余数据
HTTP 异步写入
jdbc的方式连接写入数据,在数据量较小的情况下可以采用,而在实际生产中更多的是采用async http的方式更高效的,更快速的写入
常规方式写入
clickhouse INSERT 必须通过POST方法来插入数据 常规操作如下:
$ echo 'INSERT INTO t VALUES (1),(2),(3)' | curl 'http://localhost:8123/' --data-binary @-
上述方式操作较简陋,当然也可以使用java 代码来进行写入, StreamPark 对 http post 写入方式进行封装增强,增加缓存、异步写入、失败重试、达到重试阈值后数据备份至外部组件(kafka,mysql,hdfs,hbase) 等功能,以上功能只需要按照规定的格式定义好配置文件然后编写代码即可,配置和代码如下
Apache StreamPark™ 方式写入
在StreamPark中clickhose jdbc 约定的配置见配置列表,运行程序样例为scala,如下:
这里采用asynchttpclient作为http异步客户端来进行写入,先导入 asynchttpclient 的jar
<!--clickhouse async need asynchttpclient --><dependency><groupId>org.asynchttpclient</groupId><artifactId>async-http-client</artifactId><optional>true</optional></dependency>
异步写入配置及失败恢复组件配置
clickhouse:sink:hosts: 127.0.0.1:8123,192.168.1.2:8123socketTimeout: 3000000database: testuser: $userpassword: $passwordtargetTable: test.orders(userId,siteId)batch:size: 1delaytime: 60000threshold:bufferSize: 10# 异步写入的并发数numWriters: 4# 缓存队列大小queueCapacity: 100delayTime: 10requestTimeout: 600retries: 1# 成功响应码successCode: 200failover:table: chfailover# 达到失败最大写入次数后,数据备份的组件storage: kafka #kafka|mysql|hbase|hdfsmysql:driverClassName: com.mysql.cj.jdbc.DriverjdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=trueusername: $userpassword: $passkafka:bootstrap.servers: localhost:9092topic: test1group.id: user_01auto.offset.reset: latesthbase:zookeeper.quorum: localhostzookeeper.property.clientPort: 2181hdfs:path: /data/chfailovernamenode: hdfs://localhost:8020user: hdfs
写入clickhouse
Scala
import org.apache.streampark.flink.core.scala.FlinkStreamingimport org.apache.streampark.flink.core.scala.sink.ClickHouseSinkimport org.apache.flink.api.scala._object ClickHouseSinkApp extends FlinkStreaming {override def handle(): Unit = {val createTable ="""|create TABLE test.orders(|userId UInt16,|orderId UInt16,|siteId UInt8,|cityId UInt8,|orderStatus UInt8,|price Float64,|quantity UInt8,|timestamp UInt16|)ENGINE = TinyLog;|""".stripMarginprintln(createTable)val source = context.addSource(new TestSource)// 异步写入ClickHouseSink().sink[TestEntity](source)(x => {s"(${x.userId},${x.siteId})"}).setParallelism(1)}}class Order(val marketId: String, val timestamp: String) extends Serializable
由于ClickHouse单次插入的延迟比较高,小数据量频繁写入会造成clickhouse server 频繁排序合并分区,建议使用异步提交方式,设置合理阈值提高性能
由于ClickHouse 异步写入失败会重新将数据添加至缓存队列,可能造成同一窗口数据分两批次写入,实时性要求高的场景建议全面测试clickhouse的稳定性
异步写入数据达到重试最大值后,会将数据备份至外部组件,在此时才会初始化组件连接,建议确保 failover 组件的可用性
其他配置
其他的所有的配置都必须遵守 ClickHouseDataSource 连接池的配置,具体可配置项和各个参数的作用请参考clickhouse-jdbc官网文档.
