Assert sink connector
Description
A flink sink plugin which can assert illegal data by user defined rules
Key Features
Options
Name | Type | Required | Default | ||||
---|---|---|---|---|---|---|---|
rules | ConfigMap | yes | - | ||||
rules.field_rules | string | yes | - | ||||
rules.field_rules.field_name | string\ | ConfigMap | yes | - | |||
rules.field_rules.field_type | string | no | - | ||||
rules.field_rules.field_value | ConfigList | no | - | ||||
rules.field_rules.field_value.rule_type | string | no | - | ||||
rules.field_rules.field_value.rule_value | numeric | no | - | ||||
rules.field_rules.field_value.equals_to | boolean\ | numeric\ | string\ | ConfigList\ | ConfigMap | no | - |
rules.row_rules | string | yes | - | ||||
rules.row_rules.rule_type | string | no | - | ||||
rules.row_rules.rule_value | string | no | - | ||||
rules.catalog_table_rule | ConfigMap | no | - | ||||
rules.catalog_table_rule.primary_key_rule | ConfigMap | no | - | ||||
rules.catalog_table_rule.primary_key_rule.primary_key_name | string | no | - | ||||
rules.catalog_table_rule.primary_key_rule.primary_key_columns | ConfigList | no | - | ||||
rules.catalog_table_rule.constraint_key_rule | ConfigList | no | - | ||||
rules.catalog_table_rule.constraint_key_rule.constraint_key_name | string | no | - | ||||
rules.catalog_table_rule.constraint_key_rule.constraint_key_type | string | no | - | ||||
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns | ConfigList | no | - | ||||
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string | no | - | ||||
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type | string | no | - | ||||
rules.catalog_table_rule.column_rule | ConfigList | no | - | ||||
rules.catalog_table_rule.column_rule.name | string | no | - | ||||
rules.catalog_table_rule.column_rule.type | string | no | - | ||||
rules.catalog_table_rule.column_rule.column_length | int | no | - | ||||
rules.catalog_table_rule.column_rule.nullable | boolean | no | - | ||||
rules.catalog_table_rule.column_rule.default_value | string | no | - | ||||
rules.catalog_table_rule.column_rule.comment | comment | no | - | ||||
rules.table-names | ConfigList | no | - | ||||
common-options | no | - |
rules [ConfigMap]
Rule definition of user’s available data. Each rule represents one field validation or row num validation.
field_rules [ConfigList]
field rules for field validation
field_name [string]
field name(string)
field_type [string | ConfigMap]
Field type declarations should adhere to this guide.
field_value [ConfigList]
A list value rule define the data value validation
rule_type [string]
The following rules are supported for now
- NOT_NULL
value can't be null
- NULL
value can be null
- MIN
define the minimum value of data
- MAX
define the maximum value of data
- MIN_LENGTH
define the minimum string length of a string data
- MAX_LENGTH
define the maximum string length of a string data
- MIN_ROW
define the minimun number of rows
- MAX_ROW
define the maximum number of rows
rule_value [numeric]
The value related to rule type. When the rule_type
is MIN
, MAX
, MIN_LENGTH
, MAX_LENGTH
, MIN_ROW
or MAX_ROW
, users need to assign a value to the rule_value
.
equals_to [boolean | numeric | string | ConfigList | ConfigMap]
equals_to
is used to compare whether the field value is equal to the configured expected value. You can assign values of all types to equals_to
. These types are detailed here. For instance, if one field is a row with three fields, and the declaration of row type is {a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}
, users can assign the value [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]]
to equals_to
.
The way of defining field values is consistent with FakeSource.
equals_to
cannot be applied tonull
type fields. However, users can use the rule typeNULL
for verification, such as{rule_type = NULL}
.
catalog_table_rule [ConfigMap]
Used to assert the catalog table is same with the user defined table.
table-names [ConfigList]
Used to assert the table should be in the data.
common options
Sink plugin common parameters, please refer to Sink Common Options for details
Example
the whole config obey with hocon
style
Assert {
rules =
{
row_rules = [
{
rule_type = MAX_ROW
rule_value = 10
},
{
rule_type = MIN_ROW
rule_value = 5
}
],
field_rules = [{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
},
{
rule_type = MIN_LENGTH
rule_value = 5
},
{
rule_type = MAX_LENGTH
rule_value = 10
}
]
}, {
field_name = age
field_type = int
field_value = [
{
rule_type = NOT_NULL
equals_to = 23
},
{
rule_type = MIN
rule_value = 32767
},
{
rule_type = MAX
rule_value = 2147483647
}
]
}
]
catalog_table_rule {
primary_key_rule = {
primary_key_name = "primary key"
primary_key_columns = ["id"]
}
constraint_key_rule = [
{
constraint_key_name = "unique_name"
constraint_key_type = UNIQUE_KEY
constraint_key_columns = [
{
constraint_key_column_name = "id"
constraint_key_sort_type = ASC
}
]
}
]
column_rule = [
{
name = "id"
type = bigint
},
{
name = "name"
type = string
},
{
name = "age"
type = int
}
]
}
}
}
Here is a more complex example about equals_to
. The example involves FakeSource. You may want to learn it, please read this document.
source {
FakeSource {
row.num = 1
schema = {
fields {
c_null = "null"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_date = date
c_timestamp = timestamp
c_time = time
c_bytes = bytes
c_array = "array<int>"
c_map = "map<time, string>"
c_map_nest = "map<string, {c_int = int, c_string = string}>"
c_row = {
c_null = "null"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_date = date
c_timestamp = timestamp
c_time = time
c_bytes = bytes
c_array = "array<int>"
c_map = "map<string, string>"
}
}
}
rows = [
{
kind = INSERT
fields = [
null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
"bWlJWmo=",
[0, 1, 2],
"{ 12:01:26 = v0 }",
{ k1 = [123, "BBB-BB"]},
[
null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
"bWlJWmo=",
[0, 1, 2],
{ k0 = v0 }
]
]
}
]
result_table_name = "fake"
}
}
sink{
Assert {
source_table_name = "fake"
rules =
{
row_rules = [
{
rule_type = MAX_ROW
rule_value = 1
},
{
rule_type = MIN_ROW
rule_value = 1
}
],
field_rules = [
{
field_name = c_null
field_type = "null"
field_value = [
{
rule_type = NULL
}
]
},
{
field_name = c_string
field_type = string
field_value = [
{
rule_type = NOT_NULL
equals_to = "AAA"
}
]
},
{
field_name = c_boolean
field_type = boolean
field_value = [
{
rule_type = NOT_NULL
equals_to = false
}
]
},
{
field_name = c_tinyint
field_type = tinyint
field_value = [
{
rule_type = NOT_NULL
equals_to = 1
}
]
},
{
field_name = c_smallint
field_type = smallint
field_value = [
{
rule_type = NOT_NULL
equals_to = 1
}
]
},
{
field_name = c_int
field_type = int
field_value = [
{
rule_type = NOT_NULL
equals_to = 333
}
]
},
{
field_name = c_bigint
field_type = bigint
field_value = [
{
rule_type = NOT_NULL
equals_to = 323232
}
]
},
{
field_name = c_float
field_type = float
field_value = [
{
rule_type = NOT_NULL
equals_to = 3.1
}
]
},
{
field_name = c_double
field_type = double
field_value = [
{
rule_type = NOT_NULL
equals_to = 9.33333
}
]
},
{
field_name = c_decimal
field_type = "decimal(30, 8)"
field_value = [
{
rule_type = NOT_NULL
equals_to = 99999.99999999
}
]
},
{
field_name = c_date
field_type = date
field_value = [
{
rule_type = NOT_NULL
equals_to = "2012-12-21"
}
]
},
{
field_name = c_timestamp
field_type = timestamp
field_value = [
{
rule_type = NOT_NULL
equals_to = "2012-12-21T12:34:56"
}
]
},
{
field_name = c_time
field_type = time
field_value = [
{
rule_type = NOT_NULL
equals_to = "12:34:56"
}
]
},
{
field_name = c_bytes
field_type = bytes
field_value = [
{
rule_type = NOT_NULL
equals_to = "bWlJWmo="
}
]
},
{
field_name = c_array
field_type = "array<int>"
field_value = [
{
rule_type = NOT_NULL
equals_to = [0, 1, 2]
}
]
},
{
field_name = c_map
field_type = "map<time, string>"
field_value = [
{
rule_type = NOT_NULL
equals_to = "{ 12:01:26 = v0 }"
}
]
},
{
field_name = c_map_nest
field_type = "map<string, {c_int = int, c_string = string}>"
field_value = [
{
rule_type = NOT_NULL
equals_to = { k1 = [123, "BBB-BB"] }
}
]
},
{
field_name = c_row
field_type = {
c_null = "null"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_date = date
c_timestamp = timestamp
c_time = time
c_bytes = bytes
c_array = "array<int>"
c_map = "map<string, string>"
}
field_value = [
{
rule_type = NOT_NULL
equals_to = [
null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
"bWlJWmo=",
[0, 1, 2],
{ k0 = v0 }
]
}
]
}
]
}
}
}
Changelog
2.2.0-beta 2022-09-26
- Add Assert Sink Connector