Oracle CDC source connector

Support Those Engines

SeaTunnel Zeta
Flink

Key features

Description

The Oracle CDC connector allows for reading snapshot data and incremental data from Oracle database. This document describes how to set up the Oracle CDC connector to run SQL queries against Oracle databases.

Supported DataSource Info

Datasource Supported versions Driver Url Maven
Oracle Different dependency version has different driver class. oracle.jdbc.OracleDriver jdbc:oracle:thin:@datasource01:1523:xe https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8

Database Dependency

Install Jdbc Driver

  1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/plugins/.
  2. To support the i18n character set, copy the orai18n.jar to the $SEATNUNNEL_HOME/plugins/ directory.

For SeaTunnel Zeta Engine

  1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/lib/.
  2. To support the i18n character set, copy the orai18n.jar to the $SEATNUNNEL_HOME/lib/ directory.

Enable Oracle Logminer

To enable Oracle CDC (Change Data Capture) using Logminer in Seatunnel, which is a built-in tool provided by Oracle, follow the steps below:

Enabling Logminer without CDB (Container Database) mode.

  1. The operating system creates an empty file directory to store Oracle archived logs and user tablespaces.
  1. mkdir -p /opt/oracle/oradata/recovery_area
  2. mkdir -p /opt/oracle/oradata/ORCLCDB
  3. chown -R oracle /opt/oracle/***
  1. Login as admin and enable Oracle archived logs.
  1. sqlplus /nolog;
  2. connect sys as sysdba;
  3. alter system set db_recovery_file_dest_size = 10G;
  4. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
  5. shutdown immediate;
  6. startup mount;
  7. alter database archivelog;
  8. alter database open;
  9. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  10. archive log list;
  1. Login as admin and create an account called logminer_user with the password “oracle”, and grant it privileges to read tables and logs.
  1. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  2. CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
  3. GRANT CREATE SESSION TO logminer_user;
  4. GRANT SELECT ON V_$DATABASE to logminer_user;
  5. GRANT SELECT ON V_$LOG TO logminer_user;
  6. GRANT SELECT ON V_$LOGFILE TO logminer_user;
  7. GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
  8. GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
  9. GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
  10. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
  11. GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
  12. GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
Oracle 11g is not supported
  1. GRANT LOGMINING TO logminer_user;
Grant privileges only to the tables that need to be collected
  1. GRANT SELECT ANY TABLE TO logminer_user;
  2. GRANT ANALYZE ANY TO logminer_user;

To enable Logminer in Oracle with CDB (Container Database) + PDB (Pluggable Database) mode, follow the steps below:

  1. The operating system creates an empty file directory to store Oracle archived logs and user tablespaces.
  1. mkdir -p /opt/oracle/oradata/recovery_area
  2. mkdir -p /opt/oracle/oradata/ORCLCDB
  3. mkdir -p /opt/oracle/oradata/ORCLCDB/ORCLPDB1
  4. chown -R oracle /opt/oracle/***
  1. Login as admin and enable logging
  1. sqlplus /nolog
  2. connect sys as sysdba; # Password: oracle
  3. alter system set db_recovery_file_dest_size = 10G;
  4. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
  5. shutdown immediate
  6. startup mount
  7. alter database archivelog;
  8. alter database open;
  9. archive log list;
  1. Executing in CDB
  1. ALTER TABLE TEST.* ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  2. ALTER TABLE TEST.T2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
  1. Creating debeziume account

Operating in CDB

  1. sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
  3. SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  4. exit;

Operating in PDB

  1. sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
  2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
  3. SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  4. exit;
  1. Operating in CDB
  1. sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
  2. CREATE USER c##dbzuser IDENTIFIED BY dbz
  3. DEFAULT TABLESPACE logminer_tbs
  4. QUOTA UNLIMITED ON logminer_tbs
  5. CONTAINER=ALL;
  6. GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
  7. GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
  8. GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
  9. GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  10. GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
  11. GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  12. GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
  13. GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
  14. GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
  15. GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
  16. GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
  17. GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
  18. GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
  19. GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
  20. GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
  21. GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
  22. GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
  23. GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
  24. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
  25. GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
  26. GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
  27. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
  28. GRANT analyze any TO debeziume_1 CONTAINER=ALL;
  29. exit;

Data Type Mapping

Oracle Data type SeaTunnel Data type
INTEGER INT
FLOAT DECIMAL(38, 18)
NUMBER(precision <= 9, scale == 0) INT
NUMBER(9 < precision <= 18, scale == 0) BIGINT
NUMBER(18 < precision, scale == 0) DECIMAL(38, 0)
NUMBER(precision == 0, scale == 0) DECIMAL(38, 18)
NUMBER(scale != 0) DECIMAL(38, 18)
BINARY_DOUBLE DOUBLE
BINARY_FLOAT
REAL
FLOAT
CHAR
NCHAR
NVARCHAR2
VARCHAR2
LONG
ROWID
NCLOB
CLOB
STRING
DATE DATE
TIMESTAMP
TIMESTAMP WITH LOCAL TIME ZONE
TIMESTAMP
BLOB
RAW
LONG RAW
BFILE
BYTES

Source Options

Name Type Required Default Description
base-url String Yes - The URL of the JDBC connection. Refer to a case: idbc:oracle:thin:datasource01:1523:xe.
username String Yes - Name of the database to use when connecting to the database server.
password String Yes - Password to use when connecting to the database server.
database-names List No - Database name of the database to monitor.
schema-names List No - Schema name of the database to monitor.
table-names List Yes - Table name of the database to monitor. The table name needs to include the database name, for example: database_name.table_name
table-names-config List No - Table config list. for example: [{“table”: “db1.schema1.table1”,”primaryKeys”:[“key1”]}]
startup.mode Enum No INITIAL Optional startup mode for Oracle CDC consumer, valid enumerations are initial, earliest, latest and specific.
initial: Synchronize historical data at startup, and then synchronize incremental data.
earliest: Startup from the earliest offset possible.
latest: Startup from the latest offset.
specific: Startup from user-supplied specific offsets.
startup.specific-offset.file String No - Start from the specified binlog file name. Note, This option is required when the startup.mode option used specific.
startup.specific-offset.pos Long No - Start from the specified binlog file position. Note, This option is required when the startup.mode option used specific.
stop.mode Enum No NEVER Optional stop mode for Oracle CDC consumer, valid enumerations are never, latest or specific.
never: Real-time job don’t stop the source.
latest: Stop from the latest offset.
specific: Stop from user-supplied specific offset.
stop.specific-offset.file String No - Stop from the specified binlog file name. Note, This option is required when the stop.mode option used specific.
stop.specific-offset.pos Long No - Stop from the specified binlog file position. Note, This option is required when the stop.mode option used specific.
snapshot.split.size Integer No 8096 The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table.
snapshot.fetch.size Integer No 1024 The maximum fetch size for per poll when read table snapshot.
server-time-zone String No UTC The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone.
connect.timeout.ms Duration No 30000 The maximum time that the connector should wait after trying to connect to the database server before timing out.
connect.max-retries Integer No 3 The max retry times that the connector should retry to build database server connection.
connection.pool.size Integer No 20 The jdbc connection pool size.
chunk-key.even-distribution.factor.upper-bound Double No 100 The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0.
chunk-key.even-distribution.factor.lower-bound Double No 0.05 The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05.
sample-sharding.threshold Integer No 1000 This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards.
inverse-sampling.rate Integer No 1000 The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It’s especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000.
exactly_once Boolean No false Enable exactly once semantic.
format Enum No DEFAULT Optional output format for Oracle CDC, valid enumerations are DEFAULTCOMPATIBLE_DEBEZIUM_JSON.
debezium Config No - Pass-through Debezium’s properties to Debezium Embedded Engine which is used to capture data changes from Oracle server.
common-options no - Source plugin common parameters, please refer to Source Common Options for details

Task Example

Simple

Support multi-table reading

  1. source {
  2. # This is a example source plugin **only for test and demonstrate the feature source plugin**
  3. Oracle-CDC {
  4. result_table_name = "customers"
  5. username = "system"
  6. password = "oracle"
  7. database-names = ["XE"]
  8. schema-names = ["DEBEZIUM"]
  9. table-names = ["XE.DEBEZIUM.FULL_TYPES"]
  10. base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
  11. source.reader.close.timeout = 120000
  12. }
  13. }

Support custom primary key for table

  1. source {
  2. Oracle-CDC {
  3. result_table_name = "customers"
  4. base-url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
  5. source.reader.close.timeout = 120000
  6. username = "system"
  7. password = "oracle"
  8. database-names = ["XE"]
  9. schema-names = ["DEBEZIUM"]
  10. table-names = ["XE.DEBEZIUM.FULL_TYPES"]
  11. table-names-config = [
  12. {
  13. table = "XE.DEBEZIUM.FULL_TYPES"
  14. primaryKeys = ["ID"]
  15. }
  16. ]
  17. }
  18. }

Support debezium-compatible format send to kafka

Must be used with kafka connector sink, see compatible debezium format for details

Changelog

  • Add Oracle CDC Source Connector

next version