Apache Paimon (Incubating) is a streaming data lake platform that supports high-speed data ingestion, change data tracking, and efficient real-time analytics.

This article assumes that you have mastered the basic knowledge and operation of Apache Paimon (Incubating). For the knowledge not mentioned in this article, you can obtain it from its Official Documentation.

By using kyuubi, we can run SQL queries towards Apache Paimon (Incubating) which is more convenient, easy to understand, and easy to expand than directly using flink.

Apache Paimon (Incubating) Integration

To enable the integration of kyuubi flink sql engine and Apache Paimon (Incubating), you need to:

Dependencies

The classpath of kyuubi flink sql engine with Apache Paimon (Incubating) supported consists of

  1. kyuubi-flink-sql-engine-1.9.1_2.12.jar, the engine jar deployed with Kyuubi distributions
  2. a copy of flink distribution
  3. paimon-flink-.jar (example: paimon-flink-1.16-0.4-SNAPSHOT.jar), which can be found in the Apache Paimon (Incubating) Supported Engines Flink
  4. flink-shaded-hadoop-2-uber-.jar, which code can be found in the Pre-bundled Hadoop Jar

In order to make the Apache Paimon (Incubating) packages visible for the runtime classpath of engines, you need to:

  1. Put the Apache Paimon (Incubating) packages into $FLINK_HOME/lib directly
  2. Setting the HADOOP_CLASSPATH environment variable or copy the Pre-bundled Hadoop Jar to flink/lib.

Please mind the compatibility of different Apache Paimon (Incubating) and Flink versions, which can be confirmed on the page of Apache Paimon (Incubating) multi engine support.

Apache Paimon (Incubating) Operations

Taking CREATE CATALOG as a example,

  1. CREATE CATALOG my_catalog WITH (
  2. 'type'='paimon',
  3. 'warehouse'='file:/tmp/paimon'
  4. );
  5. USE CATALOG my_catalog;

Taking CREATE TABLE as a example,

  1. CREATE TABLE MyTable (
  2. user_id BIGINT,
  3. item_id BIGINT,
  4. behavior STRING,
  5. dt STRING,
  6. PRIMARY KEY (dt, user_id) NOT ENFORCED
  7. ) PARTITIONED BY (dt) WITH (
  8. 'bucket' = '4'
  9. );

Taking Query Table as a example,

  1. SET 'execution.runtime-mode' = 'batch';
  2. SELECT * FROM orders WHERE catalog_id=1025;

Taking Streaming Query as a example,

  1. SET 'execution.runtime-mode' = 'streaming';
  2. SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;

Taking Rescale Bucket as a example,

  1. ALTER TABLE my_table SET ('bucket' = '4');
  2. INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01');