MongoDB CDC source connector

Support Those Engines

SeaTunnel Zeta
Flink

Key Features

Description

The MongoDB CDC connector allows for reading snapshot data and incremental data from a MongoDB database.

Supported DataSource Info

In order to use the MongoDB CDC connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Dependency |
|------------|--------------------|------------|
| MongoDB    | universal          | Download   |

Availability Settings

1. MongoDB version: MongoDB version >= 4.0.

2. Cluster deployment: replica sets or sharded clusters.

3. Storage engine: WiredTiger storage engine.

4. Permissions: changeStream and read

```
use admin;
db.createRole(
    {
        role: "strole",
        privileges: [{
            resource: { db: "", collection: "" },
            actions: [
                "splitVector",
                "listDatabases",
                "listCollections",
                "collStats",
                "find",
                "changeStream"
            ]
        }],
        roles: [
            { role: 'read', db: 'config' }
        ]
    }
);

db.createUser(
    {
        user: 'stuser',
        pwd: 'stpw',
        roles: [
            { role: 'strole', db: 'admin' }
        ]
    }
);
```

Data Type Mapping

The following table lists the field data type mapping from MongoDB BSON types to SeaTunnel data types.

| MongoDB BSON Type | SeaTunnel Data Type |
|-------------------|---------------------|
| ObjectId          | STRING              |
| String            | STRING              |
| Boolean           | BOOLEAN             |
| Binary            | BINARY              |
| Int32             | INTEGER             |
| Int64             | BIGINT              |
| Double            | DOUBLE              |
| Decimal128        | DECIMAL             |
| Date              | DATE                |
| Timestamp         | TIMESTAMP           |
| Object            | ROW                 |
| Array             | ARRAY               |

For the following specific types in MongoDB, we use the Extended JSON format to map them to the SeaTunnel STRING type.

| MongoDB BSON Type | SeaTunnel STRING |
|-------------------|------------------|
| Symbol            | {"_value": {"$symbol": "12"}} |
| RegularExpression | {"_value": {"$regularExpression": {"pattern": "^9$", "options": "i"}}} |
| JavaScript        | {"_value": {"$code": "function() { return 10; }"}} |
| DbPointer         | {"_value": {"$dbPointer": {"$ref": "db.coll", "$id": {"$oid": "63932a00da01604af329e33c"}}}} |
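As a rough illustration (not the connector's actual code), the wrapping into the `_value` envelope can be sketched with the standard json module; the helper name to_extended_json_string is hypothetical:

```python
import json

def to_extended_json_string(bson_type: str, payload) -> str:
    # Wrap the Extended JSON representation in the "_value" envelope
    # shown in the mapping table above.
    return json.dumps({"_value": {bson_type: payload}})

# A BSON Symbol "12" becomes the STRING from the table.
symbol = to_extended_json_string("$symbol", "12")

# A regular expression /^9$/i keeps its pattern and options.
regex = to_extended_json_string("$regularExpression",
                                {"pattern": "^9$", "options": "i"})
print(symbol)
print(regex)
```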

Tips

1. When using the DECIMAL type in SeaTunnel, be aware that the maximum precision cannot exceed 34 digits, which means you should use decimal(34, 18).
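The 34-digit cap comes from Decimal128, which stores at most 34 significant digits; declaring decimal(34, 18) leaves 16 integer digits plus 18 fractional digits. A small sketch with Python's standard decimal module shows a value that exactly fills that budget:

```python
from decimal import Context

# A context with 34 significant digits, mirroring Decimal128's precision.
ctx = Context(prec=34)

# 16 integer digits + 18 fractional digits = 34 significant digits,
# the widest value a decimal(34, 18) column can hold without rounding.
value = ctx.create_decimal("1234567890123456.123456789012345678")
print(value)
```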

Source Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| hosts | String | Yes | - | The comma-separated list of hostname and port pairs of the MongoDB servers. e.g. localhost:27017,localhost:27018 |
| username | String | No | - | Name of the database user to be used when connecting to MongoDB. |
| password | String | No | - | Password to be used when connecting to MongoDB. |
| database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. e.g. db1,db2. |
| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. e.g. db1.coll1,db2.coll2. |
| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. e.g. replicaSet=test&connectTimeoutMS=300000. |
| batch.size | Long | No | 1024 | The cursor batch size. |
| poll.max.batch.size | Long | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
| poll.await.time.ms | Long | No | 1000 | The amount of time in milliseconds to wait before checking for new results on the change stream. |
| heartbeat.interval.ms | String | No | 0 | The length of time in milliseconds between sending heartbeat messages. Use 0 to disable. |
| incremental.snapshot.chunk.size.mb | Long | No | 64 | The chunk size (in MB) of the incremental snapshot. |
| common-options | | No | - | Source plugin common parameters, please refer to Source Common Options for details. |

Tips:

1. If the collection changes at a slow pace, it is strongly recommended to set an appropriate value greater than 0 for the heartbeat.interval.ms parameter. When we recover a SeaTunnel job from a checkpoint or savepoint, the heartbeat events can push the resumeToken forward to avoid its expiration.
2. MongoDB has a limit of 16MB for a single document. Change documents include additional information, so even if the original document is not larger than 15MB, the change document may exceed the 16MB limit, resulting in the termination of the Change Stream operation.
3. It is recommended to use immutable shard keys. In MongoDB, shard keys allow modifications after transactions are enabled, but changing the shard key can cause frequent shard migrations, resulting in additional performance overhead. Additionally, modifying the shard key can also cause the Update Lookup feature to become ineffective, leading to inconsistent results in CDC (Change Data Capture) scenarios.
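To make the 16MB caveat from tip 2 concrete, here is a rough pre-flight check, using JSON byte size as a crude stand-in for BSON size and an assumed one-megabyte envelope overhead (both numbers are illustrative assumptions, not connector behavior):

```python
import json

MAX_BSON_BYTES = 16 * 1024 * 1024  # MongoDB's per-document limit

def may_exceed_change_stream_limit(document: dict,
                                   overhead_bytes: int = 1024 * 1024) -> bool:
    # JSON size is only an approximation of BSON size; overhead_bytes is an
    # assumed allowance for change-stream metadata (_id, ns, documentKey, ...).
    approx = len(json.dumps(document).encode("utf-8"))
    return approx + overhead_bytes > MAX_BSON_BYTES

small = {"name": "widget", "description": "x" * 1000}
print(may_exceed_change_stream_limit(small))  # a small document is safely under the cap
```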

How to Create a MongoDB CDC Data Synchronization Job

CDC Data Print to Client

The following example demonstrates how to create a data synchronization job that reads CDC data from MongoDB and prints it on the local client:

```hocon
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
    schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string,
        "weight" : string
      }
    }
  }
}

# Console printing of the read MongoDB data
sink {
  Console {
    parallelism = 1
  }
}
```

CDC Data Write to MySQL

The following example demonstrates how to create a data synchronization job that reads CDC data from MongoDB and writes it to a MySQL database:

```hocon
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://mysql_cdc_e2e:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "st_user"
    password = "seatunnel"
    generate_sink_sql = true
    # You need to configure both database and table
    database = mongodb_cdc
    table = products
    primary_keys = ["_id"]
  }
}
```

Multi-table Synchronization

The following example demonstrates how to create a data synchronization job that reads CDC data from multiple MongoDB databases and collections and prints it on the local client:

```hocon
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory","crm"]
    collection = ["inventory.products","crm.test"]
    username = stuser
    password = stpw
  }
}

# Console printing of the read MongoDB data
sink {
  Console {
    parallelism = 1
  }
}
```

Tips:

1. CDC synchronization of multiple databases and collections cannot specify a schema and can only output JSON data downstream. This is because MongoDB does not provide metadata for querying the schema, so to support multiple tables, all tables can only be read as one structure.

Regular Expression Matching for Multiple Tables

The following example demonstrates how to create a data synchronization job that reads the data of multiple MongoDB databases and collections through regular expressions and prints it on the local client:

| Matching example | Expressions | Describe |
|------------------|-------------|----------|
| Prefix matching  | ^(test).*   | Match the database name or table name with the prefix test, such as test1, test2, etc. |
| Suffix matching  | .*[p$]      | Match the database name or table name with the suffix p, such as cdcp, edcp, etc. |
```hocon
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    # This example uses (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt), matching e.g. txc.tt and test2.test5.
    database = ["(^(test).*|^(tpc).*|txc|.*[p$]|t{2})"]
    collection = ["(t[5-8]|tt)"]
    username = stuser
    password = stpw
  }
}

# Console printing of the read MongoDB data
sink {
  Console {
    parallelism = 1
  }
}
```
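The connector-side matching semantics (Java regular expressions) are not spelled out here, so as a sketch under the assumption that a name is captured when the expression is found in it (search semantics, which reproduce the txc.tt and test2.test5 examples above), the patterns can be tried out in Python:

```python
import re

# The database and collection expressions from the example config above.
database_pattern = re.compile(r"(^(test).*|^(tpc).*|txc|.*[p$]|t{2})")
collection_pattern = re.compile(r"(t[5-8]|tt)")

def is_captured(db: str, coll: str) -> bool:
    # Assumption: a match anywhere in the name counts (search semantics).
    return bool(database_pattern.search(db)) and bool(collection_pattern.search(coll))

print(is_captured("txc", "tt"))       # txc.tt is captured
print(is_captured("test2", "test5"))  # test2.test5 is captured
print(is_captured("abc", "tt"))       # abc matches neither database alternative
```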

Format of real-time streaming data

```
{
    "_id" : { <BSON Object> },            // Identifier of the open change stream; can be assigned to the 'resumeAfter' parameter to resume this change stream later
    "operationType" : "<operation>",      // The type of change operation that occurred, such as: insert, delete, update, etc.
    "fullDocument" : { <document> },      // The full document data involved in the change operation; this field does not exist in delete operations
    "ns" : {
        "db" : "<database>",              // The database where the change operation occurred
        "coll" : "<collection>"           // The collection where the change operation occurred
    },
    "to" : {                              // These fields are displayed only when the operation type is 'rename'
        "db" : "<database>",              // The new database name after the change
        "coll" : "<collection>"           // The new collection name after the change
    },
    "source" : {
        "ts_ms" : "<timestamp>",          // The timestamp when the change operation occurred
        "table" : "<collection>",         // The collection where the change operation occurred
        "db" : "<database>",              // The database where the change operation occurred
        "snapshot" : "false"              // Identifies the current stage of data synchronization
    },
    "documentKey" : { "_id" : <value> },  // The _id field value of the document involved in the change operation
    "updateDescription" : {               // Description of the update operation
        "updatedFields" : { <document> }, // The fields and values that the update operation modified
        "removedFields" : [ "<field>", ... ] // The fields that the update operation removed
    },
    "clusterTime" : <Timestamp>,          // The timestamp of the oplog entry corresponding to the change operation
    "txnNumber" : <NumberLong>,           // If the change operation is executed in a multi-document transaction, this field is displayed, representing the transaction number
    "lsid" : {                            // Information about the session in which the transaction is located
        "id" : <UUID>,
        "uid" : <BinData>
    }
}
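A downstream consumer typically dispatches on operationType and reads the nested ns and updateDescription fields. As a minimal sketch (the event below is a hypothetical example shaped like the structure above, not actual connector output):

```python
import json

# A hypothetical update event, following the change-stream structure above.
raw = json.dumps({
    "operationType": "update",
    "ns": {"db": "inventory", "coll": "products"},
    "documentKey": {"_id": "63932a00da01604af329e33c"},
    "updateDescription": {
        "updatedFields": {"weight": "1.5"},
        "removedFields": ["description"],
    },
})

event = json.loads(raw)
# Fully-qualified target, e.g. "inventory.products".
target = f'{event["ns"]["db"]}.{event["ns"]["coll"]}'
changed = {}
if event["operationType"] == "update":
    # Only the modified fields are carried in updateDescription.
    changed = event["updateDescription"]["updatedFields"]
print(target, changed)
```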