Apache Flink 官方提供了JDBC的连接器,用于从 JDBC 中读取或者向其中写入数据,可提供至少一次(AT_LEAST_ONCE)的处理语义。
Apache StreamPark 中基于两阶段提交实现了精确一次(EXACTLY_ONCE)语义的 JdbcSink 类,并且采用 HikariCP 为连接池,让数据的读取和写入更简单更准确。
JDBC 信息配置
在 Apache StreamPark 中,JDBC Connector 的实现用到了 HikariCP 连接池,相关的配置在jdbc 的 namespace 下,约定的配置如下:
jdbc:semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONEusername: rootpassword: 123456driverClassName: com.mysql.jdbc.DriverconnectionTimeout: 30000idleTimeout: 30000maxLifetime: 30000maximumPoolSize: 6jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
semantic 语义配置
semantic 这个参数是在 JdbcSink写时候的语义,仅对 JdbcSink 有效,JdbcSource 会自动屏蔽该参数,有三个可选项
- EXACTLY_ONCE
- AT_LEAST_ONCE
- NONE
EXACTLY_ONCE
如果JdbcSink配置了 EXACTLY_ONCE语义,则底层采用了两阶段提交的实现方式来完成写入,此时要flink配合开启Checkpointing才会生效,如何开启checkpoint请参考第二章关于checkpoint配置部分
AT_LEAST_ONCE && NONE
默认不指定会采用NONE语义,这两种配置效果一样,都是保证 至少一次 语义
开启
EXACTLY_ONCE精确一次的好处是显而易见的,保证了数据的准确性,但成本也是高昂的,需要checkpoint的支持,底层模拟了事务的提交读,对实时性有一定的损耗,如果你的业务对数据的准确性要求不是那么高,则建议采用AT_LEAST_ONCE语义
其他配置
除了特殊的semantic 配置项之外,其他的所有的配置都必须遵守 HikariCP 连接池的配置,具体可配置项和各个参数的作用请参考光 HikariCP官网文档.
JDBC 读取数据
在StreamPark中JdbcSource用来读取数据,并且根据数据的offset做到数据读时可回放,我们看看具体如何用JdbcSource读取数据,假如需求如下
- 从
t_order表中读取数据,以timestamp字段为参照,起始值为2020-12-16 12:00:00往后抽取数据 - 将读取到的数据构造成
Order对象返回
jdbc配置和读取代码如下
配置
jdbc:driverClassName: com.mysql.jdbc.DriverjdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=trueusername: rootpassword: 123456
Scala
import org.apache.streampark.flink.core.scala.FlinkStreamingimport org.apache.streampark.flink.core.scala.source.JdbcSourceimport org.apache.flink.api.scala._object MySQLSourceApp extends FlinkStreaming {override def handle(): Unit = {JdbcSource().getDataStream[Order](lastOne => {//防止抽取过于密集,间隔5秒抽取一次数据Thread.sleep(5000);val laseOffset = if (lastOne == null) "2020-12-16 12:00:00" else lastOne.timestamps"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "},_.map(x => new Order(x("market_id").toString, x("timestamp").toString))).print()}}class Order(val marketId: String, val timestamp: String) extends Serializable
Java
import org.apache.streampark.flink.core.java.function.SQLQueryFunction;import org.apache.streampark.flink.core.java.function.SQLResultFunction;import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;import org.apache.streampark.flink.core.java.source.JdbcSource;import org.apache.streampark.flink.core.scala.StreamingContext;import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;import org.apache.flink.api.common.typeinfo.TypeInformation;import java.io.Serializable;import java.util.ArrayList;import java.util.List;public class MySQLJavaApp {public static void main(String[] args) {StreamEnvConfig envConfig = new StreamEnvConfig(args, null);StreamingContext context = new StreamingContext(envConfig);new JdbcSource<Order>(context).getDataStream((SQLQueryFunction<Order>) lastOne -> {//防止抽取过于密集,间隔5秒抽取一次数据Thread.sleep(5000);Serializable lastOffset = lastOne == null? "2020-12-16 12:00:00": lastOne.timestamp;return String.format("select * from t_order " +"where timestamp > '%s' " +"order by timestamp asc ",lastOffset);},(SQLResultFunction<Order>) iterable -> {List<Order> result = new ArrayList<>();iterable.forEach(item -> {Order Order = new Order();Order.marketId = item.get("market_id").toString();Order.timestamp = Long.parseLong(item.get("timestamp").toString());result.add(Order);});return result;}).returns(TypeInformation.of(Order.class)).print();context.start();}}
以java api为例,这里要传入两个参数
SQLQueryFunction<T> queryFuncSQLResultFunction<T> resultFunc
queryFunc获取一条sql
queryFunc是要传入一个SQLQueryFunction类型的function,该function用于获取查询sql的,会将最后一条记录返回给开发者,然后需要开发者根据最后一条记录返回一条新的查询sql,queryFunc定义如下:
/*** @author benjobs*/@FunctionalInterfacepublic interface SQLQueryFunction<T> extends Serializable {/*** 获取要查询的SQL** @return* @throws Exception*/String query(T last) throws Exception;}
所以上面的代码中,第一次上来lastOne(最后一条记录)为null,会判断一下,为null则取需求里默认的offset,查询的sql里根据timestamp字段正序排,这样在第一次查询之后,会返回最后的那条记录,下次直接可以使用这条记录作为下一次查询的根据
JdbcSource实现了CheckpointedFunction,即当程序开启 checkpoint 后,会将这些诸如laseOffset的状态数据保存到state backend,这样程序挂了,再次启动会自动从checkpoint中恢复offset,会接着上次的位置继续读取数据, 一般在生产环境,更灵活的方式是将lastOffset写入如redis等存储中,每次查询完之后再将最后的记录更新到redis,这样即便程序意外挂了,再次启动,也可以从redis中获取到最后的offset进行数据的抽取,也可以很方便的人为的任意调整这个offset进行数据的回放
resultFunc 处理查询到的数据
resultFunc的参数类型是SQLResultFunction<T>,是将一个查询到的结果集放到Iterable<Map<String, ?>>中返回给开发者,可以看到返回了一个迭代器Iterable,迭代器每次迭代返回一个Map,该Map里记录了一行完整的记录,Map的key为查询字段,value为值,SQLResultFunction<T>定义如下
/*** @author benjobs*/@FunctionalInterfacepublic interface SQLResultFunction<T> extends Serializable {/*** 将查下结果以Iterable<Map>的方式返回,开发者去实现转成对象.** @param map* @return*/Iterable<T> result(Iterable<Map<String, ?>> iterable);}
JDBC 读取写入
StreamPark中JdbcSink是用来写入数据,我们看看具体如何用JdbcSink写入数据,假如需求是需要从kakfa中读取数据,写入到mysql
配置
kafka.source:bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092pattern: usergroup.id: user_02auto.offset.reset: earliest # (earliest | latest)...jdbc:semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONEdriverClassName: com.mysql.jdbc.DriverjdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=trueusername: rootpassword: 123456
配置里
jdbc下的 semantic 是写入的语义,在上面Jdbc信息配置有介绍,该配置只会在JdbcSink下生效,StreamPark中基于两阶段提交实现了 EXACTLY_ONCE 语义, 这本身需要被操作的数据库(mysql,oracle,MariaDB,MS SQL Server)等支持事务,理论上所有支持标准Jdbc事务的数据库都可以做到EXACTLY_ONCE(精确一次)的写入Scala
import org.apache.streampark.common.util.JsonUtilsimport org.apache.streampark.flink.core.scala.FlinkStreamingimport org.apache.streampark.flink.core.scala.sink.JdbcSinkimport org.apache.streampark.flink.core.scala.source.KafkaSourceimport org.apache.flink.api.common.typeinfo.TypeInformationimport org.apache.flink.api.java.typeutils.TypeExtractor.getForClassimport org.apache.flink.api.scala._import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchemaobject JdbcSinkApp extends FlinkStreaming {override def handle(): Unit = {val source = KafkaSource().getDataStream[String]().map(x => JsonUtils.read[User](x.value))JdbcSink().sink[User](source)(user =>s"""|insert into t_user(`name`,`age`,`gender`,`address`)|value('${user.name}',${user.age},${user.gender},'${user.address}')|""".stripMargin)}}case class User(name:String,age:Int,gender:Int,address:String)
Java
import com.fasterxml.jackson.databind.ObjectMapper;import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;import org.apache.streampark.flink.core.java.source.KafkaSource;import org.apache.streampark.flink.core.scala.StreamingContext;import org.apache.streampark.flink.core.scala.source.KafkaRecord;import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;import org.apache.kafka.clients.consumer.ConsumerRecord;import java.io.Serializable;import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;public class JdbcSinkJavaApp {public static void main(String[] args) {StreamEnvConfig envConfig = new StreamEnvConfig(args, null);StreamingContext context = new StreamingContext(envConfig);ObjectMapper mapper = new ObjectMapper();DataStream<JavaUser> source = new KafkaSource<String>(context).getDataStream().map((MapFunction<KafkaRecord<String>, JavaUser>) value ->mapper.readValue(value.value(), JavaUser.class));new JdbcSink<JavaUser>(context).sql((SQLFromFunction<JavaUser>) JavaUser::toSql).sink(source);context.start();}}class JavaUser implements Serializable {String name;Integer age;Integer gender;String address;public String toSql() {return String.format("insert into t_user(`name`,`age`,`gender`,`address`) value('%s',%d,%d,'%s')",name,age,gender,address);}}
根据数据流生成目标SQL
在写入的时候,需要知道具体写入的sql语句,该sql语句需要开发者通过function的方式提供,在scala api中,直接在sink方法后跟上function即可,java api 则是通过sql()方法传入一个SQLFromFunction类型的function
下面以java api为例说明,我们来看看javaapi 中提供sql的function方法的定义
/*** @author benjobs*/@FunctionalInterfacepublic interface SQLFromFunction<T> extends Serializable {/*** @param bean* @return*/String from(T bean);}
SQLFromFunction上的泛型<T>即为DataStream里实际的数据类型,该function里有一个方法form(T bean),这个bean即为当前DataStream中的一条具体数据,会将该数据返给开发者,开发者来决定基于这条数据,生成一条具体可以往数据库中插入的sql
设置写入批次大小
在 非 EXACTLY_ONCE(精确一次的语义下)可以适当的设置batch.size来提高Jdbc写入的性能(前提是业务允许的情况下),具体配置如下
jdbc:semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONEdriverClassName: com.mysql.jdbc.DriverjdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=trueusername: rootpassword: 123456batch.size: 1000
这样一来就不是来一条数据就立即写入,而是积攒一个匹配然后执行批量插入
这个设置仅在非
EXACTLY_ONCE语义下生效,带来的好处是可以提高Jdbc写入的性能,一次大批量的插入数据,缺点是数据写入势必会有延迟,请根据实际使用情况谨慎使用
