Sql Server CDC source connector

Supported SQL Server Versions

  • server:2019 (or later versions, for information only)

Supported Engines

SeaTunnel Zeta
Flink

Key Features

Description

The SQL Server CDC connector reads both snapshot data and incremental data from a SQL Server database. This document describes how to set up the SQL Server CDC connector to run SQL queries against SQL Server databases.

Supported DataSource Info

Datasource: SqlServer
  • Supported versions: server:2019 (or later versions, for information only)
  • Driver: com.microsoft.sqlserver.jdbc.SQLServerDriver
  • Url: jdbc:sqlserver://localhost:1433;databaseName=column_type_test
  • Maven: https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc

    Using Dependency

    Install Jdbc Driver

    1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/plugins/.

    For SeaTunnel Zeta Engine

    1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/lib/.

    Data Type Mapping

    SQL Server Data Type -> SeaTunnel Data Type
    CHAR, VARCHAR, NCHAR, NVARCHAR, TEXT, NTEXT, XML -> STRING
    BINARY, VARBINARY, IMAGE -> BYTES
    INTEGER, INT -> INT
    SMALLINT, TINYINT -> SMALLINT
    BIGINT -> BIGINT
    FLOAT(1~24), REAL -> FLOAT
    DOUBLE, FLOAT(>24) -> DOUBLE
    NUMERIC(p,s), DECIMAL(p,s), MONEY, SMALLMONEY -> DECIMAL(p, s)
    TIMESTAMP -> BYTES
    DATE -> DATE
    TIME(s) -> TIME(s)
    DATETIME(s), DATETIME2(s), DATETIMEOFFSET(s), SMALLDATETIME -> TIMESTAMP(s)
    BOOLEAN, BIT -> BOOLEAN

    Source Options

    Name Type Required Default Description
    username String Yes - Name of the database user to use when connecting to the database server.
    password String Yes - Password to use when connecting to the database server.
    database-names List Yes - Names of the databases to monitor.
    table-names List Yes - Names of the tables to monitor. Each name combines the database name, schema name, and table name (databaseName.schemaName.tableName).
    table-names-config List No - Table config list, for example: [{"table": "db1.schema1.table1", "primaryKeys": ["key1"]}]
    base-url String Yes - The URL must include the database, like "jdbc:sqlserver://localhost:1433;databaseName=test".
    startup.mode Enum No INITIAL Optional startup mode for SqlServer CDC consumer, valid enumerations are “initial”, “earliest”, “latest” and “specific”.
    startup.timestamp Long No - Start from the specified epoch timestamp (in milliseconds).
    Note: this option is required when “startup.mode” is set to 'timestamp'.
    startup.specific-offset.file String No - Start from the specified binlog file name.
    Note: this option is required when “startup.mode” is set to 'specific'.
    startup.specific-offset.pos Long No - Start from the specified binlog file position.
    Note: this option is required when “startup.mode” is set to 'specific'.
    stop.mode Enum No NEVER Optional stop mode for SqlServer CDC consumer, valid enumerations are “never”.
    stop.timestamp Long No - Stop at the specified epoch timestamp (in milliseconds).
    Note: this option is required when “stop.mode” is set to 'timestamp'.
    stop.specific-offset.file String No - Stop at the specified binlog file name.
    Note: this option is required when “stop.mode” is set to 'specific'.
    stop.specific-offset.pos Long No - Stop at the specified binlog file position.
    Note: this option is required when “stop.mode” is set to 'specific'.
    incremental.parallelism Integer No 1 The number of parallel readers in the incremental phase.
    snapshot.split.size Integer No 8096 The split size (number of rows) of a table snapshot. Captured tables are divided into multiple splits when the table snapshot is read.
    snapshot.fetch.size Integer No 1024 The maximum fetch size per poll when reading a table snapshot.
    server-time-zone String No UTC The session time zone in database server.
    connect.timeout Duration No 30s The maximum time that the connector should wait after trying to connect to the database server before timing out.
    connect.max-retries Integer No 3 The maximum number of times the connector retries establishing a connection to the database server.
    connection.pool.size Integer No 20 The connection pool size.
    chunk-key.even-distribution.factor.upper-bound Double No 100 The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0.
    chunk-key.even-distribution.factor.lower-bound Double No 0.05 The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05.
    sample-sharding.threshold int No 1000 This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards.
    inverse-sampling.rate int No 1000 The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It’s especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
    exactly_once Boolean No false Enable exactly-once semantics.
    debezium.* config No - Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the SQL Server instance. See the Debezium SqlServer Connector properties for more details.
    format Enum No DEFAULT Optional output format for SqlServer CDC; valid enumerations are “DEFAULT” and “COMPATIBLE_DEBEZIUM_JSON”.
    common-options no - Source plugin common parameters, please refer to Source Common Options for details.
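
The way the chunk-key.even-distribution.factor.* and sample-sharding.threshold options interact can be easier to follow in code. The sketch below is only an illustration of the decision described in the table, not the connector's actual implementation: the function names and the returned labels ("even", "sample", "uneven") are hypothetical, and the defaults are copied from the option table above. The table does not say what happens to an unevenly distributed table whose estimated shard count stays below the threshold, so that case is simply labelled "uneven" here.

```python
def distribution_factor(min_id: int, max_id: int, row_count: int) -> float:
    """(MAX(id) - MIN(id) + 1) / row count, as given in the option descriptions."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    return (max_id - min_id + 1) / row_count


def choose_split_strategy(
    min_id: int,
    max_id: int,
    row_count: int,
    chunk_size: int = 8096,         # snapshot.split.size default
    lower_bound: float = 0.05,      # chunk-key.even-distribution.factor.lower-bound
    upper_bound: float = 100.0,     # chunk-key.even-distribution.factor.upper-bound
    sample_threshold: int = 1000,   # sample-sharding.threshold
) -> str:
    """Return a hypothetical label for the splitting strategy the table describes."""
    factor = distribution_factor(min_id, max_id, row_count)
    if lower_bound <= factor <= upper_bound:
        # Factor within both bounds: the table is treated as evenly distributed.
        return "even"
    # Outside the bounds: sampling is used only for large estimated shard counts.
    estimated_shards = row_count / chunk_size
    if estimated_shards > sample_threshold:
        return "sample"
    return "uneven"
```

For instance, a table with ids 1 to 1000 and 1000 rows has a factor of 1.0 and falls in the even case, while a table with 10 million rows spread over ids up to 10^12 has a factor of 100000 and roughly 1235 estimated shards, so it would fall in the sampling case.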

    Enable Sql Server CDC

    1. Check whether the SQL Server Agent used by CDC is enabled

    EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';
    If the result is running, the agent is enabled. Otherwise, you need to enable it manually.

    2. Enable the SQL Server Agent

    /opt/mssql/bin/mssql-conf setup

    3. The result is as follows

    1) Evaluation (free, no production use rights, 180-day limit)
    2) Developer (free, no production use rights)
    3) Express (free)
    4) Web (PAID)
    5) Standard (PAID)
    6) Enterprise (PAID)
    7) Enterprise Core (PAID)
    8) I bought a license through a retail sales channel and have a product key to enter.

    4. Enable CDC at the database level with the statements below. In SQL Server, sys.sp_cdc_enable_db enables CDC for the database only; each table to be captured must then be enabled with sys.sp_cdc_enable_table. The final query checks whether a given table is tracked by CDC.

    USE TestDB; -- Replace with the actual database name
    EXEC sys.sp_cdc_enable_db;
    SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'table'; -- Replace 'table' with the name of the table you want to check

    Task Example

    Initial read example

    In streaming mode, the connector first reads a snapshot of the table data and, once the snapshot read succeeds, continues reading changes incrementally. The following configuration is for reference only.

    env {
      # You can set engine configuration here
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }
    source {
      # This is an example source plugin, only for testing and demonstrating the source plugin feature
      SqlServer-CDC {
        result_table_name = "customers"
        username = "sa"
        password = "Y.sa123456"
        startup.mode = "initial"
        database-names = ["column_type_test"]
        table-names = ["column_type_test.dbo.full_types"]
        base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
      }
    }
    transform {
    }
    sink {
      console {
        source_table_name = "customers"
      }
    }

    Incremental read example

    This job performs an incremental read: it reads only the changed data and prints it to the console.

    env {
      # You can set engine configuration here
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }
    source {
      # This is an example source plugin, only for testing and demonstrating the source plugin feature
      SqlServer-CDC {
        # Enable exactly-once reads
        exactly_once = true
        result_table_name = "customers"
        username = "sa"
        password = "Y.sa123456"
        startup.mode = "latest"
        database-names = ["column_type_test"]
        table-names = ["column_type_test.dbo.full_types"]
        base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
      }
    }
    transform {
    }
    sink {
      console {
        source_table_name = "customers"
      }
    }

    Custom primary key for a table

    env {
      parallelism = 1
      job.mode = "STREAMING"
      checkpoint.interval = 5000
    }
    source {
      SqlServer-CDC {
        base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
        username = "sa"
        password = "Y.sa123456"
        database-names = ["column_type_test"]
        table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
        # Declare the primary key to use for a specific table
        table-names-config = [
          {
            table = "column_type_test.dbo.full_types"
            primaryKeys = ["id"]
          }
        ]
      }
    }
    sink {
      console {
      }
    }