CREATE Catalog
Hive catalog
This creates an Iceberg catalog named hive_catalog that can be configured using 'catalog-type'='hive', which loads tables from Hive metastore:
CREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083','clients'='5','property-version'='1','warehouse'='hdfs://nn:8020/warehouse/path');
The following properties can be set if using the Hive catalog:
uri: The Hive metastore’s thrift URI. (Required)clients: The Hive metastore client pool size, default value is 2. (Optional)warehouse: The Hive warehouse location, users should specify this path if neither set thehive-conf-dirto specify a location containing ahive-site.xmlconfiguration file nor add a correcthive-site.xmlto classpath.hive-conf-dir: Path to a directory containing ahive-site.xmlconfiguration file which will be used to provide custom Hive configuration values. The value ofhive.metastore.warehouse.dirfrom<hive-conf-dir>/hive-site.xml(or hive configure file from classpath) will be overwritten with thewarehousevalue if setting bothhive-conf-dirandwarehousewhen creating iceberg catalog.hadoop-conf-dir: Path to a directory containingcore-site.xmlandhdfs-site.xmlconfiguration files which will be used to provide custom Hadoop configuration values.
Hadoop catalog
Iceberg also supports a directory-based catalog in HDFS that can be configured using 'catalog-type'='hadoop':
CREATE CATALOG hadoop_catalog WITH ('type'='iceberg','catalog-type'='hadoop','warehouse'='hdfs://nn:8020/warehouse/path','property-version'='1');
The following properties can be set if using the Hadoop catalog:
warehouse: The HDFS directory to store metadata files and data files. (Required)
Execute the sql command USE CATALOG hadoop_catalog to set the current catalog.
REST catalog
This creates an iceberg catalog named rest_catalog that can be configured using 'catalog-type'='rest', which loads tables from a REST catalog:
CREATE CATALOG rest_catalog WITH ('type'='iceberg','catalog-type'='rest','uri'='https://localhost/'The following properties can be set if using the REST catalog:* `uri`: The URL to the REST Catalog (Required)* `credential`: A credential to exchange for a token in the OAuth2 client credentials flow (Optional)* `token`: A token which will be used to interact with the server (Optional)#### Custom catalogFlink also supports loading a custom Iceberg `Catalog` implementation by specifying the `catalog-impl` property:```sqlCREATE CATALOG my_catalog WITH ('type'='iceberg','catalog-impl'='com.my.custom.CatalogImpl','my-additional-catalog-config'='my-value');
Create through YAML config
Catalogs can be registered in sql-client-defaults.yaml before starting the SQL client.
catalogs:- name: my_catalogtype: icebergcatalog-type: hadoopwarehouse: hdfs://nn:8020/warehouse/path
Create through SQL Files
The Flink SQL Client supports the -i startup option to execute an initialization SQL file to set up environment when starting up the SQL Client.
-- define available catalogsCREATE CATALOG hive_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://localhost:9083','warehouse'='hdfs://nn:8020/warehouse/path');USE CATALOG hive_catalog;
Using -i <init.sql> option to initialize SQL Client session:
/path/to/bin/sql-client.sh -i /path/to/init.sql
CREATE DATABASE
By default, Iceberg will use the default database in Flink. Using the following example to create a separate database in order to avoid creating tables under the default database:
CREATE DATABASE iceberg_db;USE iceberg_db;
CREATE TABLE
CREATE TABLE `hive_catalog`.`default`.`sample` (id BIGINT COMMENT 'unique id',data STRING NOT NULL) WITH ('format-version'='2');
Table create commands support the commonly used Flink create clauses including:
PARTITION BY (column1, column2, ...)to configure partitioning, Flink does not yet support hidden partitioning.COMMENT 'table document'to set a table description.WITH ('key'='value', ...)to set table configuration which will be stored in Iceberg table properties.
Currently, it does not support computed column and watermark definition etc.
PRIMARY KEY
Primary key constraint can be declared for a column or a set of columns, which must be unique and do not contain null.
It’s required for UPSERT mode.
CREATE TABLE `hive_catalog`.`default`.`sample` (id BIGINT COMMENT 'unique id',data STRING NOT NULL,PRIMARY KEY(`id`) NOT ENFORCED) WITH ('format-version'='2');
PARTITIONED BY
To create a partition table, use PARTITIONED BY:
CREATE TABLE `hive_catalog`.`default`.`sample` (id BIGINT COMMENT 'unique id',data STRING NOT NULL)PARTITIONED BY (data)WITH ('format-version'='2');
Iceberg supports hidden partitioning but Flink doesn’t support partitioning by a function on columns. There is no way to support hidden partitions in the Flink DDL.
CREATE TABLE LIKE
To create a table with the same schema, partitioning, and table properties as another table, use CREATE TABLE LIKE.
CREATE TABLE `hive_catalog`.`default`.`sample` (id BIGINT COMMENT 'unique id',data STRING);CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
For more details, refer to the Flink CREATE TABLE documentation.
ALTER TABLE
Iceberg only support altering table properties:
ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');
ALTER TABLE .. RENAME TO
ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
DROP TABLE
To delete a table, run:
DROP TABLE `hive_catalog`.`default`.`sample`;
