RocketMQ source connector

Supported Apache RocketMQ Versions

  • 4.9.0 (or a newer version)

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Key Features

Description

Source connector for Apache RocketMQ.

Source Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| topics | String | yes | - | RocketMQ topic name. If there are multiple topics, use "," to split them, for example: "tpc1,tpc2". |
| name.srv.addr | String | yes | - | RocketMQ name server cluster address. |
| acl.enabled | Boolean | no | false | If true, access control is enabled, and the access key and secret key need to be configured (see the ACL sketch below this table). |
| access.key | String | no | - | When acl.enabled is true, the access key cannot be empty. |
| secret.key | String | no | - | When acl.enabled is true, the secret key cannot be empty. |
| batch.size | int | no | 100 | RocketMQ consumer pull batch size. |
| consumer.group | String | no | SeaTunnel-Consumer-Group | RocketMQ consumer group id, used to distinguish different consumer groups. |
| commit.on.checkpoint | Boolean | no | true | If true, the consumer's offset will be periodically committed in the background. |
| schema | Config | no | - | The structure of the data, including field names and field types. |
| format | String | no | json | Data format. The default format is json; the text format is also supported, with "," as its default field separator. To customize the delimiter, add the field.delimiter option. |
| field.delimiter | String | no | , | Customize the field delimiter for the text data format. |
| start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS | The initial consumption mode of the consumer. One of: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_GROUP_OFFSETS, CONSUME_FROM_TIMESTAMP, CONSUME_FROM_SPECIFIC_OFFSETS. |
| start.mode.offsets | Config | no | - | The offsets required when start.mode is CONSUME_FROM_SPECIFIC_OFFSETS; see the start.mode.offsets section below. |
| start.mode.timestamp | Long | no | - | The timestamp required when start.mode is CONSUME_FROM_TIMESTAMP. |
| partition.discovery.interval.millis | long | no | -1 | The interval in milliseconds for dynamically discovering topics and partitions. |
| common-options | config | no | - | Source plugin common parameters; please refer to Source Common Options for details. |
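
When acl.enabled is set to true, access.key and secret.key must be provided together. A minimal sketch of a source block with ACL enabled (the address, topic, and key values are placeholders):

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"  # placeholder name server address
    topics = "secured_topic"          # placeholder topic
    acl.enabled = true                # enable access control
    access.key = "<your-access-key>"  # required when acl.enabled is true
    secret.key = "<your-secret-key>"  # required when acl.enabled is true
  }
}
```

Similarly, to consume delimited plain-text records instead of JSON, switch format to text and override field.delimiter if the records do not use the default comma. A sketch, assuming records such as 1|foo|true (the topic name and fields are illustrative):

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"  # placeholder name server address
    topics = "text_topic"             # placeholder topic
    format = "text"                   # parse each message as delimited text
    field.delimiter = "|"             # records use '|' instead of the default ','
    schema = {
      fields {
        # declared fields correspond to the delimited columns, in order
        id = bigint
        name = string
        flag = boolean
      }
    }
  }
}
```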

start.mode.offsets

The offsets required when start.mode is "CONSUME_FROM_SPECIFIC_OFFSETS".

For example:

```hocon
start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}
```
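
For these offsets to take effect, start.mode must also be set to CONSUME_FROM_SPECIFIC_OFFSETS. A sketch putting the two options together (the name server address is a placeholder; the topic, partitions, and offsets are the illustrative values from above):

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"  # placeholder name server address
    topics = "topic1"
    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
    start.mode.offsets = {
      # <topic>-<partition> = <offset to start consuming from>
      topic1-0 = 70
      topic1-1 = 10
      topic1-2 = 10
    }
  }
}
```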

Task Example

Simple:

The consumer reads RocketMQ data and prints it to the console:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_json"
    result_table_name = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
  }
}
```
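
This example runs with job.mode = "BATCH", which stops once the available data has been read. For continuous consumption, the same source block can be paired with a streaming env block; a minimal sketch, assuming the usual SeaTunnel streaming job settings (the checkpoint.interval value is illustrative):

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"        # keep consuming new messages instead of stopping
  checkpoint.interval = 10000   # illustrative: checkpoint every 10 s; see commit.on.checkpoint above
}
```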

Specified format consumption Simple:

Consume the topic data, parsing it in JSON format, pulling 400 messages per batch, and starting consumption from the earliest offset:

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    result_table_name = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
  }
}
```

Specified timestamp Simple:

Start consuming from a specified timestamp, and dynamically discover newly created partitions every 1000 milliseconds:

```hocon
env {
  parallelism = 1
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    partition.discovery.interval.millis = "1000"
    start.mode = "CONSUME_FROM_TIMESTAMP"
    start.mode.timestamp = "1694508382000"
    consumer.group = "test_topic_group"
    format = "json"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
  }
}
```