Apache Flink officially provides the JDBC connector for reading from or writing to JDBC, which can provides AT_LEAST_ONCE (at least once) processing semantics.

Apache StreamPark implements EXACTLY_ONCE (Exactly Once) semantics of JdbcSink based on two-stage commit, and uses HikariCP as connection pool to make data reading and write data more easily and accurately.

JDBC Configuration

The implementation of the Jdbc Connector in StreamPark uses the HikariCP connection pool, which is configured under the namespace of jdbc, and the agreed configuration is as follows:

  1. jdbc:
  2. semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  3. username: root
  4. password: 123456
  5. driverClassName: com.mysql.jdbc.Driver
  6. connectionTimeout: 30000
  7. idleTimeout: 30000
  8. maxLifetime: 30000
  9. maximumPoolSize: 6
  10. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true

Semantic

The parameter semantic is the semantics when writing in JdbcSink, only effect for JdbcSink, JdbcSource will automatically mask this parameter, there are three options

EXACTLY_ONCE
AT_LEAST_ONCE
NONE

EXACTLY_ONCE

If JdbcSink is configured with EXACTLY_ONCE semantics, the underlying two-phase commit implementation is used to complete the write, at this time to Apache Flink with Checkpointing to take effect, how to open checkpoint please refer to Chapter 2 on checkpoint configuration section

AT_LEAST_ONCE && NONE

The default does not specify that the NONE semantics will be used, both configurations have the same effect, both are guaranteed at least once semantics

The benefit of turning on EXACTLY_ONCE exactly once is obvious, to ensure the accuracy of the data, but the cost is also high, the need for checkpoint support, the underlying simulation of the transaction is submitted to read, there is a certain loss of real-time, if your business requirements for data accuracy is not so high, it is recommended to use AT_LEAST_ONCE semantics

Others

Except for the special semantic configuration item, all other configurations must comply with the HikariCP connection pool configuration, please refer to the Light HikariCP official website documentation for the specific configurable items and the role of each parameter. HikariCP#gear-configuration-knobs-baby).

JDBC read

In StreamPark, JdbcSource is used to read data, and according to the data offset to read data can be replayed, we look at the specific how to use JdbcSource to read data, if the demand is as follows

Read data from the t_order table, using the timestamp field, starting at 2020-12-16 12:00:00 and extracting data from there. Construct the read data into an Order object and return it

The jdbc configuration and reading code is as follows

Setting

  1. jdbc:
  2. driverClassName: com.mysql.jdbc.Driver
  3. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  4. username: root
  5. password: 123456

Scala

  1. import org.apache.streampark.flink.core.scala.FlinkStreaming
  2. import org.apache.streampark.flink.core.scala.source.JdbcSource
  3. import org.apache.flink.api.scala._
  4. object MySQLSourceApp extends FlinkStreaming {
  5. override def handle(): Unit = {
  6. JdbcSource().getDataStream[Order](lastOne => {
  7. Thread.sleep(5000);
  8. val laseOffset = if (lastOne == null) "2020-12-16 12:00:00" else lastOne.timestamp
  9. s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
  10. },
  11. _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
  12. ).print()
  13. }
  14. }
  15. class Order(val marketId: String, val timestamp: String) extends Serializable

Java

  1. import org.apache.streampark.flink.core.java.function.SQLQueryFunction;
  2. import org.apache.streampark.flink.core.java.function.SQLResultFunction;
  3. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  4. import org.apache.streampark.flink.core.java.source.JdbcSource;
  5. import org.apache.streampark.flink.core.scala.StreamingContext;
  6. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  7. import org.apache.flink.api.common.typeinfo.TypeInformation;
  8. import java.io.Serializable;
  9. import java.util.ArrayList;
  10. import java.util.List;
  11. public class MySQLJavaApp {
  12. public static void main(String[] args) {
  13. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  14. StreamingContext context = new StreamingContext(envConfig);
  15. new JdbcSource<Order>(context)
  16. .getDataStream(
  17. (SQLQueryFunction<Order>) lastOne -> {
  18. Thread.sleep(5000);
  19. Serializable lastOffset = lastOne == null
  20. ? "2020-12-16 12:00:00"
  21. : lastOne.timestamp;
  22. return String.format(
  23. "select * from t_order " +
  24. "where timestamp > '%s' " +
  25. "order by timestamp asc ",
  26. lastOffset
  27. );
  28. },
  29. (SQLResultFunction<Order>) iterable -> {
  30. List<Order> result = new ArrayList<>();
  31. iterable.forEach(item -> {
  32. Order Order = new Order();
  33. Order.marketId = item.get("market_id").toString();
  34. Order.timestamp = Long.parseLong(item.get("timestamp").toString());
  35. result.add(Order);
  36. });
  37. return result;
  38. })
  39. .returns(TypeInformation.of(Order.class))
  40. .print();
  41. context.start();
  42. }
  43. }

Take the java api as an example, here you have to accept two parameters

SQLQueryFunction<T> queryFunc SQLResultFunction<T> resultFunc

queryFunc to get sql

queryFunc needs to pass in a function of type SQLQueryFunction, the function is used to get the query sql, will return the last record to the developer, and then the developer needs to return a new query sql according to the last record, queryFunc is defined as follows :

  1. @FunctionalInterface
  2. public interface SQLQueryFunction<T> extends Serializable {
  3. /**
  4. * @return query sql
  5. */
  6. String query(T last) throws Exception;
  7. }

So the above code, the first time lastOne (the last record) equals null, and will be judged, if null will take the default offset, query sql according to the timestamp field in positive order, so that after the first query, will return the last record, the next time you can directly use this record as the basis for the next query

JdbcSource implements the CheckpointedFunction, that is, when the program opens checkpoint, it will save these state data such as laseOffset to the state backend, so that when the program hangs, it will automatically restore offset from checkpoint, and continue to read data from the last position, In the production environment, a more flexible way is writing lastOffset to storage, such as redis, after each query and then update the offset to redis, so that even if the program hangs unexpectedly, you can also get the last offset from redis for data extract, but also very convenient to adjust offset for data replay

resultFunc process the query data

The parameter type of resultFunc is SQLResultFunction<T>, which puts a query result set into Iterable<Map<String, ? >>, and then returns it to the developer, at the same time, you can see that each iteration of the iterator returns a Map, the Map records a complete line of records, the Map key is the query field, value is the value, SQLResultFunction<T> is defined as follows

  1. @FunctionalInterface
  2. public interface SQLResultFunction<T> extends Serializable {
  3. Iterable<T> result(Iterable<Map<String, ?>> iterable);
  4. }

JDBC Read Write

In StreamPark, JdbcSink is used to write data, let’s see how to write data with JdbcSink, the example is to read data from kakfa and write to mysql.

Setting

  1. kafka.source:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. pattern: user
  4. group.id: user_02
  5. auto.offset.reset: earliest # (earliest | latest)
  6. ...
  7. jdbc:
  8. semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  9. driverClassName: com.mysql.jdbc.Driver
  10. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  11. username: root
  12. password: 123456

The configuration under jdbc semantic is the semantics of writing, as described in Jdbc Info Configuration, the configuration will only take effect on JdbcSink, StreamPark is based on two-phase commit to achieve EXACTLY_ONCE semantics, This requires that the database being manipulated supports transactions(mysql, oracle, MariaDB, MS SQL Server), theoretically all databases that support standard Jdbc transactions can do EXACTLY_ONCE (exactly once) write

Scala

  1. import org.apache.streampark.common.util.JsonUtils
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.streampark.flink.core.scala.sink.JdbcSink
  4. import org.apache.streampark.flink.core.scala.source.KafkaSource
  5. import org.apache.flink.api.common.typeinfo.TypeInformation
  6. import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
  7. import org.apache.flink.api.scala._
  8. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  9. object JdbcSinkApp extends FlinkStreaming {
  10. override def handle(): Unit = {
  11. val source = KafkaSource()
  12. .getDataStream[String]()
  13. .map(x => JsonUtils.read[User](x.value))
  14. JdbcSink().sink[User](source)(user =>
  15. s"""
  16. |insert into t_user(`name`,`age`,`gender`,`address`)
  17. |value('${user.name}',${user.age},${user.gender},'${user.address}')
  18. |""".stripMargin
  19. )
  20. }
  21. }
  22. case class User(name:String,age:Int,gender:Int,address:String)

Java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  3. import org.apache.streampark.flink.core.java.source.KafkaSource;
  4. import org.apache.streampark.flink.core.scala.StreamingContext;
  5. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  6. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.api.common.typeinfo.TypeInformation;
  9. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  10. import org.apache.kafka.clients.consumer.ConsumerRecord;
  11. import java.io.Serializable;
  12. import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
  13. public class JdbcSinkJavaApp {
  14. public static void main(String[] args) {
  15. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  16. StreamingContext context = new StreamingContext(envConfig);
  17. ObjectMapper mapper = new ObjectMapper();
  18. DataStream<JavaUser> source = new KafkaSource<String>(context)
  19. .getDataStream()
  20. .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
  21. mapper.readValue(value.value(), JavaUser.class));
  22. new JdbcSink<JavaUser>(context)
  23. .sql((SQLFromFunction<JavaUser>) JavaUser::toSql)
  24. .sink(source);
  25. context.start();
  26. }
  27. }
  28. class JavaUser implements Serializable {
  29. String name;
  30. Integer age;
  31. Integer gender;
  32. String address;
  33. public String toSql() {
  34. return String.format(
  35. "insert into t_user(`name`,`age`,`gender`,`address`) value('%s',%d,%d,'%s')",
  36. name,
  37. age,
  38. gender,
  39. address);
  40. }
  41. }

Generate target SQL based on data flow

When writing, you need to know the specific sql statement to write, the sql statement needs to be provided by the developer by a way of the function, in the scala api, directly after the sink method followed by the function, while the java api is passed a function of type SQLFromFunction through the sql() method

The following is an example of the java api, let’s look at the definition of the function method that provides sql in the java api

  1. @FunctionalInterface
  2. public interface SQLFromFunction<T> extends Serializable {
  3. /**
  4. * @param bean
  5. * @return
  6. */
  7. String from(T bean);
  8. }

The generic <T> on the SQLFromFunction is the actual data type in the DataStream, the function has a method form(T bean), the bean is a specific data in the current DataStream, the data will be returned to the developer, the developer will based on this data, generate a specific sql that can be inserted into the database

Set to write batch size

In non-EXACTLY_ONCE (under the semantics of exactly once) you can set batch.size to improve the performance of JDBC writes (provided that the business allows it), configured as follows

  1. jdbc:
  2. semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  3. driverClassName: com.mysql.jdbc.Driver
  4. jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
  5. username: root
  6. password: 123456
  7. batch.size: 1000

In this way, instead of writing data immediately when it comes, and then performs a bulk insert

This setting only takes effect the non-EXACTLY_ONCE semantics, the benefit is to improve the performance of Jdbc writes, a large number of data insertion, the disadvantage is that data writing will inevitably have a delay, please use caution according to the actual use of the situation

Multi-instance JDBC support

Specify JDBC connection information manually