Apache Flink 官方提供了JDBC的连接器,用于从 JDBC 中读取或者向其中写入数据,可提供至少一次(AT_LEAST_ONCE)的处理语义。

Apache StreamPark 中基于两阶段提交实现了精确一次(EXACTLY_ONCE)语义的 JdbcSink 类,并且采用 HikariCP 为连接池,让数据的读取和写入更简单更准确。

JDBC 信息配置

在 Apache StreamPark 中,JDBC Connector 的实现用到了 HikariCP 连接池,相关的配置在jdbc 的 namespace 下,约定的配置如下:

  1. jdbc:
  2. semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  3. username: root
  4. password: 123456
  5. driverClassName: com.mysql.jdbc.Driver
  6. connectionTimeout: 30000
  7. idleTimeout: 30000
  8. maxLifetime: 30000
  9. maximumPoolSize: 6
  10. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true

semantic 语义配置

semantic 这个参数是在 JdbcSink写时候的语义,仅对 JdbcSink 有效,JdbcSource 会自动屏蔽该参数,有三个可选项

  • EXACTLY_ONCE
  • AT_LEAST_ONCE
  • NONE

EXACTLY_ONCE

如果JdbcSink配置了 EXACTLY_ONCE语义,则底层采用了两阶段提交的实现方式来完成写入,此时要flink配合开启Checkpointing才会生效,如何开启checkpoint请参考第二章关于checkpoint配置部分

AT_LEAST_ONCE && NONE

默认不指定会采用NONE语义,这两种配置效果一样,都是保证 至少一次 语义

开启EXACTLY_ONCE精确一次的好处是显而易见的,保证了数据的准确性,但成本也是高昂的,需要checkpoint的支持,底层模拟了事务的提交读,对实时性有一定的损耗,如果你的业务对数据的准确性要求不是那么高,则建议采用AT_LEAST_ONCE语义

其他配置

除了特殊的semantic 配置项之外,其他的所有的配置都必须遵守 HikariCP 连接池的配置,具体可配置项和各个参数的作用请参考光 HikariCP官网文档.

JDBC 读取数据

StreamParkJdbcSource用来读取数据,并且根据数据的offset做到数据读时可回放,我们看看具体如何用JdbcSource读取数据,假如需求如下

  • t_order表中读取数据,以timestamp字段为参照,起始值为2020-12-16 12:00:00往后抽取数据
  • 将读取到的数据构造成Order对象返回

jdbc配置和读取代码如下

配置

  1. jdbc:
  2. driverClassName: com.mysql.jdbc.Driver
  3. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  4. username: root
  5. password: 123456

Scala

  1. import org.apache.streampark.flink.core.scala.FlinkStreaming
  2. import org.apache.streampark.flink.core.scala.source.JdbcSource
  3. import org.apache.flink.api.scala._
  4. object MySQLSourceApp extends FlinkStreaming {
  5. override def handle(): Unit = {
  6. JdbcSource().getDataStream[Order](lastOne => {
  7. //防止抽取过于密集,间隔5秒抽取一次数据
  8. Thread.sleep(5000);
  9. val laseOffset = if (lastOne == null) "2020-12-16 12:00:00" else lastOne.timestamp
  10. s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
  11. },
  12. _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
  13. ).print()
  14. }
  15. }
  16. class Order(val marketId: String, val timestamp: String) extends Serializable

Java

  1. import org.apache.streampark.flink.core.java.function.SQLQueryFunction;
  2. import org.apache.streampark.flink.core.java.function.SQLResultFunction;
  3. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  4. import org.apache.streampark.flink.core.java.source.JdbcSource;
  5. import org.apache.streampark.flink.core.scala.StreamingContext;
  6. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  7. import org.apache.flink.api.common.typeinfo.TypeInformation;
  8. import java.io.Serializable;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. public class MySQLJavaApp {
  12. public static void main(String[] args) {
  13. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  14. StreamingContext context = new StreamingContext(envConfig);
  15. new JdbcSource<Order>(context)
  16. .getDataStream(
  17. (SQLQueryFunction<Order>) lastOne -> {
  18. //防止抽取过于密集,间隔5秒抽取一次数据
  19. Thread.sleep(5000);
  20. Serializable lastOffset = lastOne == null
  21. ? "2020-12-16 12:00:00"
  22. : lastOne.timestamp;
  23. return String.format(
  24. "select * from t_order " +
  25. "where timestamp > '%s' " +
  26. "order by timestamp asc ",
  27. lastOffset
  28. );
  29. },
  30. (SQLResultFunction<Order>) iterable -> {
  31. List<Order> result = new ArrayList<>();
  32. iterable.forEach(item -> {
  33. Order Order = new Order();
  34. Order.marketId = item.get("market_id").toString();
  35. Order.timestamp = Long.parseLong(item.get("timestamp").toString());
  36. result.add(Order);
  37. });
  38. return result;
  39. })
  40. .returns(TypeInformation.of(Order.class))
  41. .print();
  42. context.start();
  43. }
  44. }

java api为例,这里要传入两个参数

  • SQLQueryFunction<T> queryFunc
  • SQLResultFunction<T> resultFunc

queryFunc获取一条sql

queryFunc是要传入一个SQLQueryFunction类型的function,该function用于获取查询sql的,会将最后一条记录返回给开发者,然后需要开发者根据最后一条记录返回一条新的查询sql,queryFunc定义如下:

  1. /**
  2. * @author benjobs
  3. */
  4. @FunctionalInterface
  5. public interface SQLQueryFunction<T> extends Serializable {
  6. /**
  7. * 获取要查询的SQL
  8. *
  9. * @return
  10. * @throws Exception
  11. */
  12. String query(T last) throws Exception;
  13. }

所以上面的代码中,第一次上来lastOne(最后一条记录)为null,会判断一下,为null则取需求里默认的offset,查询的sql里根据timestamp字段正序排,这样在第一次查询之后,会返回最后的那条记录,下次直接可以使用这条记录作为下一次查询的根据

JdbcSource实现了CheckpointedFunction,即当程序开启 checkpoint 后,会将这些诸如laseOffset的状态数据保存到state backend,这样程序挂了,再次启动会自动从checkpoint中恢复offset,会接着上次的位置继续读取数据, 一般在生产环境,更灵活的方式是将lastOffset写入如redis等存储中,每次查询完之后再将最后的记录更新到redis,这样即便程序意外挂了,再次启动,也可以从redis中获取到最后的offset进行数据的抽取,也可以很方便的人为的任意调整这个offset进行数据的回放

resultFunc 处理查询到的数据

resultFunc的参数类型是SQLResultFunction<T>,是将一个查询到的结果集放到Iterable<Map<String, ?>>中返回给开发者,可以看到返回了一个迭代器Iterable,迭代器每次迭代返回一个Map,该Map里记录了一行完整的记录,Mapkey为查询字段,value为值,SQLResultFunction<T>定义如下

  1. /**
  2. * @author benjobs
  3. */
  4. @FunctionalInterface
  5. public interface SQLResultFunction<T> extends Serializable {
  6. /**
  7. * 将查下结果以Iterable<Map>的方式返回,开发者去实现转成对象.
  8. *
  9. * @param map
  10. * @return
  11. */
  12. Iterable<T> result(Iterable<Map<String, ?>> iterable);
  13. }

JDBC 读取写入

StreamParkJdbcSink是用来写入数据,我们看看具体如何用JdbcSink写入数据,假如需求是需要从kakfa中读取数据,写入到mysql

配置

  1. kafka.source:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. pattern: user
  4. group.id: user_02
  5. auto.offset.reset: earliest # (earliest | latest)
  6. ...
  7. jdbc:
  8. semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  9. driverClassName: com.mysql.jdbc.Driver
  10. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  11. username: root
  12. password: 123456

配置里jdbc下的 semantic 是写入的语义,在上面Jdbc信息配置有介绍,该配置只会在JdbcSink下生效,StreamPark中基于两阶段提交实现了 EXACTLY_ONCE 语义, 这本身需要被操作的数据库(mysql,oracle,MariaDB,MS SQL Server)等支持事务,理论上所有支持标准Jdbc事务的数据库都可以做到EXACTLY_ONCE(精确一次)的写入

Scala

  1. import org.apache.streampark.common.util.JsonUtils
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.streampark.flink.core.scala.sink.JdbcSink
  4. import org.apache.streampark.flink.core.scala.source.KafkaSource
  5. import org.apache.flink.api.common.typeinfo.TypeInformation
  6. import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
  7. import org.apache.flink.api.scala._
  8. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  9. object JdbcSinkApp extends FlinkStreaming {
  10. override def handle(): Unit = {
  11. val source = KafkaSource()
  12. .getDataStream[String]()
  13. .map(x => JsonUtils.read[User](x.value))
  14. JdbcSink().sink[User](source)(user =>
  15. s"""
  16. |insert into t_user(`name`,`age`,`gender`,`address`)
  17. |value('${user.name}',${user.age},${user.gender},'${user.address}')
  18. |""".stripMargin
  19. )
  20. }
  21. }
  22. case class User(name:String,age:Int,gender:Int,address:String)

Java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  3. import org.apache.streampark.flink.core.java.source.KafkaSource;
  4. import org.apache.streampark.flink.core.scala.StreamingContext;
  5. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  6. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.api.common.typeinfo.TypeInformation;
  9. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  10. import org.apache.kafka.clients.consumer.ConsumerRecord;
  11. import java.io.Serializable;
  12. import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
  13. public class JdbcSinkJavaApp {
  14. public static void main(String[] args) {
  15. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  16. StreamingContext context = new StreamingContext(envConfig);
  17. ObjectMapper mapper = new ObjectMapper();
  18. DataStream<JavaUser> source = new KafkaSource<String>(context)
  19. .getDataStream()
  20. .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
  21. mapper.readValue(value.value(), JavaUser.class));
  22. new JdbcSink<JavaUser>(context)
  23. .sql((SQLFromFunction<JavaUser>) JavaUser::toSql)
  24. .sink(source);
  25. context.start();
  26. }
  27. }
  28. class JavaUser implements Serializable {
  29. String name;
  30. Integer age;
  31. Integer gender;
  32. String address;
  33. public String toSql() {
  34. return String.format(
  35. "insert into t_user(`name`,`age`,`gender`,`address`) value('%s',%d,%d,'%s')",
  36. name,
  37. age,
  38. gender,
  39. address);
  40. }
  41. }

根据数据流生成目标SQL

在写入的时候,需要知道具体写入的sql语句,该sql语句需要开发者通过function的方式提供,在scala api中,直接在sink方法后跟上function即可,java api 则是通过sql()方法传入一个SQLFromFunction类型的function

下面以java api为例说明,我们来看看javaapi 中提供sql的function方法的定义

  1. /**
  2. * @author benjobs
  3. */
  4. @FunctionalInterface
  5. public interface SQLFromFunction<T> extends Serializable {
  6. /**
  7. * @param bean
  8. * @return
  9. */
  10. String from(T bean);
  11. }

SQLFromFunction上的泛型<T>即为DataStream里实际的数据类型,该function里有一个方法form(T bean),这个bean即为当前DataStream中的一条具体数据,会将该数据返给开发者,开发者来决定基于这条数据,生成一条具体可以往数据库中插入的sql

设置写入批次大小

在 非 EXACTLY_ONCE(精确一次的语义下)可以适当的设置batch.size来提高Jdbc写入的性能(前提是业务允许的情况下),具体配置如下

  1. jdbc:
  2. semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  3. driverClassName: com.mysql.jdbc.Driver
  4. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  5. username: root
  6. password: 123456
  7. batch.size: 1000

这样一来就不是来一条数据就立即写入,而是积攒一个匹配然后执行批量插入

这个设置仅在非EXACTLY_ONCE语义下生效,带来的好处是可以提高Jdbc写入的性能,一次大批量的插入数据,缺点是数据写入势必会有延迟,请根据实际使用情况谨慎使用

多实例 JDBC 支持

手动指定 JDBC 连接信息