Apache Flink 官方提供了 Apache Kafka 的连接器,用于从 Kafka 主题中读取或者向其中写入数据,可提供精确一次的处理语义。

Apache StreamPark 中 KafkaSourceKafkaSink 基于官网的 Kafka Connector 进一步封装,屏蔽了很多细节,简化开发步骤,让数据的读取和写入更简单。

依赖

Apache Flink 集成了通用的 Kafka 连接器,它会尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。当前 Kafka client 向后兼容 0.10.0 或更高版本的 Kafka broker。有关 Kafka 兼容性的更多细节,请参考 Apache Kafka 的官方文档。

  1. <!--必须要导入的依赖-->
  2. <dependency>
  3. <groupId>org.apache.streampark</groupId>
  4. <artifactId>streampark-flink-core</artifactId>
  5. <version>${project.version}</version>
  6. </dependency>
  7. <!--flink-connector-->
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-connector-kafka_2.11</artifactId>
  11. <version>1.12.0</version>
  12. </dependency>

同时在开发阶段,以下的依赖也是必要的:

  1. <!--以下scope为provided的依赖也是必须要导入的-->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-scala_${scala.binary.version}</artifactId>
  5. <version>${flink.version}</version>
  6. <scope>provided</scope>
  7. </dependency>
  8. <dependency>
  9. <groupId>org.apache.flink</groupId>
  10. <artifactId>flink-clients_${scala.binary.version}</artifactId>
  11. <version>${flink.version}</version>
  12. <scope>provided</scope>
  13. </dependency>
  14. <dependency>
  15. <groupId>org.apache.flink</groupId>
  16. <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  17. <version>${flink.version}</version>
  18. <scope>provided</scope>
  19. </dependency>

Kafka Source (Consumer)

先介绍基于官网的标准的kafka consumer的方式,以下代码摘自 Apache Kafka 官网文档

  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers", "localhost:9092")
  3. properties.setProperty("group.id", "test")
  4. val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))

可以看到一上来定义了一堆kafka的连接信息,这种方式各项参数都是硬编码的方式写死的,非常的不灵敏,下面我们来看看如何用StreamPark接入 kafka的数据,只需要按照规定的格式定义好配置文件然后编写代码即可,配置和代码如下

基础消费示例

  1. kafka.source:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. topic: test_user
  4. group.id: user_01
  5. auto.offset.reset: earliest
  6. enable.auto.commit: true

kafka.source这个前缀是固定的,kafka properties相关的参数必须遵守 Apache Kafka 官网文档 对参数key的设置规范

Scala

  1. package org.apache.streampark.flink.quickstart
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.streampark.flink.core.scala.sink.JdbcSink
  4. import org.apache.streampark.flink.core.scala.source.KafkaSource
  5. import org.apache.flink.api.scala._
  6. object kafkaSourceApp extends FlinkStreaming {
  7. override def handle(): Unit = {
  8. val source = KafkaSource().getDataStream[String]()
  9. print(source)
  10. }
  11. }

Java

  1. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  2. import org.apache.streampark.flink.core.java.source.KafkaSource;
  3. import org.apache.streampark.flink.core.scala.StreamingContext;
  4. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  5. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  6. import org.apache.flink.api.common.functions.MapFunction;
  7. import org.apache.flink.streaming.api.datastream.DataStream;
  8. public class KafkaSimpleJavaApp {
  9. public static void main(String[] args) {
  10. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  11. StreamingContext context = new StreamingContext(envConfig);
  12. DataStream<String> source = new KafkaSource<String>(context)
  13. .getDataStream()
  14. .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
  15. source.print();
  16. context.start();
  17. }
  18. }

高级配置参数

KafkaSource是基于Flink Kafka Connector封装一个更简单的kafka读取类,构造方法里需要传入StreamingContext,当程序启动时传入配置文件即可,框架会自动解析配置文件,在new KafkaSource的时候会自动的从配置文件中获取相关信息,初始化并返回一个Kafka Consumer,在这里topic下只配置了一个topic,因此在消费的时候不用指定topic直接默认获取这个topic来消费, 这只是一个最简单的例子,更多更复杂的规则和读取操作则要通过.getDataStream()在该方法里传入参数才能实现 我们看看getDataStream这个方法的签名

  1. def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
  2. alias: String = "",
  3. deserializer: KafkaDeserializationSchema[T],
  4. strategy: WatermarkStrategy[KafkaRecord[T]] = null
  5. ): DataStream[KafkaRecord[T]]

参数具体作用如下

参数名 参数类型 作用 默认值
topic Serializable 一个topic或者一组topic
alias String 用于区别不同的kafka实例
deserializer DeserializationSchema topic里数据的具体解析类 KafkaStringDeserializationSchema
strategy WatermarkStrategy watermark生成策略

下面我们来看看更多的使用和配置方式

  • 消费多个Kafka实例
  • 消费多个Topic
  • Topic动态发现
  • 从指定Offset消费
  • 指定KafkaDeserializationSchema
  • 指定WatermarkStrategy

消费多个Kafka实例

在框架开发之初就考虑到了多个不同实例的kafka的配置情况.如何来统一配置,并且规范格式呢?在streampark中是这么解决的,假如我们要同时消费两个不同实例的kafka,配置文件定义如下, 可以看到在 kafka.source 下直接放kafka的实例名称(名字可以任意),在这里我们统一称为 alias , alias 必须是唯一的,来区别不同的实例,然后别的参数还是按照之前的规范, 统统放到当前这个实例的 namespace 下即可.如果只有一个kafka实例,则可以不用配置 alias 在写代码消费时注意指定对应的 alias 即可,配置和代码如下

配置

  1. kafka.source:
  2. kafka1:
  3. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  4. topic: test_user
  5. group.id: user_01
  6. auto.offset.reset: earliest
  7. enable.auto.commit: true
  8. kafka2:
  9. bootstrap.servers: kfk4:9092,kfk5:9092,kfk6:9092
  10. topic: kafka2
  11. group.id: kafka2
  12. auto.offset.reset: earliest
  13. enable.auto.commit: true

Scala

  1. //消费kafka1实例的数据
  2. KafkaSource().getDataStream[String](alias = "kafka1")
  3. .uid("kfkSource1")
  4. .name("kfkSource1")
  5. .print()
  6. //消费kafka2实例的数据
  7. KafkaSource().getDataStream[String](alias = "kafka2")
  8. .uid("kfkSource2")
  9. .name("kfkSource2")
  10. .print()

Java

  1. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  2. StreamingContext context = new StreamingContext(envConfig);
  3. //消费kafka1实例的数据
  4. DataStream<String> source1 = new KafkaSource<String>(context)
  5. .alias("kafka1")
  6. .getDataStream()
  7. print();
  8. //消费kafka1实例的数据
  9. DataStream<String> source2 = new KafkaSource<String>(context)
  10. .alias("kafka2")
  11. .getDataStream()
  12. .print();
  13. context.start();

java api在编写代码时,一定要将alias等这些参数的设置放到调用.getDataStream()之前

消费多个Topic

配置消费多个topic也很简单,在配置文件topic下配置多个topic名称即可,用,或空格分隔,代码消费处理的时候指定topic参数即可,scala api下如果是消费一个topic,则直接传入topic名称即可,如果要消费多个,传入一个List即可 javaapi通过 topic()方法传入要消费topic的名称,是一个String类型的可变参数,可以传入一个或多个topic名称,配置和代码如下

配置

  1. kafka.source:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. topic: topic1,topic2,topic3...
  4. group.id: user_01
  5. auto.offset.reset: earliest # (earliest | latest)
  6. ...

Scala

  1. //消费指定单个topic的数据
  2. KafkaSource().getDataStream[String](topic = "topic1")
  3. .uid("kfkSource1")
  4. .name("kfkSource1")
  5. .print()
  6. //消费一批topic数据
  7. KafkaSource().getDataStream[String](topic = List("topic1","topic2","topic3"))
  8. .uid("kfkSource1")
  9. .name("kfkSource1")
  10. .print()

Java

  1. //消费指定单个topic的数据
  2. DataStream<String> source1 = new KafkaSource<String>(context)
  3. .topic("topic1")
  4. .getDataStream()
  5. .print();
  6. //消费一组topic的数据
  7. DataStream<String> source1 = new KafkaSource<String>(context)
  8. .topic("topic1","topic2")
  9. .getDataStream()
  10. .print();

topic支持配置多个topic实例,每个topic直接用,分隔或者空格分隔,如果topic下配置多个实例,在消费的时必须指定具体的topic名称

Topic 发现

关于kafka的分区动态,默认情况下,是禁用了分区发现的。若要启用它,请在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0,表示发现分区的间隔是以毫秒为单位的 更多详情请参考官网文档

Flink Kafka Consumer 还能够使用正则表达式基于 Topic 名称的模式匹配来发现 Topic,详情请参考官网文档StreamPark中提供更简单的方式,具体需要在 pattern下配置要匹配的topic实例名称的正则即可

配置

  1. kafka.source:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. pattern: ^topic[1-9]
  4. group.id: user_02
  5. auto.offset.reset: earliest # (earliest | latest)
  6. ...

Scala

  1. //消费正则topic数据
  2. KafkaSource().getDataStream[String](topic = "topic-a")
  3. .uid("kfkSource1")
  4. .name("kfkSource1")
  5. .print()

Java

  1. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  2. StreamingContext context = new StreamingContext(envConfig);
  3. //消费通配符topic数据
  4. new KafkaSource<String>(context)
  5. .topic("topic-a")
  6. .getDataStream()
  7. .print();
  8. context.start();

topicpattern不能同时配置,当配置了pattern正则匹配时,在消费的时候依然可以指定一个确定的topic名称,此时会检查pattern是否匹配当前的topic,如不匹配则会报错

配置开始消费的位置

Flink Kafka Consumer 允许通过配置来确定 Kafka 分区的起始位置,官网文档Kafka 分区的起始位置具体操作方式如下

Scala

  1. val env = StreamExecutionEnvironment.getExecutionEnvironment()
  2. val myConsumer = new FlinkKafkaConsumer[String](...)
  3. myConsumer.setStartFromEarliest() // 尽可能从最早的记录开始
  4. myConsumer.setStartFromLatest() // 从最新的记录开始
  5. myConsumer.setStartFromTimestamp(...) // 从指定的时间开始(毫秒)
  6. myConsumer.setStartFromGroupOffsets() // 默认的方法
  7. val stream = env.addSource(myConsumer)
  8. ...

Java

  1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
  3. myConsumer.setStartFromEarliest(); // 尽可能从最早的记录开始
  4. myConsumer.setStartFromLatest(); // 从最新的记录开始
  5. myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
  6. myConsumer.setStartFromGroupOffsets(); // 默认的方法
  7. DataStream<String> stream = env.addSource(myConsumer);
  8. ...

StreamPark中不推荐这种方式进行设定,提供了更方便的方式,只需要在配置里指定 auto.offset.reset 即可

  • earliest 从最早的记录开始
  • latest 从最新的记录开始

指定分区Offset

你也可以为每个分区指定 consumer 应该开始消费的具体 offset,只需要按照如下的配置文件配置start.from相关的信息即可

  1. kafka.source:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. topic: topic1,topic2,topic3...
  4. group.id: user_01
  5. auto.offset.reset: earliest # (earliest | latest)
  6. start.from:
  7. timestamp: 1591286400000 #指定timestamp,针对所有的topic生效
  8. offset: # 给topic的partition指定offset
  9. topic: topic_abc,topic_123
  10. topic_abc: 0:182,1:183,2:182 #分区0从182开始消费,分区1从183开始,分区2从182开始...
  11. topic_123: 0:182,1:183,2:182
  12. ...

指定deserializer

默认不指定deserializer则在内部采用String的方式反序列化topic中的数据,可以手动指定deserializer,这样可以一步直接返回目标DataStream,具体完整代码如下

Scala

  1. import org.apache.streampark.common.util.JsonUtils
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.streampark.flink.core.scala.sink.JdbcSink
  4. import org.apache.streampark.flink.core.scala.source.KafkaSource
  5. import org.apache.flink.api.common.typeinfo.TypeInformation
  6. import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
  7. import org.apache.flink.api.scala._
  8. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  9. object KafkaSourceApp extends FlinkStreaming {
  10. override def handle(): Unit = {
  11. KafkaSource()
  12. .getDataStream[String](deserializer = new UserSchema)
  13. .map(_.value)
  14. .print()
  15. }
  16. }
  17. class UserSchema extends KafkaDeserializationSchema[User] {
  18. override def isEndOfStream(nextElement: User): Boolean = false
  19. override def getProducedType: TypeInformation[User] = getForClass(classOf[User])
  20. override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): User = {
  21. val value = new String(record.value())
  22. JsonUtils.read[User](value)
  23. }
  24. }
  25. case class User(name:String,age:Int,gender:Int,address:String)

Java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  3. import org.apache.streampark.flink.core.java.source.KafkaSource;
  4. import org.apache.streampark.flink.core.scala.StreamingContext;
  5. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  6. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  7. import org.apache.flink.api.common.functions.MapFunction;
  8. import org.apache.flink.api.common.typeinfo.TypeInformation;
  9. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  10. import org.apache.kafka.clients.consumer.ConsumerRecord;
  11. import java.io.Serializable;
  12. import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
  13. public class KafkaSourceJavaApp {
  14. public static void main(String[] args) {
  15. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  16. StreamingContext context = new StreamingContext(envConfig);
  17. new KafkaSource<JavaUser>(context)
  18. .deserializer(new JavaUserSchema())
  19. .getDataStream()
  20. .map((MapFunction<KafkaRecord<JavaUser>, JavaUser>) KafkaRecord::value)
  21. .print();
  22. context.start();
  23. }
  24. }
  25. class JavaUserSchema implements KafkaDeserializationSchema<JavaUser> {
  26. private ObjectMapper mapper = new ObjectMapper();
  27. @Override public boolean isEndOfStream(JavaUser nextElement) return false;
  28. @Override public TypeInformation<JavaUser> getProducedType() return getForClass(JavaUser.class);
  29. @Override public JavaUser deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
  30. String value = new String(record.value());
  31. return mapper.readValue(value, JavaUser.class);
  32. }
  33. }
  34. class JavaUser implements Serializable {
  35. String name;
  36. Integer age;
  37. Integer gender;
  38. String address;
  39. }

返回记录KafkaRecord

返回的对象被包装在KafkaRecord中,kafkaRecord中有当前的offset,partition,timestamp等诸多有用的信息供开发者使用,其中value即返回的目标对象,如下图:

Apache Kafka Connector - 图1

指定strategy

在许多场景中,记录的时间戳是(显式或隐式)嵌入到记录本身中。此外,用户可能希望定期或以不规则的方式Watermark,例如基于Kafka流中包含当前事件时间的watermark的特殊记录。对于这些情况,Flink Kafka Consumer是允许指定AssignerWithPeriodicWatermarksAssignerWithPunctuatedWatermarks

StreamPark中运行传入一个WatermarkStrategy作为参数来分配Watermark,如下面的示例,解析topic中的数据为user对象,user中有个 orderTime 是时间类型,我们以这个为基准,为其分配一个Watermark

Scala

  1. import org.apache.streampark.common.util.JsonUtils
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.streampark.flink.core.scala.source.{KafkaRecord, KafkaSource}
  4. import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
  5. import org.apache.flink.api.common.typeinfo.TypeInformation
  6. import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
  7. import org.apache.flink.api.scala._
  8. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  9. import org.apache.kafka.clients.consumer.ConsumerRecord
  10. import java.time.Duration
  11. import java.util.Date
  12. object KafkaSourceStrategyApp extends FlinkStreaming {
  13. override def handle(): Unit = {
  14. KafkaSource()
  15. .getDataStream[User](
  16. deserializer = new UserSchema,
  17. strategy = WatermarkStrategy
  18. .forBoundedOutOfOrderness[KafkaRecord[User]](Duration.ofMinutes(1))
  19. .withTimestampAssigner(new SerializableTimestampAssigner[KafkaRecord[User]] {
  20. override def extractTimestamp(element: KafkaRecord[User], recordTimestamp: Long): Long = {
  21. element.value.orderTime.getTime
  22. }
  23. })
  24. ).map(_.value)
  25. .print()
  26. }
  27. }
  28. class UserSchema extends KafkaDeserializationSchema[User] {
  29. override def isEndOfStream(nextElement: User): Boolean = false
  30. override def getProducedType: TypeInformation[User] = getForClass(classOf[User])
  31. override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): User = {
  32. val value = new String(record.value())
  33. JsonUtils.read[User](value)
  34. }
  35. }
  36. case class User(name: String, age: Int, gender: Int, address: String, orderTime: Date)

Java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  3. import org.apache.streampark.flink.core.java.source.KafkaSource;
  4. import org.apache.streampark.flink.core.scala.StreamingContext;
  5. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  6. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  7. import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  8. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  9. import org.apache.flink.api.common.functions.MapFunction;
  10. import org.apache.flink.api.common.typeinfo.TypeInformation;
  11. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  12. import org.apache.kafka.clients.consumer.ConsumerRecord;
  13. import java.io.Serializable;
  14. import java.time.Duration;
  15. import java.util.Date;
  16. import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
  17. /**
  18. * @author benjobs
  19. */
  20. public class KafkaSourceStrategyJavaApp {
  21. public static void main(String[] args) {
  22. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  23. StreamingContext context = new StreamingContext(envConfig);
  24. new KafkaSource<JavaUser>(context)
  25. .deserializer(new JavaUserSchema())
  26. .strategy(
  27. WatermarkStrategy.<KafkaRecord<JavaUser>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
  28. .withTimestampAssigner(
  29. (SerializableTimestampAssigner<KafkaRecord<JavaUser>>)
  30. (element, recordTimestamp) -> element.value().orderTime.getTime()
  31. )
  32. )
  33. .getDataStream()
  34. .map((MapFunction<KafkaRecord<JavaUser>, JavaUser>) KafkaRecord::value)
  35. .print();
  36. context.start();
  37. }
  38. }
  39. class JavaUserSchema implements KafkaDeserializationSchema<JavaUser> {
  40. private ObjectMapper mapper = new ObjectMapper();
  41. @Override public boolean isEndOfStream(JavaUser nextElement) return false;
  42. @Override public TypeInformation<JavaUser> getProducedType() return getForClass(JavaUser.class);
  43. @Override public JavaUser deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
  44. String value = new String(record.value());
  45. return mapper.readValue(value, JavaUser.class);
  46. }
  47. }
  48. class JavaUser implements Serializable {
  49. String name;
  50. Integer age;
  51. Integer gender;
  52. String address;
  53. Date orderTime;
  54. }

如果watermark assigner依赖于从Kafka读取的消息来上涨其watermark(通常就是这种情况),那么所有主题和分区都需要有连续的消息流。否则, 整个应用程序的watermark将无法上涨 ,所有基于时间的算子(例如时间窗口或带有计时器的函数)也无法运行。单个的Kafka分区也会导致这种反应。考虑设置适当的 idelness timeouts 来缓解这个问题。

Kafka Sink (Producer)

StreamParkKafka Producer 被称为KafkaSink,它允许将消息写入一个或多个Kafka topic中

Scala

  1. val source = KafkaSource().getDataStream[String]().map(_.value)
  2. KafkaSink().sink(source)

Java

  1. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  2. StreamingContext context = new StreamingContext(envConfig);
  3. DataStream<String> source = new KafkaSource<String>(context)
  4. .getDataStream()
  5. .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
  6. new KafkaSink<String>(context).sink(source);
  7. context.start();

sink是具体的写入数据的方法,参数列表如下

参数名 参数类型 作用 默认值 必须
stream DataStream[T] 要写的数据流 yes
alias String kafka的实例别名 no
serializationSchema SerializationSchema[T] 写入的序列化器 SimpleStringSchema no
partitioner FlinkKafkaPartitioner[T] kafka分区器 KafkaEqualityPartitioner[T] no

容错和语义

启用 Flink 的 checkpointing 后,KafkaSink 可以提供精确一次的语义保证,具体开启checkpointing的设置请参考第二章关于项目配置部分

除了启用 Flink 的 checkpointing,你也可以通过将适当的 semantic 参数传递给 KafkaSink 来选择三种不同的操作模式

  • EXACTLY_ONCE 使用 Kafka 事务提供精确一次语义
  • AT_LEAST_ONCE 至少一次,可以保证不会丢失任何记录(但是记录可能会重复)
  • NONE Flink 不会有任何语义的保证,产生的记录可能会丢失或重复

具体操作如下,只需要在kafka.sink下配置semantic即可

  1. kafka.sink:
  2. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  3. topic: kfk_sink
  4. transaction.timeout.ms: 1000
  5. semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  6. batch.size: 1
kafka EXACTLY_ONCE 语义说明 Semantic.EXACTLY_ONCE模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)。考虑到这一点,请根据预期的宕机时间来合理地配置事务超时时间。 默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms 的值。 在 KafkaConsumer 的 read_committed 模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后: 用户启动了 transaction1 并使用它写了一些记录 用户启动了 transaction2 并使用它编写了一些其他记录 用户提交了 transaction2 即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义: 首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。 * 其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。 注意:Semantic.EXACTLY_ONCE 模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。 注意:Semantic.EXACTLY_ONCE 会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 的值的话,是不安全的。

多实例kafka指定alias

如果写时有多个不同实例的kafka需要配置,同样采用alias来区别不用的kafka实例,配置如下:

  1. kafka.sink:
  2. kafka_cluster1:
  3. bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
  4. topic: kfk_sink
  5. transaction.timeout.ms: 1000
  6. semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  7. batch.size: 1
  8. kafka_cluster2:
  9. bootstrap.servers: kfk6:9092,kfk7:9092,kfk8:9092
  10. topic: kfk_sink
  11. transaction.timeout.ms: 1000
  12. semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
  13. batch.size: 1

在写入的时候,需要手动指定alias,注意下scala api和java api在代码上稍有不同,scala直接在sink方法里指定参数,java api则是通过alias()方法来设置,其底层实现是完全一致的

Scala

  1. val source = KafkaSource().getDataStream[String]().map(_.value)
  2. KafkaSink().sink(source,alias = "kafka_cluster1")

Java

  1. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  2. StreamingContext context = new StreamingContext(envConfig);
  3. DataStream<String> source = new KafkaSource<String>(context)
  4. .getDataStream()
  5. .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);
  6. new KafkaSink<String>(context).alias("kafka_cluster1").sink(source);
  7. context.start();

指定SerializationSchema

Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。 KafkaSerializationSchema 允许用户指定这样的schema, 相关操作方式和文档请参考官网文档

KafkaSink里默认不指定序列化方式,采用的是SimpleStringSchema来进行序列化,这里开发者可以显示的指定一个自定义的序列化器,通过serializationSchema参数指定即可,例如,将user对象安装自定义的格式写入kafka

Scala

  1. import org.apache.streampark.common.util.JsonUtils
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.flink.api.common.serialization.SerializationSchema
  4. import org.apache.streampark.flink.core.scala.sink.JdbcSink
  5. import org.apache.streampark.flink.core.scala.source.KafkaSource
  6. import org.apache.flink.api.scala._
  7. object KafkaSinkApp extends FlinkStreaming {
  8. override def handle(): Unit = {
  9. val source = KafkaSource()
  10. .getDataStream[String]()
  11. .map(x => JsonUtils.read[User](x.value))
  12. KafkaSink().sink[User](source, serialization = new SerializationSchema[User]() {
  13. override def serialize(user: User): Array[Byte] = {
  14. s"${user.name},${user.age},${user.gender},${user.address}".getBytes
  15. }
  16. })
  17. }
  18. }
  19. case class User(name: String, age: Int, gender: Int, address: String)

Java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  3. import org.apache.streampark.flink.core.java.sink.KafkaSink;
  4. import org.apache.streampark.flink.core.java.source.KafkaSource;
  5. import org.apache.streampark.flink.core.scala.StreamingContext;
  6. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  7. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  8. import org.apache.flink.api.common.functions.FilterFunction;
  9. import org.apache.flink.api.common.functions.MapFunction;
  10. import org.apache.flink.api.common.serialization.SerializationSchema;
  11. import org.apache.flink.streaming.api.datastream.DataStream;
  12. import java.io.Serializable;
  13. public class kafkaSinkJavaApp {
  14. public static void main(String[] args) {
  15. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  16. StreamingContext context = new StreamingContext(envConfig);
  17. ObjectMapper mapper = new ObjectMapper();
  18. DataStream<JavaUser> source = new KafkaSource<String>(context)
  19. .getDataStream()
  20. .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
  21. mapper.readValue(value.value(), JavaUser.class));
  22. new KafkaSink<JavaUser>(context)
  23. .serializer(
  24. (SerializationSchema<JavaUser>) element ->
  25. String.format("%s,%d,%d,%s", element.name, element.age, element.gender, element.address).getBytes()
  26. ).sink(source);
  27. context.start();
  28. }
  29. }
  30. class JavaUser implements Serializable {
  31. String name;
  32. Integer age;
  33. Integer gender;
  34. String address;
  35. }

指定SerializationSchema

Flink Kafka Producer 需要知道如何将 Java/Scala 对象转化为二进制数据。 KafkaSerializationSchema 允许用户指定这样的schema, 相关操作方式和文档请参考官网文档

KafkaSink里默认不指定序列化方式,采用的是SimpleStringSchema来进行序列化,这里开发者可以显示的指定一个自定义的序列化器,通过serializationSchema参数指定即可,例如,将user对象安装自定义的格式写入kafka

Scala

  1. import org.apache.streampark.common.util.JsonUtils
  2. import org.apache.streampark.flink.core.scala.FlinkStreaming
  3. import org.apache.flink.api.common.serialization.SerializationSchema
  4. import org.apache.streampark.flink.core.scala.sink.JdbcSink
  5. import org.apache.streampark.flink.core.scala.source.KafkaSource
  6. import org.apache.flink.api.scala._
  7. object KafkaSinkApp extends FlinkStreaming {
  8. override def handle(): Unit = {
  9. val source = KafkaSource()
  10. .getDataStream[String]()
  11. .map(x => JsonUtils.read[User](x.value))
  12. KafkaSink().sink[User](source, serialization = new SerializationSchema[User]() {
  13. override def serialize(user: User): Array[Byte] = {
  14. s"${user.name},${user.age},${user.gender},${user.address}".getBytes
  15. }
  16. })
  17. }
  18. }
  19. case class User(name: String, age: Int, gender: Int, address: String)

Java

  1. import com.fasterxml.jackson.databind.ObjectMapper;
  2. import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  3. import org.apache.streampark.flink.core.java.sink.KafkaSink;
  4. import org.apache.streampark.flink.core.java.source.KafkaSource;
  5. import org.apache.streampark.flink.core.scala.StreamingContext;
  6. import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  7. import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  8. import org.apache.flink.api.common.functions.FilterFunction;
  9. import org.apache.flink.api.common.functions.MapFunction;
  10. import org.apache.flink.api.common.serialization.SerializationSchema;
  11. import org.apache.flink.streaming.api.datastream.DataStream;
  12. import java.io.Serializable;
  13. public class kafkaSinkJavaApp {
  14. public static void main(String[] args) {
  15. StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  16. StreamingContext context = new StreamingContext(envConfig);
  17. ObjectMapper mapper = new ObjectMapper();
  18. DataStream<JavaUser> source = new KafkaSource<String>(context)
  19. .getDataStream()
  20. .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
  21. mapper.readValue(value.value(), JavaUser.class));
  22. new KafkaSink<JavaUser>(context)
  23. .serializer(
  24. (SerializationSchema<JavaUser>) element ->
  25. String.format("%s,%d,%d,%s", element.name, element.age, element.gender, element.address).getBytes()
  26. ).sink(source);
  27. context.start();
  28. }
  29. }
  30. class JavaUser implements Serializable {
  31. String name;
  32. Integer age;
  33. Integer gender;
  34. String address;
  35. }

指定partitioner

KafkaSink允许显示的指定一个kafka分区器,不指定默认使用StreamPark内置的 KafkaEqualityPartitioner 分区器,顾名思义,该分区器可以均匀的将数据写到各个分区中去,scala api是通过partitioner参数来设置分区器, java api中是通过partitioner()方法来设置的

Flink Kafka Connector中默认使用的是 FlinkFixedPartitioner 分区器,该分区器需要特别注意sink的并行度和kafka的分区数,不然会出现往一个分区写