Paimon sink connector

Description

Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.

Key features

Options

| Name | Type | Required | Default value | Description |
|------|------|----------|---------------|-------------|
| warehouse | String | Yes | - | Paimon warehouse path |
| database | String | Yes | - | The database you want to access |
| table | String | Yes | - | The table you want to access |
| hdfs_site_path | String | No | - | The path of hdfs-site.xml |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode (see the sketch after this table) |
| data_save_mode | Enum | No | APPEND_DATA | The data save mode (see the sketch after this table) |
| paimon.table.primary-keys | String | No | - | Comma-separated list of columns (the primary key) that identify a row in the table. Note: partition fields must be included in the primary key fields |
| paimon.table.partition-keys | String | No | - | Comma-separated list of partition fields to use when creating the table |
| paimon.table.write-props | Map | No | - | Properties passed through to Paimon table initialization; see the Paimon table configuration documentation for the full list |
| paimon.hadoop.conf | Map | No | - | Properties set in the Hadoop conf |
| paimon.hadoop.conf-path | String | No | - | The path from which to load the 'core-site.xml', 'hdfs-site.xml', and 'hive-site.xml' files |
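
Neither the save modes nor paimon.hadoop.conf-path appear in the examples below, so here is a minimal sketch of where they sit in a sink block. The warehouse/database/table values and the conf directory are placeholders, and the schema_save_mode/data_save_mode values simply spell out the documented defaults:

  sink {
    Paimon {
      warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"  # placeholder warehouse path
      database = "seatunnel"                                   # placeholder database
      table = "role"                                           # placeholder table
      # The documented defaults, written out explicitly:
      schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
      data_save_mode = "APPEND_DATA"
      # Placeholder directory containing core-site.xml, hdfs-site.xml and hive-site.xml:
      paimon.hadoop.conf-path = "/path/to/hadoop/conf"
    }
  }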

Examples

Single table

  env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
  }

  source {
    Mysql-CDC {
      base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
      username = "root"
      password = "******"
      table-names = ["seatunnel.role"]
    }
  }

  transform {
  }

  sink {
    Paimon {
      catalog_name = "seatunnel_test"
      warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
      database = "seatunnel"
      table = "role"
    }
  }

Single table (with Hadoop HA and Kerberos configuration)

  env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
  }

  source {
    Mysql-CDC {
      base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
      username = "root"
      password = "******"
      table-names = ["seatunnel.role"]
    }
  }

  transform {
  }

  sink {
    Paimon {
      catalog_name = "seatunnel_test"
      warehouse = "hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
      database = "seatunnel"
      table = "role"
      paimon.hadoop.conf = {
        fs.defaultFS = "hdfs://nameservice1"
        dfs.nameservices = "nameservice1"
        dfs.ha.namenodes.nameservice1 = "nn1,nn2"
        dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
        dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
        dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
        dfs.client.use.datanode.hostname = "true"
        security.kerberos.login.principal = "your-kerberos-principal"
        security.kerberos.login.keytab = "your-kerberos-keytab-path"
      }
    }
  }
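
The entries under paimon.hadoop.conf are handed through to the Hadoop configuration as-is; the HA addresses, principal, and keytab path above are placeholders to replace with your own cluster's values.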

Single table (with Paimon write props)

  env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
  }

  source {
    Mysql-CDC {
      base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
      username = "root"
      password = "******"
      table-names = ["seatunnel.role"]
    }
  }

  sink {
    Paimon {
      catalog_name = "seatunnel_test"
      warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
      database = "seatunnel"
      table = "role"
      paimon.table.write-props = {
        bucket = 2
        file.format = "parquet"
      }
      paimon.table.partition-keys = "dt"
      paimon.table.primary-keys = "pk_id,dt"
    }
  }
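
Here bucket and file.format are Paimon table options passed through at table creation via paimon.table.write-props; note that the partition key dt also appears in paimon.table.primary-keys, following the rule that partition fields must be included in the primary key fields.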

Multiple tables

  env {
    parallelism = 1
    job.mode = "STREAMING"
    checkpoint.interval = 5000
  }

  source {
    Mysql-CDC {
      base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
      username = "root"
      password = "******"
      table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
    }
  }

  transform {
  }

  sink {
    Paimon {
      catalog_name = "seatunnel_test"
      warehouse = "file:///tmp/seatunnel/paimon/hadoop-sink/"
      database = "${database_name}"
      table = "${table_name}"
    }
  }
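
In the multi-table case, ${database_name} and ${table_name} act as placeholders that SeaTunnel resolves per upstream table, so each of the three source tables is written to its own Paimon table.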