Apache Flink 官方提供了JDBC的连接器,用于从 JDBC 中读取或者向其中写入数据,可提供至少一次(AT_LEAST_ONCE)的处理语义。
Apache StreamPark 中基于两阶段提交实现了精确一次(EXACTLY_ONCE)语义的 JdbcSink
类,并且采用 HikariCP
为连接池,让数据的读取和写入更简单更准确。
JDBC 信息配置
在 Apache StreamPark 中,JDBC Connector 的实现用到了 HikariCP
连接池,相关的配置在jdbc
的 namespace 下,约定的配置如下:
jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
username: root
password: 123456
driverClassName: com.mysql.jdbc.Driver
connectionTimeout: 30000
idleTimeout: 30000
maxLifetime: 30000
maximumPoolSize: 6
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
semantic 语义配置
semantic
这个参数是在 JdbcSink
写时候的语义,仅对 JdbcSink 有效,JdbcSource
会自动屏蔽该参数,有三个可选项
- EXACTLY_ONCE
- AT_LEAST_ONCE
- NONE
EXACTLY_ONCE
如果JdbcSink
配置了 EXACTLY_ONCE
语义,则底层采用了两阶段提交的实现方式来完成写入,此时要flink配合开启Checkpointing
才会生效,如何开启checkpoint请参考第二章关于checkpoint配置部分
AT_LEAST_ONCE && NONE
默认不指定会采用NONE
语义,这两种配置效果一样,都是保证 至少一次 语义
开启
EXACTLY_ONCE
精确一次的好处是显而易见的,保证了数据的准确性,但成本也是高昂的,需要checkpoint
的支持,底层模拟了事务的提交读,对实时性有一定的损耗,如果你的业务对数据的准确性要求不是那么高,则建议采用AT_LEAST_ONCE
语义
其他配置
除了特殊的semantic
配置项之外,其他的所有的配置都必须遵守 HikariCP 连接池的配置,具体可配置项和各个参数的作用请参考光 HikariCP
官网文档.
JDBC 读取数据
在StreamPark
中JdbcSource
用来读取数据,并且根据数据的offset
做到数据读时可回放,我们看看具体如何用JdbcSource
读取数据,假如需求如下
- 从
t_order
表中读取数据,以timestamp
字段为参照,起始值为2020-12-16 12:00:00
往后抽取数据 - 将读取到的数据构造成
Order
对象返回
jdbc配置和读取代码如下
配置
jdbc:
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
Scala
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.source.JdbcSource
import org.apache.flink.api.scala._
object MySQLSourceApp extends FlinkStreaming {
override def handle(): Unit = {
JdbcSource().getDataStream[Order](lastOne => {
//防止抽取过于密集,间隔5秒抽取一次数据
Thread.sleep(5000);
val laseOffset = if (lastOne == null) "2020-12-16 12:00:00" else lastOne.timestamp
s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
},
_.map(x => new Order(x("market_id").toString, x("timestamp").toString))
).print()
}
}
class Order(val marketId: String, val timestamp: String) extends Serializable
Java
import org.apache.streampark.flink.core.java.function.SQLQueryFunction;
import org.apache.streampark.flink.core.java.function.SQLResultFunction;
import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
import org.apache.streampark.flink.core.java.source.JdbcSource;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class MySQLJavaApp {
public static void main(String[] args) {
StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
StreamingContext context = new StreamingContext(envConfig);
new JdbcSource<Order>(context)
.getDataStream(
(SQLQueryFunction<Order>) lastOne -> {
//防止抽取过于密集,间隔5秒抽取一次数据
Thread.sleep(5000);
Serializable lastOffset = lastOne == null
? "2020-12-16 12:00:00"
: lastOne.timestamp;
return String.format(
"select * from t_order " +
"where timestamp > '%s' " +
"order by timestamp asc ",
lastOffset
);
},
(SQLResultFunction<Order>) iterable -> {
List<Order> result = new ArrayList<>();
iterable.forEach(item -> {
Order Order = new Order();
Order.marketId = item.get("market_id").toString();
Order.timestamp = Long.parseLong(item.get("timestamp").toString());
result.add(Order);
});
return result;
})
.returns(TypeInformation.of(Order.class))
.print();
context.start();
}
}
以java
api为例,这里要传入两个参数
SQLQueryFunction<T> queryFunc
SQLResultFunction<T> resultFunc
queryFunc获取一条sql
queryFunc
是要传入一个SQLQueryFunction
类型的function
,该function
用于获取查询sql的,会将最后一条记录返回给开发者,然后需要开发者根据最后一条记录返回一条新的查询sql
,queryFunc
定义如下:
/**
* @author benjobs
*/
@FunctionalInterface
public interface SQLQueryFunction<T> extends Serializable {
/**
* 获取要查询的SQL
*
* @return
* @throws Exception
*/
String query(T last) throws Exception;
}
所以上面的代码中,第一次上来lastOne
(最后一条记录)为null,会判断一下,为null则取需求里默认的offset
,查询的sql里根据timestamp
字段正序排,这样在第一次查询之后,会返回最后的那条记录,下次直接可以使用这条记录作为下一次查询的根据
JdbcSource
实现了CheckpointedFunction
,即当程序开启 checkpoint 后,会将这些诸如laseOffset
的状态数据保存到state backend
,这样程序挂了,再次启动会自动从checkpoint
中恢复offset
,会接着上次的位置继续读取数据, 一般在生产环境,更灵活的方式是将lastOffset
写入如redis
等存储中,每次查询完之后再将最后的记录更新到redis
,这样即便程序意外挂了,再次启动,也可以从redis
中获取到最后的offset
进行数据的抽取,也可以很方便的人为的任意调整这个offset
进行数据的回放
resultFunc 处理查询到的数据
resultFunc
的参数类型是SQLResultFunction<T>
,是将一个查询到的结果集放到Iterable<Map<String, ?>>
中返回给开发者,可以看到返回了一个迭代器Iterable
,迭代器每次迭代返回一个Map
,该Map
里记录了一行完整的记录,Map
的key
为查询字段,value
为值,SQLResultFunction<T>
定义如下
/**
* @author benjobs
*/
@FunctionalInterface
public interface SQLResultFunction<T> extends Serializable {
/**
* 将查下结果以Iterable<Map>的方式返回,开发者去实现转成对象.
*
* @param map
* @return
*/
Iterable<T> result(Iterable<Map<String, ?>> iterable);
}
JDBC 读取写入
StreamPark
中JdbcSink
是用来写入数据,我们看看具体如何用JdbcSink
写入数据,假如需求是需要从kakfa
中读取数据,写入到mysql
配置
kafka.source:
bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
pattern: user
group.id: user_02
auto.offset.reset: earliest # (earliest | latest)
...
jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
配置里
jdbc
下的 semantic 是写入的语义,在上面Jdbc信息配置有介绍,该配置只会在JdbcSink
下生效,StreamPark
中基于两阶段提交实现了 EXACTLY_ONCE 语义, 这本身需要被操作的数据库(mysql
,oracle
,MariaDB
,MS SQL Server
)等支持事务,理论上所有支持标准Jdbc事务的数据库都可以做到EXACTLY_ONCE(精确一次)的写入Scala
import org.apache.streampark.common.util.JsonUtils
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.JdbcSink
import org.apache.streampark.flink.core.scala.source.KafkaSource
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
object JdbcSinkApp extends FlinkStreaming {
override def handle(): Unit = {
val source = KafkaSource()
.getDataStream[String]()
.map(x => JsonUtils.read[User](x.value))
JdbcSink().sink[User](source)(user =>
s"""
|insert into t_user(`name`,`age`,`gender`,`address`)
|value('${user.name}',${user.age},${user.gender},'${user.address}')
|""".stripMargin
)
}
}
case class User(name:String,age:Int,gender:Int,address:String)
Java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
import org.apache.streampark.flink.core.java.source.KafkaSource;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.core.scala.source.KafkaRecord;
import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.Serializable;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
public class JdbcSinkJavaApp {
public static void main(String[] args) {
StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
StreamingContext context = new StreamingContext(envConfig);
ObjectMapper mapper = new ObjectMapper();
DataStream<JavaUser> source = new KafkaSource<String>(context)
.getDataStream()
.map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
mapper.readValue(value.value(), JavaUser.class));
new JdbcSink<JavaUser>(context)
.sql((SQLFromFunction<JavaUser>) JavaUser::toSql)
.sink(source);
context.start();
}
}
class JavaUser implements Serializable {
String name;
Integer age;
Integer gender;
String address;
public String toSql() {
return String.format(
"insert into t_user(`name`,`age`,`gender`,`address`) value('%s',%d,%d,'%s')",
name,
age,
gender,
address);
}
}
根据数据流生成目标SQL
在写入的时候,需要知道具体写入的sql
语句,该sql
语句需要开发者通过function
的方式提供,在scala
api中,直接在sink
方法后跟上function
即可,java
api 则是通过sql()
方法传入一个SQLFromFunction
类型的function
下面以java
api为例说明,我们来看看java
api 中提供sql的function
方法的定义
/**
* @author benjobs
*/
@FunctionalInterface
public interface SQLFromFunction<T> extends Serializable {
/**
* @param bean
* @return
*/
String from(T bean);
}
SQLFromFunction
上的泛型<T>
即为DataStream
里实际的数据类型,该function
里有一个方法form(T bean)
,这个bean
即为当前DataStream
中的一条具体数据,会将该数据返给开发者,开发者来决定基于这条数据,生成一条具体可以往数据库中插入的sql
设置写入批次大小
在 非 EXACTLY_ONCE
(精确一次的语义下)可以适当的设置batch.size
来提高Jdbc写入的性能(前提是业务允许的情况下),具体配置如下
jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
batch.size: 1000
这样一来就不是来一条数据就立即写入,而是积攒一个匹配然后执行批量插入
这个设置仅在非
EXACTLY_ONCE
语义下生效,带来的好处是可以提高Jdbc写入的性能,一次大批量的插入数据,缺点是数据写入势必会有延迟,请根据实际使用情况谨慎使用