RocketMQ sink connector

Supported Apache RocketMQ Version

  • 4.9.0 (or newer, for reference)

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

By default, the sink uses two-phase commit (2PC) to guarantee that each message is sent to RocketMQ exactly once.

Description

Writes rows to an Apache RocketMQ topic.

Sink Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | string | yes | - | RocketMQ topic name. |
| name.srv.addr | string | yes | - | RocketMQ name server cluster address. |
| acl.enabled | boolean | no | false | Whether to enable RocketMQ access control (ACL). |
| access.key | string | no | - | Access key. Cannot be empty when acl.enabled is true. |
| secret.key | string | no | - | Secret key. Cannot be empty when acl.enabled is true. |
| producer.group | string | no | SeaTunnel-producer-Group | Producer group name. |
| partition.key.fields | array | no | - | Fields whose values are used as the message key. See partition.key.fields below. |
| format | string | no | json | Data format: json (default) or text. For text, the default field separator is ","; to use a different one, set field.delimiter. |
| field.delimiter | string | no | , | Field delimiter used by the text format. |
| producer.send.sync | boolean | no | false | If true, messages are sent synchronously; otherwise they are sent asynchronously. |
| common-options | config | no | - | Sink plugin common parameters; see Sink Common Options for details. |
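As a sketch of the optional settings, the sink block below enables ACL, overrides the producer group, and switches from JSON to the text format with a custom delimiter. The credentials, group name, and delimiter are placeholder values chosen for illustration, not defaults.

```hocon
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # ACL: both keys must be set once acl.enabled is true (placeholder credentials)
    acl.enabled = true
    access.key = "your-access-key"
    secret.key = "your-secret-key"
    # Hypothetical group name; the default is SeaTunnel-producer-Group
    producer.group = "my-producer-group"
    # Serialize rows as delimited text instead of JSON, using "|" instead of ","
    format = "text"
    field.delimiter = "|"
  }
}
```

With this configuration, each row should be written as its field values joined by `|` rather than as a JSON object.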

partition.key.fields [array]

Configure which fields are used as the key of the RocketMQ message.

For example, to use the values of upstream fields as the key, assign those field names to this property.

Suppose the upstream data is:

| name | age | data |
|---|---|---|
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key, the hash of the name column's value determines which partition (message queue) the message is sent to.
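For the sample data above, a minimal sink block that keys messages by name could look like the following; the name server address and topic are placeholders.

```hocon
sink {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topic = "test_topic"
    # Use the upstream "name" field as the message key, so the partition
    # is chosen from the hash of values such as "Jack" or "Mary"
    partition.key.fields = ["name"]
  }
}
```

Every row whose name is Jack then goes to the same partition, while Jack and Mary may or may not share one, depending on how their hashes map onto the topic's partitions.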

Task Example

Fake to RocketMQ Simple

The data is randomly generated and sent asynchronously to the test_topic topic.

  env {
    parallelism = 1
  }

  source {
    FakeSource {
      schema = {
        fields {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_bytes = bytes
          c_date = date
          c_timestamp = timestamp
        }
      }
    }
  }

  transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/category/transform
  }

  sink {
    Rocketmq {
      name.srv.addr = "localhost:9876"
      topic = "test_topic"
    }
  }

RocketMQ to RocketMQ Simple

Consumes data from RocketMQ and writes it to another topic, routing each message to a partition by the hash of its c_int field. Messages are sent asynchronously, which is the default.

  env {
    parallelism = 1
  }

  source {
    Rocketmq {
      name.srv.addr = "localhost:9876"
      topics = "test_topic"
      result_table_name = "rocketmq_table"
      schema = {
        fields {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_bytes = bytes
          c_date = date
          c_timestamp = timestamp
        }
      }
    }
  }

  sink {
    Rocketmq {
      name.srv.addr = "localhost:9876"
      topic = "test_topic_sink"
      partition.key.fields = ["c_int"]
    }
  }

Timestamp consumption write Simple

This is a streaming job that consumes from a configured start position (CONSUME_FROM_FIRST_OFFSET in this example) and writes the data synchronously to another topic, keyed by c_int. When new partitions are added, the job detects them at regular intervals and starts consuming them as well.

  env {
    parallelism = 1
    job.mode = "STREAMING"
  }

  source {
    Rocketmq {
      name.srv.addr = "localhost:9876"
      topics = "test_topic"
      result_table_name = "rocketmq_table"
      start.mode = "CONSUME_FROM_FIRST_OFFSET"
      batch.size = "400"
      consumer.group = "test_topic_group"
      format = "json"
      schema = {
        fields {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_bytes = bytes
          c_date = date
          c_timestamp = timestamp
        }
      }
    }
  }

  transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/category/transform
  }

  sink {
    Rocketmq {
      name.srv.addr = "localhost:9876"
      # Write to a different topic than the source to avoid re-consuming our own output
      topic = "test_topic_sink"
      partition.key.fields = ["c_int"]
      producer.send.sync = true
    }
  }
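The example above starts from the first offset rather than from a timestamp. A sketch of a timestamp-based start is shown below. It assumes the RocketMQ source connector accepts start.mode = "CONSUME_FROM_TIMESTAMP" together with a start.mode.timestamp value in epoch milliseconds; both names are assumptions to verify against the RocketMQ source connector documentation, and the timestamp is an arbitrary placeholder. Only the source block changes; env, transform, and sink can stay as in the example above.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    result_table_name = "rocketmq_table"
    consumer.group = "test_topic_group"
    format = "json"
    # Assumption: begin with messages stored at or after the given time
    start.mode = "CONSUME_FROM_TIMESTAMP"
    # Assumption: epoch milliseconds; placeholder value
    start.mode.timestamp = 1700000000000
    # schema = { ... } unchanged from the example above
  }
}
```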