Apache Flink officially provides a connector to Apache Kafka for reading from and writing to Kafka topics, with exactly-once processing semantics.

KafkaSource and KafkaSink in StreamPark further encapsulate the official Kafka connector, simplifying the development steps and making it easier to read and write data.

Dependencies

Apache Flink ships with a universal Kafka connector, which tries to track the latest version of the Kafka client. The Kafka client version used by this connector may change between Flink versions. The current Kafka client is backward compatible with Kafka broker version 0.10.0 or later. For more details on Kafka compatibility, please refer to the official Apache Kafka documentation.

  <dependency>
    <groupId>org.apache.streampark</groupId>
    <artifactId>streampark-flink-core</artifactId>
    <version>${project.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.12.0</version>
  </dependency>

In the development phase, the following dependencies are also necessary

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>

Kafka Source (Consumer)

First, let's look at the standard Kafka consumer approach; the following code is taken from the official Flink documentation.

  val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "test")

  val stream = env.addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))

As you can see, a series of Kafka connection parameters is defined and hard-coded, which is very inflexible. Let's see how to read Kafka data with StreamPark: simply define the configuration file in the prescribed format and then write the code.

Example

  kafka.source:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    topic: test_user
    group.id: user_01
    auto.offset.reset: earliest
    enable.auto.commit: true

The prefix kafka.source is fixed, and the keys of the Kafka-related properties must follow the official Kafka documentation for parameter names.

Scala

  package org.apache.streampark.flink.quickstart

  import org.apache.streampark.flink.core.scala.FlinkStreaming
  import org.apache.streampark.flink.core.scala.sink.JdbcSink
  import org.apache.streampark.flink.core.scala.source.KafkaSource
  import org.apache.flink.api.scala._

  object kafkaSourceApp extends FlinkStreaming {

    override def handle(): Unit = {
      val source = KafkaSource().getDataStream[String]()
      source.print()
    }

  }

Java

  import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  import org.apache.streampark.flink.core.java.source.KafkaSource;
  import org.apache.streampark.flink.core.scala.StreamingContext;
  import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.datastream.DataStream;

  public class KafkaSimpleJavaApp {

      public static void main(String[] args) {
          StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
          StreamingContext context = new StreamingContext(envConfig);

          DataStream<String> source = new KafkaSource<String>(context)
                  .getDataStream()
                  .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);

          source.print();
          context.start();
      }
  }

Advanced configuration parameters

KafkaSource is a simpler Kafka reading class built on top of the Flink Kafka connector. Its constructor takes a StreamingContext; the configuration file is passed when the program starts, and the framework parses it automatically. When a KafkaSource is created, it picks up the relevant information from the configuration file, initializes a Kafka consumer, and returns it. In this case only one topic is configured, so the topic does not need to be specified at consumption time; the configured topic is consumed by default. This is the simplest example; more complex rules and read operations are expressed by passing parameters to the getDataStream() method. Let's look at the signature of getDataStream:

  def getDataStream[T: TypeInformation](topic: java.io.Serializable = null,
                                        alias: String = "",
                                        deserializer: KafkaDeserializationSchema[T],
                                        strategy: WatermarkStrategy[KafkaRecord[T]] = null
                                       ): DataStream[KafkaRecord[T]]

The parameters are described as follows:

| Parameter Name | Parameter Type | Description | Default |
| --- | --- | --- | --- |
| topic | Serializable | a single topic or a group of topics | |
| alias | String | distinguishes different Kafka instances | |
| deserializer | DeserializationSchema | deserializer for the data in the topic | KafkaStringDeserializationSchema |
| strategy | WatermarkStrategy | watermark generation strategy | |

Let’s take a look at more usage and configuration methods

  • Consume multiple Kafka instances
  • Consume multiple topics
  • Topic dynamic discovery
  • Consume from the specified offset
  • Specific KafkaDeserializationSchema
  • Specific WatermarkStrategy

Consume multiple Kafka instances

StreamPark took the configuration of multiple different Kafka instances into account from the beginning of development. How can the configuration be unified and the format standardized? The solution in StreamPark is as follows. If we want to consume two different Kafka instances at the same time, the configuration file is defined as shown below: the Kafka instance name goes directly under kafka.source; here we call it alias. The alias must be unique, to distinguish between different instances. If there is only one Kafka instance, alias does not need to be configured. When writing the consuming code, make sure the corresponding alias is specified; the configuration and code are as follows.

Setting

  kafka.source:
    kafka1:
      bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
      topic: test_user
      group.id: user_01
      auto.offset.reset: earliest
      enable.auto.commit: true
    kafka2:
      bootstrap.servers: kfk4:9092,kfk5:9092,kfk6:9092
      topic: kafka2
      group.id: kafka2
      auto.offset.reset: earliest
      enable.auto.commit: true

Scala

  KafkaSource().getDataStream[String](alias = "kafka1")
    .uid("kfkSource1")
    .name("kfkSource1")
    .print()

  KafkaSource().getDataStream[String](alias = "kafka2")
    .uid("kfkSource2")
    .name("kfkSource2")
    .print()

Java

  StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  StreamingContext context = new StreamingContext(envConfig);

  new KafkaSource<String>(context)
          .alias("kafka1")
          .getDataStream()
          .print();

  new KafkaSource<String>(context)
          .alias("kafka2")
          .getDataStream()
          .print();

  context.start();

When using the Java API, be sure to set parameters such as alias before calling getDataStream().

Consume multiple topics

Configuring the consumption of multiple topics is also very simple: in the configuration file, list several topic names under topic, separated by commas or spaces. In the Scala API, if you consume a single topic, pass the topic name directly; to consume multiple topics, pass a List. In the Java API, pass the topic names through the topic() method, which is a String varargs parameter and accepts one or more topic names. The configuration and code are as follows.

Setting

  kafka.source:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    topic: topic1,topic2,topic3...
    group.id: user_01
    auto.offset.reset: earliest # (earliest | latest)
    ...

Scala

  KafkaSource().getDataStream[String](topic = "topic1")
    .uid("kfkSource1")
    .name("kfkSource1")
    .print()

  KafkaSource().getDataStream[String](topic = List("topic1", "topic2", "topic3"))
    .uid("kfkSource1")
    .name("kfkSource1")
    .print()

Java

  new KafkaSource<String>(context)
          .topic("topic1")
          .getDataStream()
          .print();

  new KafkaSource<String>(context)
          .topic("topic1", "topic2")
          .getDataStream()
          .print();

topic supports configuring multiple topics, each separated by a comma or a space. If multiple topics are configured under topic, the specific topic name must be specified when consuming.

Topic dynamic discovery

Regarding Kafka partition dynamics: by default, partition discovery is disabled. To enable it, set flink.partition-discovery.interval-millis to a value greater than 0 in the provided configuration; this value is the partition discovery interval in milliseconds. For more details, please refer to the official documentation.
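
For example, a minimal configuration sketch (assuming StreamPark passes this key through to the underlying consumer together with the other kafka.source properties):

  kafka.source:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    topic: test_user
    group.id: user_01
    # check for newly created partitions every 30 seconds;
    # partition discovery stays disabled when this key is not set
    flink.partition-discovery.interval-millis: 30000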

The Flink Kafka Consumer is also able to discover topics using regular expressions; please refer to the official documentation. StreamPark provides a simpler way: configure the regular expression that matches the topic names under pattern.

Setting

  kafka.source:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    pattern: ^topic[1-9]
    group.id: user_02
    auto.offset.reset: earliest # (earliest | latest)
    ...

Scala

  KafkaSource().getDataStream[String](topic = "topic-a")
    .uid("kfkSource1")
    .name("kfkSource1")
    .print()

Java

  StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  StreamingContext context = new StreamingContext(envConfig);

  new KafkaSource<String>(context)
          .topic("topic-a")
          .getDataStream()
          .print();

  context.start();

topic and pattern cannot be configured at the same time. When pattern matching is configured, a specific topic name can still be given at consumption time; in that case the framework checks whether pattern matches the given topic and reports an error if it does not.

Consume from the specified offset

The Flink Kafka Consumer allows the starting position of Kafka partitions to be determined by configuration (see the official documentation). The starting position of a Kafka partition is specified as follows:

Scala

  val env = StreamExecutionEnvironment.getExecutionEnvironment()

  val myConsumer = new FlinkKafkaConsumer[String](...)
  myConsumer.setStartFromEarliest()      // start from the earliest record possible
  myConsumer.setStartFromLatest()        // start from the latest record
  myConsumer.setStartFromTimestamp(...)  // start from the specified epoch timestamp (milliseconds)
  myConsumer.setStartFromGroupOffsets()  // the default behaviour

  val stream = env.addSource(myConsumer)
  ...

Java

  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
  myConsumer.setStartFromEarliest();     // start from the earliest record possible
  myConsumer.setStartFromLatest();       // start from the latest record
  myConsumer.setStartFromTimestamp(...); // start from the specified epoch timestamp (milliseconds)
  myConsumer.setStartFromGroupOffsets(); // the default behaviour

  DataStream<String> stream = env.addSource(myConsumer);
  ...

This style is not recommended in StreamPark; a more convenient way is provided by specifying auto.offset.reset in the configuration:

  • earliest consume from earliest record
  • latest consume from latest record

Consume from the specified offset

You can also specify the offset from which each partition starts consuming, simply by configuring the start.from settings in the configuration file:

  kafka.source:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    topic: topic1,topic2,topic3...
    group.id: user_01
    auto.offset.reset: earliest # (earliest | latest)
    start.from:
      timestamp: 1591286400000 # the specified timestamp takes effect for all topics
      offset: # specify an offset for each partition of a topic
        topic: topic_abc,topic_123
        topic_abc: 0:182,1:183,2:182 # partition 0 starts consumption from 182, partition 1 from 183, partition 2 from 182...
        topic_123: 0:182,1:183,2:182
    ...

Specific deserializer

If no deserializer is specified, the data in the topic is deserialized as a String by default. You can also specify a deserializer manually; the complete code is as follows.

Scala

  import org.apache.streampark.common.util.JsonUtils
  import org.apache.streampark.flink.core.scala.FlinkStreaming
  import org.apache.streampark.flink.core.scala.sink.JdbcSink
  import org.apache.streampark.flink.core.scala.source.KafkaSource
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
  import org.apache.flink.api.scala._
  import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  import org.apache.kafka.clients.consumer.ConsumerRecord

  object KafkaSourceApp extends FlinkStreaming {

    override def handle(): Unit = {
      KafkaSource()
        .getDataStream[User](deserializer = new UserSchema)
        .map(_.value)
        .print()
    }

  }

  class UserSchema extends KafkaDeserializationSchema[User] {
    override def isEndOfStream(nextElement: User): Boolean = false
    override def getProducedType: TypeInformation[User] = getForClass(classOf[User])
    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): User = {
      val value = new String(record.value())
      JsonUtils.read[User](value)
    }
  }

  case class User(name: String, age: Int, gender: Int, address: String)

Java

  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  import org.apache.streampark.flink.core.java.source.KafkaSource;
  import org.apache.streampark.flink.core.scala.StreamingContext;
  import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  import org.apache.kafka.clients.consumer.ConsumerRecord;

  import java.io.Serializable;

  import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

  public class KafkaSourceJavaApp {

      public static void main(String[] args) {
          StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
          StreamingContext context = new StreamingContext(envConfig);

          new KafkaSource<JavaUser>(context)
                  .deserializer(new JavaUserSchema())
                  .getDataStream()
                  .map((MapFunction<KafkaRecord<JavaUser>, JavaUser>) KafkaRecord::value)
                  .print();

          context.start();
      }
  }

  class JavaUserSchema implements KafkaDeserializationSchema<JavaUser> {
      private ObjectMapper mapper = new ObjectMapper();

      @Override
      public boolean isEndOfStream(JavaUser nextElement) {
          return false;
      }

      @Override
      public TypeInformation<JavaUser> getProducedType() {
          return getForClass(JavaUser.class);
      }

      @Override
      public JavaUser deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
          String value = new String(record.value());
          return mapper.readValue(value, JavaUser.class);
      }
  }

  class JavaUser implements Serializable {
      String name;
      Integer age;
      Integer gender;
      String address;
  }

Returned record: KafkaRecord

The returned object is wrapped in a KafkaRecord, which carries the current offset, partition, timestamp and other useful information for developers; value holds the target object, as shown below.

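For example, a minimal Scala sketch that reads these fields: value is shown in the examples above, while the names of the other accessors (partition, offset, timestamp) are assumptions based on the description and may differ in your StreamPark version.

  KafkaSource()
    .getDataStream[String]()
    .map(record =>
      // value appears in the examples above; partition/offset/timestamp are assumed accessor names
      s"partition=${record.partition}, offset=${record.offset}, " +
        s"timestamp=${record.timestamp}, value=${record.value}")
    .print()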

Specific strategy

In many cases, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. In addition, users may want to specify watermarks in a custom way, for example based on a special record in the Kafka stream that contains the current event-time watermark. For these cases, the Flink Kafka Consumer allows specifying an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks.

In StreamPark, a WatermarkStrategy is passed as a parameter to assign watermarks. For example, we parse the data in the topic into a User object; User has an orderTime field of a time type, and we use it as the basis for assigning the watermark:

Scala

  import org.apache.streampark.common.util.JsonUtils
  import org.apache.streampark.flink.core.scala.FlinkStreaming
  import org.apache.streampark.flink.core.scala.source.{KafkaRecord, KafkaSource}
  import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
  import org.apache.flink.api.scala._
  import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  import org.apache.kafka.clients.consumer.ConsumerRecord

  import java.time.Duration
  import java.util.Date

  object KafkaSourceStrategyApp extends FlinkStreaming {

    override def handle(): Unit = {
      KafkaSource()
        .getDataStream[User](
          deserializer = new UserSchema,
          strategy = WatermarkStrategy
            .forBoundedOutOfOrderness[KafkaRecord[User]](Duration.ofMinutes(1))
            .withTimestampAssigner(new SerializableTimestampAssigner[KafkaRecord[User]] {
              override def extractTimestamp(element: KafkaRecord[User], recordTimestamp: Long): Long = {
                element.value.orderTime.getTime
              }
            })
        ).map(_.value)
        .print()
    }

  }

  class UserSchema extends KafkaDeserializationSchema[User] {
    override def isEndOfStream(nextElement: User): Boolean = false
    override def getProducedType: TypeInformation[User] = getForClass(classOf[User])
    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): User = {
      val value = new String(record.value())
      JsonUtils.read[User](value)
    }
  }

  case class User(name: String, age: Int, gender: Int, address: String, orderTime: Date)

Java

  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  import org.apache.streampark.flink.core.java.source.KafkaSource;
  import org.apache.streampark.flink.core.scala.StreamingContext;
  import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
  import org.apache.flink.api.common.eventtime.WatermarkStrategy;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  import org.apache.kafka.clients.consumer.ConsumerRecord;

  import java.io.Serializable;
  import java.time.Duration;
  import java.util.Date;

  import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

  public class KafkaSourceStrategyJavaApp {

      public static void main(String[] args) {
          StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
          StreamingContext context = new StreamingContext(envConfig);

          new KafkaSource<JavaUser>(context)
                  .deserializer(new JavaUserSchema())
                  .strategy(
                      WatermarkStrategy.<KafkaRecord<JavaUser>>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                          .withTimestampAssigner(
                              (SerializableTimestampAssigner<KafkaRecord<JavaUser>>)
                                  (element, recordTimestamp) -> element.value().orderTime.getTime()
                          )
                  )
                  .getDataStream()
                  .map((MapFunction<KafkaRecord<JavaUser>, JavaUser>) KafkaRecord::value)
                  .print();

          context.start();
      }
  }

  class JavaUserSchema implements KafkaDeserializationSchema<JavaUser> {
      private ObjectMapper mapper = new ObjectMapper();

      @Override
      public boolean isEndOfStream(JavaUser nextElement) {
          return false;
      }

      @Override
      public TypeInformation<JavaUser> getProducedType() {
          return getForClass(JavaUser.class);
      }

      @Override
      public JavaUser deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
          String value = new String(record.value());
          return mapper.readValue(value, JavaUser.class);
      }
  }

  class JavaUser implements Serializable {
      String name;
      Integer age;
      Integer gender;
      String address;
      Date orderTime;
  }

If the watermark assigner relies on messages read from Kafka to advance the watermark (which is usually the case), then all topics and partitions need a continuous stream of messages. Otherwise, the application's watermark does not advance, and all time-based operations (such as time windows or functions with timers) stop making progress. A single idle Kafka partition can already cause this. Consider setting appropriate idleness timeouts to mitigate the problem.
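
A sketch of how this could look with the standard Flink WatermarkStrategy API, building on the strategy from the example above (KafkaRecord and User come from that example; the one-minute values are arbitrary):

  import java.time.Duration

  import org.apache.flink.api.common.eventtime.WatermarkStrategy
  import org.apache.streampark.flink.core.scala.source.KafkaRecord

  // mark a partition as idle after one minute without records, so that it
  // no longer holds back the overall watermark
  val strategy = WatermarkStrategy
    .forBoundedOutOfOrderness[KafkaRecord[User]](Duration.ofMinutes(1))
    .withIdleness(Duration.ofMinutes(1))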

Kafka Sink (Producer)

In StreamPark the Kafka Producer is called KafkaSink, which allows messages to be written to one or more Kafka topics.

Scala

  val source = KafkaSource().getDataStream[String]().map(_.value)
  KafkaSink().sink(source)

Java

  StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  StreamingContext context = new StreamingContext(envConfig);

  DataStream<String> source = new KafkaSource<String>(context)
          .getDataStream()
          .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);

  new KafkaSink<String>(context).sink(source);
  context.start();

sink is the method used to write data; its parameters are listed below:

| Parameter Name | Parameter Type | Description | Default | Required |
| --- | --- | --- | --- | --- |
| stream | DataStream[T] | the data stream to write | | yes |
| alias | String | Kafka instance alias | | no |
| serializationSchema | SerializationSchema[T] | serializer for the written data | SimpleStringSchema | no |
| partitioner | FlinkKafkaPartitioner[T] | Kafka partitioner | KafkaEqualityPartitioner[T] | no |

Fault Tolerance and Semantics

With Flink's checkpointing enabled, KafkaSink can provide exactly-once semantics; please refer to Chapter 2 on project configuration for the specific checkpoint settings.

In addition to enabling checkpointing for Flink, you can also choose one of three different modes by passing the appropriate semantic parameter to KafkaSink:

  • EXACTLY_ONCE: provides exactly-once semantics using Kafka transactions
  • AT_LEAST_ONCE: at least once; it is guaranteed that no records are lost (but records may be duplicated)
  • NONE: Flink gives no guarantee; records may be lost or duplicated

semantic is configured under kafka.sink:

  kafka.sink:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    topic: kfk_sink
    transaction.timeout.ms: 1000
    semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
    batch.size: 1
Kafka EXACTLY_ONCE semantics

Semantic.EXACTLY_ONCE relies on the ability to commit transactions. Transactions are committed before a checkpoint is triggered and after recovery from a checkpoint. If the time between a Flink application crash and a complete restart exceeds Kafka's transaction timeout, data will be lost (Kafka automatically aborts transactions that exceed the timeout). With this in mind, please configure the transaction timeout according to the expected downtime.

By default, the Kafka broker sets transaction.max.timeout.ms to 15 minutes and does not allow producers to set a transaction timeout larger than this value. FlinkKafkaProducer by default sets transaction.timeout.ms in the producer config to 1 hour, so transaction.max.timeout.ms should be increased before using Semantic.EXACTLY_ONCE.

In the read_committed mode of KafkaConsumer, any open (neither aborted nor completed) transaction blocks all reads from the given Kafka topic that come after it. In other words, after the following sequence of events:

  • the user started transaction1 and used it to write some records
  • the user started transaction2 and used it to write some other records
  • the user committed transaction2

even though the records of transaction2 are already committed, they are not visible to consumers until transaction1 is committed or aborted. This has two implications:

  • First, during normal operation of a Flink application, users can expect a delay in the visibility of records equal to the average time between completed checkpoints.
  • Second, if a Flink application fails, the topics written by this application are blocked for readers until the application is restarted or the configured transaction timeout elapses. This note only applies when multiple agents or applications write to the same Kafka topic.

Note: the Semantic.EXACTLY_ONCE mode uses a fixed-size pool of KafkaProducers for each FlinkKafkaProducer instance, and each checkpoint uses one producer from the pool. If the number of concurrent checkpoints exceeds the pool size, FlinkKafkaProducer throws an exception and fails the entire application. Please configure the maximum pool size and the maximum number of concurrent checkpoints accordingly.

Note: Semantic.EXACTLY_ONCE does everything possible not to leave behind lingering transactions that would otherwise block other consumers from reading from the Kafka topic. However, if a Flink application fails before the first checkpoint, there is no information about the previous pool size after such an application restarts. Therefore, it is not safe to scale a Flink application down before the first checkpoint completes by a factor larger than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.
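
As a configuration sketch for exactly-once delivery (broker addresses and topic are placeholders, and the keys mirror the kafka.sink example above): the transaction timeout should be chosen based on the expected downtime and must not exceed the broker's transaction.max.timeout.ms.

  kafka.sink:
    bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
    topic: kfk_sink
    # must not exceed the broker's transaction.max.timeout.ms (15 minutes by default)
    transaction.timeout.ms: 900000
    semantic: EXACTLY_ONCE
    batch.size: 1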

Specify alias for multiple Kafka instances

If multiple Kafka instances need to be configured on the write side, use alias to distinguish between them, configured as follows:

  kafka.sink:
    kafka_cluster1:
      bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
      topic: kfk_sink
      transaction.timeout.ms: 1000
      semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
      batch.size: 1
    kafka_cluster2:
      bootstrap.servers: kfk6:9092,kfk7:9092,kfk8:9092
      topic: kfk_sink
      transaction.timeout.ms: 1000
      semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
      batch.size: 1

When writing data, you need to specify alias manually. Note that the Scala and Java APIs differ here: Scala specifies the parameter directly in the sink method, while the Java API sets it via the alias() method.

Scala

  val source = KafkaSource().getDataStream[String]().map(_.value)
  KafkaSink().sink(source, alias = "kafka_cluster1")

Java

  StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
  StreamingContext context = new StreamingContext(envConfig);

  DataStream<String> source = new KafkaSource<String>(context)
          .getDataStream()
          .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value);

  new KafkaSink<String>(context).alias("kafka_cluster1").sink(source);
  context.start();

Specific SerializationSchema

The Flink Kafka Producer needs to know how to convert Java/Scala objects to binary data. KafkaSerializationSchema allows users to specify such a schema; please refer to the official documentation for how to do this.

In KafkaSink, if no serializer is specified, SimpleStringSchema is used by default. Developers can specify a custom serializer through the serializationSchema parameter, for example to write the User object to Kafka in a custom format:

Scala

  import org.apache.streampark.common.util.JsonUtils
  import org.apache.streampark.flink.core.scala.FlinkStreaming
  import org.apache.flink.api.common.serialization.SerializationSchema
  import org.apache.streampark.flink.core.scala.sink.JdbcSink
  import org.apache.streampark.flink.core.scala.source.KafkaSource
  import org.apache.flink.api.scala._

  object KafkaSinkApp extends FlinkStreaming {

    override def handle(): Unit = {
      val source = KafkaSource()
        .getDataStream[String]()
        .map(x => JsonUtils.read[User](x.value))

      KafkaSink().sink[User](source, serialization = new SerializationSchema[User]() {
        override def serialize(user: User): Array[Byte] = {
          s"${user.name},${user.age},${user.gender},${user.address}".getBytes
        }
      })
    }

  }

  case class User(name: String, age: Int, gender: Int, address: String)

Java

  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.streampark.flink.core.java.function.StreamEnvConfigFunction;
  import org.apache.streampark.flink.core.java.sink.KafkaSink;
  import org.apache.streampark.flink.core.java.source.KafkaSource;
  import org.apache.streampark.flink.core.scala.StreamingContext;
  import org.apache.streampark.flink.core.scala.source.KafkaRecord;
  import org.apache.streampark.flink.core.scala.util.StreamEnvConfig;
  import org.apache.flink.api.common.functions.FilterFunction;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.api.common.serialization.SerializationSchema;
  import org.apache.flink.streaming.api.datastream.DataStream;

  import java.io.Serializable;

  public class kafkaSinkJavaApp {

      public static void main(String[] args) {
          StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
          StreamingContext context = new StreamingContext(envConfig);

          ObjectMapper mapper = new ObjectMapper();

          DataStream<JavaUser> source = new KafkaSource<String>(context)
                  .getDataStream()
                  .map((MapFunction<KafkaRecord<String>, JavaUser>) value ->
                          mapper.readValue(value.value(), JavaUser.class));

          new KafkaSink<JavaUser>(context)
                  .serializer(
                      (SerializationSchema<JavaUser>) element ->
                          String.format("%s,%d,%d,%s", element.name, element.age, element.gender, element.address).getBytes()
                  ).sink(source);

          context.start();
      }
  }

  class JavaUser implements Serializable {
      String name;
      Integer age;
      Integer gender;
      String address;
  }


Specific partitioner

KafkaSink allows you to specify a Kafka partitioner. If none is specified, the built-in KafkaEqualityPartitioner is used by default; as the name suggests, it writes data evenly across all partitions. The Scala API sets the partitioner through the partitioner parameter, while the Java API sets it through the partitioner() method.

The default partitioner used by the Flink Kafka connector is FlinkFixedPartitioner, which requires paying special attention to the sink parallelism and the number of Kafka partitions; otherwise data may end up in only some of the partitions (for example, everything written to a single partition when the sink parallelism is 1).
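
As a sketch, a custom partitioner could look like the following. FlinkKafkaPartitioner is Flink's own interface; User and source come from the serialization example above, and passing the partitioner through the sink's partitioner parameter follows the parameter table above (the exact call style is an assumption).

  import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

  // route each record to a partition derived from the hash of the user's name
  class UserPartitioner extends FlinkKafkaPartitioner[User] {
    override def partition(record: User,
                           key: Array[Byte],
                           value: Array[Byte],
                           targetTopic: String,
                           partitions: Array[Int]): Int = {
      partitions(math.abs(record.name.hashCode) % partitions.length)
    }
  }

  // parameter name taken from the sink parameter table above
  KafkaSink().sink[User](source, partitioner = new UserPartitioner)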