Hive sink connector

Description

Write data to Hive.

To use this connector, you must ensure that your Spark/Flink cluster is already integrated with Hive. The tested Hive version is 2.3.9.

If you use SeaTunnel Engine, you need to put seatunnel-hadoop3-3.1.4-uber.jar, hive-exec-3.1.3.jar and libfb303-0.9.3.jar in the $SEATUNNEL_HOME/lib/ dir.

Key features

By default, we use a two-phase commit (2PC) to ensure exactly-once semantics.

  • file format
    • text
    • csv
    • parquet
    • orc
    • json
  • compress codec (a minimal sketch follows this list)
    • lzo
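
A minimal sketch of enabling the lzo compress codec on the sink, assuming the table and metastore values from the basic example below:

  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://namenode001:9083"
    # "none" is the default; lzo is the codec listed above
    compress_codec = "lzo"
  }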

Options

name                           type     required  default value
table_name                     string   yes       -
metastore_uri                  string   yes       -
compress_codec                 string   no        none
hdfs_site_path                 string   no        -
hive_site_path                 string   no        -
hive.hadoop.conf               Map      no        -
hive.hadoop.conf-path          string   no        -
krb5_path                      string   no        /etc/krb5.conf
kerberos_principal             string   no        -
kerberos_keytab_path           string   no        -
abort_drop_partition_metadata  boolean  no        true
common-options                 -        no        -

table_name [string]

The target Hive table name, e.g. db1.table1. If the source is in multiple-table mode, you can use ${database_name}.${table_name} to generate the table name; the placeholders will be replaced with the values of the CatalogTable generated by the source.
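
A minimal sketch of a multi-table sink using these placeholders (see also example 2 below); the metastore address is taken from the examples in this page:

  Hive {
    # each upstream CatalogTable is written to the table with the matching database/table name
    table_name = "${database_name}.${table_name}"
    metastore_uri = "thrift://ctyun7:9083"
  }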

metastore_uri [string]

Hive metastore uri

hdfs_site_path [string]

The path of hdfs-site.xml, used to load the HA configuration of the NameNodes.

hive_site_path [string]

The path of hive-site.xml, used to authenticate against the Hive metastore.
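
A minimal sketch combining both options; the local file locations are assumptions for illustration:

  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://namenode001:9083"
    # assumed locations of the cluster configuration files
    hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
    hive_site_path = "/etc/hive/conf/hive-site.xml"
  }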

hive.hadoop.conf [map]

Properties in the Hadoop configuration files ('core-site.xml', 'hdfs-site.xml', 'hive-site.xml').

hive.hadoop.conf-path [string]

The directory from which to load the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' files.
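
A minimal sketch of pointing the sink at a configuration directory and overriding a single property; the directory and bucket values are assumptions in the style of the S3/OSS examples below:

  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://namenode001:9083"
    # assumed directory containing core-site.xml / hdfs-site.xml / hive-site.xml
    hive.hadoop.conf-path = "/path/to/hadoop-conf"
    # extra properties merged into the Hadoop configuration
    hive.hadoop.conf = {
      bucket = "s3a://mybucket"
    }
  }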

krb5_path [string]

The path of krb5.conf, used for Kerberos authentication.

kerberos_principal [string]

The principal of Kerberos.

kerberos_keytab_path [string]

The keytab file path of Kerberos.
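
A minimal sketch of a Kerberos-secured sink; the principal and keytab path are placeholders to be replaced with the ones issued for your cluster:

  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://namenode001:9083"
    krb5_path = "/etc/krb5.conf"
    # placeholder principal and keytab path
    kerberos_principal = "hive/metastore.example.com@EXAMPLE.COM"
    kerberos_keytab_path = "/path/to/hive.keytab"
  }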

abort_drop_partition_metadata [boolean]

Flag to decide whether to drop partition metadata from the Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore; the data in the partition (data generated during the synchronization process) will always be deleted.

common options

Sink plugin common parameters. Please refer to Sink Common Options for details.

Example

  Hive {
    table_name = "default.seatunnel_orc"
    metastore_uri = "thrift://namenode001:9083"
  }

example 1

We have a source table like this:

  create table test_hive_source(
    test_tinyint TINYINT,
    test_smallint SMALLINT,
    test_int INT,
    test_bigint BIGINT,
    test_boolean BOOLEAN,
    test_float FLOAT,
    test_double DOUBLE,
    test_string STRING,
    test_binary BINARY,
    test_timestamp TIMESTAMP,
    test_decimal DECIMAL(8,2),
    test_char CHAR(64),
    test_varchar VARCHAR(64),
    test_date DATE,
    test_array ARRAY<INT>,
    test_map MAP<STRING, FLOAT>,
    test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
  )
  PARTITIONED BY (test_par1 STRING, test_par2 STRING);

We need to read data from the source table and write it to another table:

  create table test_hive_sink_text_simple(
    test_tinyint TINYINT,
    test_smallint SMALLINT,
    test_int INT,
    test_bigint BIGINT,
    test_boolean BOOLEAN,
    test_float FLOAT,
    test_double DOUBLE,
    test_string STRING,
    test_binary BINARY,
    test_timestamp TIMESTAMP,
    test_decimal DECIMAL(8,2),
    test_char CHAR(64),
    test_varchar VARCHAR(64),
    test_date DATE
  )
  PARTITIONED BY (test_par1 STRING, test_par2 STRING);

The job config file can be like this:

  env {
    parallelism = 3
    job.name = "test_hive_source_to_hive"
  }
  source {
    Hive {
      table_name = "test_hive.test_hive_source"
      metastore_uri = "thrift://ctyun7:9083"
    }
  }
  sink {
    # write the data to the target Hive table
    Hive {
      table_name = "test_hive.test_hive_sink_text_simple"
      metastore_uri = "thrift://ctyun7:9083"
      hive.hadoop.conf = {
        bucket = "s3a://mybucket"
      }
    }
  }
Hive on s3

Step 1

Create the lib dir for Hive on EMR.

  mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 2

Download the jars from Maven Central into the lib dir.

  cd ${SEATUNNEL_HOME}/plugins/Hive/lib
  wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar
  wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

Step 3

Copy the jars from your EMR environment to the lib dir.

  cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
  cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
  cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
  cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 4

Run the case.

  env {
    parallelism = 1
    job.mode = "BATCH"
  }
  source {
    FakeSource {
      schema = {
        fields {
          pk_id = bigint
          name = string
          score = int
        }
        primaryKey {
          name = "pk_id"
          columnNames = [pk_id]
        }
      }
      rows = [
        {
          kind = INSERT
          fields = [1, "A", 100]
        },
        {
          kind = INSERT
          fields = [2, "B", 100]
        },
        {
          kind = INSERT
          fields = [3, "C", 100]
        }
      ]
    }
  }
  sink {
    Hive {
      table_name = "test_hive.test_hive_sink_on_s3"
      metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083"
      hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf"
      hive.hadoop.conf = {
        bucket = "s3://ws-package"
      }
    }
  }

Hive on oss

Step 1

Create the lib dir for Hive on EMR.

  mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib

Step 2

Download the jars from Maven Central into the lib dir.

  cd ${SEATUNNEL_HOME}/plugins/Hive/lib
  wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar

Step 3

Copy the jars from your EMR environment to the lib dir and delete the conflicting jar.

  cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
  rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar

Step 4

Run the case.

  env {
    parallelism = 1
    job.mode = "BATCH"
  }
  source {
    FakeSource {
      schema = {
        fields {
          pk_id = bigint
          name = string
          score = int
        }
        primaryKey {
          name = "pk_id"
          columnNames = [pk_id]
        }
      }
      rows = [
        {
          kind = INSERT
          fields = [1, "A", 100]
        },
        {
          kind = INSERT
          fields = [2, "B", 100]
        },
        {
          kind = INSERT
          fields = [3, "C", 100]
        }
      ]
    }
  }
  sink {
    Hive {
      table_name = "test_hive.test_hive_sink_on_oss"
      metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083"
      hive.hadoop.conf-path = "/tmp/hadoop"
      hive.hadoop.conf = {
        bucket = "oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com"
      }
    }
  }

example 2

We have multiple source tables like this:

  create table test_1(
  )
  PARTITIONED BY (xx);
  create table test_2(
  )
  PARTITIONED BY (xx);
  ...

We need to read data from these source tables and write them to other tables.

The job config file can be like this:

  env {
    # You can set flink configuration here
    parallelism = 3
    job.name = "test_hive_source_to_hive"
  }
  source {
    Hive {
      tables_configs = [
        {
          table_name = "test_hive.test_1"
          metastore_uri = "thrift://ctyun6:9083"
        },
        {
          table_name = "test_hive.test_2"
          metastore_uri = "thrift://ctyun7:9083"
        }
      ]
    }
  }
  sink {
    # route each source table to the Hive table with the same database and table name
    Hive {
      table_name = "${database_name}.${table_name}"
      metastore_uri = "thrift://ctyun7:9083"
    }
  }

Changelog

2.2.0-beta 2022-09-26

  • Add Hive Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Hive Sink supports automatic partition repair (3133)

2.3.0 2022-12-30

  • [BugFix] Fixed the following bugs that caused writing data to files to fail (3258)
    • When a field from upstream is null, a NullPointerException is thrown
    • Sink columns mapping failed
    • When restoring the writer from state, getting the transaction directly failed

Next version

  • [Improve] Support kerberos authentication (3840)
  • [Improve] Added partition_dir_expression validation logic (3886)