# Canal Format

Changelog-Data-Capture Format: Serialization Schema & Deserialization Schema
Canal is a CDC (Changelog Data Capture) tool that streams changes from MySQL into other systems in real time. Canal provides a unified format schema for the changelog and supports serializing messages using JSON and protobuf (protobuf is Canal's default format).
SeaTunnel supports interpreting Canal JSON messages as INSERT/UPDATE/DELETE messages into the SeaTunnel system. This feature is useful in many cases, such as:

- synchronizing incremental data from databases to other systems
- auditing logs
- real-time materialized views on databases
- temporal joins against the change history of a database table, and so on
SeaTunnel also supports encoding SeaTunnel INSERT/UPDATE/DELETE messages as Canal JSON messages and emitting them to storage like Kafka. However, SeaTunnel currently can't combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, SeaTunnel encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages, as sketched below.
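For instance, the weight change used in the example later in this page (5.15 → 5.18) would be emitted as two Canal records of roughly the following shape. This is a simplified sketch: fields are trimmed for brevity, and real output carries the full Canal envelope (`es`, `ts`, `mysqlType`, and so on):

```json
{"data": [{"id": "111", "weight": "5.15"}], "database": "inventory", "table": "products", "type": "DELETE"}
{"data": [{"id": "111", "weight": "5.18"}], "database": "inventory", "table": "products", "type": "INSERT"}
```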
## Format Options
| Option | Default | Required | Description |
|---|---|---|---|
| format | (none) | yes | Specify the format to use; here it should be 'canal_json'. |
| canal_json.ignore-parse-errors | false | no | Skip fields and rows with parse errors instead of failing. Fields with errors are set to null. |
| canal_json.database.include | (none) | no | An optional regular expression to read only the changelog rows whose "database" meta field in the Canal record matches the pattern. The pattern string is compatible with Java's Pattern. |
| canal_json.table.include | (none) | no | An optional regular expression to read only the changelog rows whose "table" meta field in the Canal record matches the pattern. The pattern string is compatible with Java's Pattern. See the snippet after this table. |
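For example, to keep only changelog rows from the `inventory` database and from tables whose names start with `products`, the two include options could be combined in a Kafka source like this (a minimal sketch; the broker, topic, and schema are assumptions that mirror the full example below):

```hocon
source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "products_binlog"
    start_mode = earliest
    format = canal_json
    # Only rows whose Canal "database"/"table" meta fields match are kept
    canal_json.database.include = "inventory"
    canal_json.table.include = "products.*"
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
  }
}
```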
## How to Use

### Kafka Example
Canal provides a unified format for the changelog. Here is a simple example of an update operation captured from a MySQL products table:
{"data": [{"id": "111","name": "scooter","description": "Big 2-wheel scooter","weight": "5.18"}],"database": "inventory","es": 1589373560000,"id": 9,"isDdl": false,"mysqlType": {"id": "INTEGER","name": "VARCHAR(255)","description": "VARCHAR(512)","weight": "FLOAT"},"old": [{"weight": "5.15"}],"pkNames": ["id"],"sql": "","sqlType": {"id": 4,"name": 12,"description": 12,"weight": 7},"table": "products","ts": 1589373560798,"type": "UPDATE"}
Note: please refer to the Canal documentation for the meaning of each field.
The MySQL products table has 4 columns (id, name, description, and weight). The JSON message above is an update change event on the products table where the weight value of the row with id = 111 changed from 5.15 to 5.18. Assuming the messages have been synchronized to the Kafka topic products_binlog, we can use the following SeaTunnel job to consume this topic and interpret the change events.
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "products_binlog"
    result_table_name = "kafka_name"
    start_mode = earliest
    schema = {
      fields {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
    format = canal_json
  }
}

transform {
}

sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "consume-binlog"
    format = canal_json
  }
}
```
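When this job runs, each Canal record from products_binlog is decoded into a SeaTunnel row, and the Kafka sink re-encodes the rows as Canal JSON into consume-binlog. As noted above, an upstream UPDATE such as the weight change in the example arrives in the sink topic as a DELETE/INSERT pair rather than a single UPDATE record.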
