PostgreSQL CDC source connector

Support Those Engines

SeaTunnel Zeta
Flink

Key features

Description

The PostgreSQL CDC connector allows reading snapshot data and incremental data from a PostgreSQL database. This document describes how to set up the PostgreSQL CDC connector to run SQL queries against PostgreSQL databases.

Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|--------------------|--------|-----|-------|
| PostgreSQL | Different dependency versions have different driver classes. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | Download |
| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | Download |

Using Dependency

Install Jdbc Driver

  1. You need to ensure that the JDBC driver jar package has been placed in the directory ${SEATUNNEL_HOME}/plugins/.

For SeaTunnel Zeta Engine

  1. You need to ensure that the JDBC driver jar package has been placed in the directory ${SEATUNNEL_HOME}/lib/.

Please download the PostgreSQL driver and put it in the ${SEATUNNEL_HOME}/lib/ directory. For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/lib/

Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:

  1. Ensure that wal_level is set to logical: add "wal_level = logical" to the postgresql.conf configuration file and restart the PostgreSQL server for the change to take effect. Alternatively, you can modify the configuration directly with SQL commands:

     ALTER SYSTEM SET wal_level TO 'logical';
     SELECT pg_reload_conf();

  2. Change the REPLICA IDENTITY of the specified table to FULL:

     ALTER TABLE your_table_name REPLICA IDENTITY FULL;
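To confirm that the settings above took effect, you can run checks like the following (`your_table_name` is a placeholder; in `pg_class`, a `relreplident` value of `f` means FULL):

```sql
-- Should return 'logical' once the server has restarted with the new setting
SHOW wal_level;

-- Check the replica identity of the captured table ('f' = FULL)
SELECT relreplident FROM pg_class
WHERE oid = 'your_table_name'::regclass;
```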

Data Type Mapping

| PostgreSQL Data Type | SeaTunnel Data Type |
|----------------------|---------------------|
| BOOL | BOOLEAN |
| _BOOL | ARRAY&lt;BOOLEAN&gt; |
| BYTEA | BYTES |
| _BYTEA | ARRAY&lt;TINYINT&gt; |
| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL | INT |
| _INT2<br/>_INT4 | ARRAY&lt;INT&gt; |
| INT8<br/>BIGSERIAL | BIGINT |
| _INT8 | ARRAY&lt;BIGINT&gt; |
| FLOAT4 | FLOAT |
| _FLOAT4 | ARRAY&lt;FLOAT&gt; |
| FLOAT8 | DOUBLE |
| _FLOAT8 | ARRAY&lt;DOUBLE&gt; |
| NUMERIC (column precision > 0) | DECIMAL(precision, scale) |
| NUMERIC (column precision &lt;= 0) | DECIMAL(38, 18) |
| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY<br/>JSON<br/>JSONB | STRING |
| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT | ARRAY&lt;STRING&gt; |
| TIMESTAMP | TIMESTAMP |
| TIME | TIME |
| DATE | DATE |
| Other data types | Not supported yet |
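As an illustration of the mapping above, a hypothetical source table (not one of the tables used in the examples in this document) would be read with the following SeaTunnel types:

```sql
CREATE TABLE inventory.products (
    id         SERIAL PRIMARY KEY,  -- read as SeaTunnel INT
    name       VARCHAR(255),        -- read as STRING
    price      NUMERIC(10, 2),      -- read as DECIMAL(10, 2)
    tags       TEXT[],              -- _TEXT, read as ARRAY<STRING>
    in_stock   BOOL,                -- read as BOOLEAN
    updated_at TIMESTAMP            -- read as TIMESTAMP
);
```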

Source Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| base-url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF. |
| username | String | Yes | - | Username to use when connecting to the database server. |
| password | String | Yes | - | Password to use when connecting to the database server. |
| database-names | List | No | - | Database name of the database to monitor. |
| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: database_name.table_name |
| table-names-config | List | No | - | Table config list. For example: [{"table": "db1.schema1.table1", "primaryKeys": ["key1"]}] |
| startup.mode | Enum | No | INITIAL | Optional startup mode for the PostgreSQL CDC consumer; valid enumerations are initial, earliest, latest and specific.<br/>initial: Synchronize historical data at startup, then synchronize incremental data.<br/>earliest: Start from the earliest offset possible.<br/>latest: Start from the latest offset.<br/>specific: Start from user-supplied specific offsets. |
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of a table snapshot; captured tables are split into multiple splits when reading the table snapshot. |
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading a table snapshot. |
| slot.name | String | No | - | The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Default is seatunnel. |
| decoding.plugin.name | String | No | pgoutput | The name of the PostgreSQL logical decoding plug-in installed on the server. Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming and pgoutput. |
| server-time-zone | String | No | UTC | The session time zone in the database server. If not set, ZoneId.systemDefault() is used to determine the server time zone. |
| connect.timeout.ms | Duration | No | 30000 | The maximum time the connector should wait after trying to connect to the database server before timing out. |
| connect.max-retries | Integer | No | 3 | The max retry times the connector should retry to build a database server connection. |
| connection.pool.size | Integer | No | 20 | The JDBC connection pool size. |
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor (i.e., (MAX(id) - MIN(id) + 1) / row count) is calculated to be less than or equal to this upper bound, the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0. |
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor (i.e., (MAX(id) - MIN(id) + 1) / row count) is calculated to be greater than or equal to this lower bound, the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05. |
| sample-sharding.threshold | Integer | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. |
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. |
| exactly_once | Boolean | No | false | Enable exactly-once semantics. |
| format | Enum | No | DEFAULT | Optional output format for PostgreSQL CDC; valid enumerations are DEFAULT and COMPATIBLE_DEBEZIUM_JSON. |
| debezium | Config | No | - | Pass-through Debezium's properties to the Debezium Embedded Engine, which is used to capture data changes from the PostgreSQL server. |
| common-options | | No | - | Source plugin common parameters, please refer to Source Common Options for details. |
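The interaction between the chunk-key bounds and sample-sharding.threshold can be summarized with a small sketch; the function name and the returned labels below are illustrative only, not part of the connector's API:

```python
def choose_split_strategy(min_key, max_key, row_count, chunk_size=8096,
                          upper_bound=100.0, lower_bound=0.05,
                          sample_sharding_threshold=1000):
    """Sketch of how snapshot splits are chosen from the options above."""
    # Distribution factor as defined by the chunk-key options:
    # (MAX(id) - MIN(id) + 1) / row count
    distribution_factor = (max_key - min_key + 1) / row_count
    if lower_bound <= distribution_factor <= upper_bound:
        # Keys are dense enough: split the snapshot into even key ranges
        return "even-distribution"
    # Skewed keys: fall back to sampling only when the table is large enough
    estimated_shards = row_count // chunk_size
    if estimated_shards > sample_sharding_threshold:
        # Sample keys (at rate 1/inverse-sampling.rate) to pick boundaries
        return "sample-sharding"
    return "uneven-distribution"
```

For example, a table with ids 1..1,000,000 and 1,000,000 rows has a distribution factor of 1.0 and is split evenly, while a table with the same row count but ids scattered across a much wider range triggers the sampling path once the estimated shard count exceeds the threshold.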

Task Example

Simple

Support multi-table reading

```hocon
env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  Postgres-CDC {
    result_table_name = "customers_Postgre_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.postgres_cdc_table_1,postgres_cdc.inventory.postgres_cdc_table_2"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
  }
}

transform {
}

sink {
  jdbc {
    source_table_name = "customers_Postgre_cdc"
    url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    user = "postgres"
    password = "postgres"
    generate_sink_sql = true
    # You need to configure both database and table
    database = "postgres_cdc"
    schema = "inventory"
    tablePrefix = "sink_"
    primary_keys = ["id"]
  }
}
```

Support custom primary key for table

```hocon
source {
  Postgres-CDC {
    result_table_name = "customers_Postgre_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.full_types_no_primary_key"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    decoding.plugin.name = "decoderbufs"
    exactly_once = false
    table-names-config = [
      {
        table = "postgres_cdc.inventory.full_types_no_primary_key"
        primaryKeys = ["id"]
      }
    ]
  }
}
```
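If you do not need the historical snapshot, startup.mode can be set explicitly. The sketch below reuses the connection values from the examples above and only consumes changes made after the job starts:

```hocon
source {
  Postgres-CDC {
    result_table_name = "customers_Postgre_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.postgres_cdc_table_1"]
    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    # Skip the snapshot phase and read only new changes
    startup.mode = "latest"
  }
}
```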

Changelog

next version

  • Add PostgreSQL CDC Source Connector