Assert sink connector

Description

A Flink sink plugin that asserts incoming data against user-defined rules in order to detect illegal data.

Key Features

Options

Name Type Required Default
rules ConfigMap yes -
rules.field_rules ConfigList yes -
rules.field_rules.field_name string yes -
rules.field_rules.field_type string|ConfigMap no -
rules.field_rules.field_value ConfigList no -
rules.field_rules.field_value.rule_type string no -
rules.field_rules.field_value.rule_value numeric no -
rules.field_rules.field_value.equals_to boolean|numeric|string|ConfigList|ConfigMap no -
rules.row_rules ConfigList yes -
rules.row_rules.rule_type string no -
rules.row_rules.rule_value numeric no -
rules.catalog_table_rule ConfigMap no -
rules.catalog_table_rule.primary_key_rule ConfigMap no -
rules.catalog_table_rule.primary_key_rule.primary_key_name string no -
rules.catalog_table_rule.primary_key_rule.primary_key_columns ConfigList no -
rules.catalog_table_rule.constraint_key_rule ConfigList no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_name string no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_type string no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns ConfigList no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name string no -
rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type string no -
rules.catalog_table_rule.column_rule ConfigList no -
rules.catalog_table_rule.column_rule.name string no -
rules.catalog_table_rule.column_rule.type string no -
rules.catalog_table_rule.column_rule.column_length int no -
rules.catalog_table_rule.column_rule.nullable boolean no -
rules.catalog_table_rule.column_rule.default_value string no -
rules.catalog_table_rule.column_rule.comment string no -
rules.table-names ConfigList no -
common-options no -

rules [ConfigMap]

Defines the rules that valid data must satisfy. Each rule describes either a field validation or a row-count validation.
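As a rough illustration, the skeleton below shows how the rules map groups row rules and field rules. It is a sketch rather than a complete configuration: the field `name` and the lower bound of 1 row are placeholder values chosen for this example.

```hocon
Assert {
  rules {
    # Row-count validation: a list of row rules
    row_rules = [
      { rule_type = MIN_ROW, rule_value = 1 }
    ]
    # Field validation: one entry per field to check (placeholder field "name")
    field_rules = [
      {
        field_name = name
        field_type = string
        field_value = [
          { rule_type = NOT_NULL }
        ]
      }
    ]
  }
}
```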

field_rules [ConfigList]

The list of rules used to validate individual fields.

field_name [string]

The name of the field to validate.

field_type [string | ConfigMap]

The type of the field. Field type declarations should adhere to this guide.
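Judging from the examples in this document, field_type takes a type string for simple fields and a ConfigMap of nested field declarations for row fields. A minimal sketch; the field names `c_int` and `c_row` are placeholders borrowed from the FakeSource example, and field_value is omitted because it is optional:

```hocon
field_rules = [
  {
    # Simple type, given as a string
    field_name = c_int
    field_type = int
  },
  {
    # Row type, given as a ConfigMap of nested field declarations
    field_name = c_row
    field_type = {
      c_string = string
      c_array = "array<int>"
    }
  }
]
```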

field_value [ConfigList]

A list of value rules that define how the field's values are validated.

rule_type [string]

The following rule types are currently supported:

  • NOT_NULL: the value must not be null
  • NULL: the value can be null
  • MIN: the minimum value of the data
  • MAX: the maximum value of the data
  • MIN_LENGTH: the minimum length of a string value
  • MAX_LENGTH: the maximum length of a string value
  • MIN_ROW: the minimum number of rows
  • MAX_ROW: the maximum number of rows

rule_value [numeric]

The value associated with the rule type. When rule_type is MIN, MAX, MIN_LENGTH, MAX_LENGTH, MIN_ROW, or MAX_ROW, you must also set rule_value.
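The fragment below sketches which rule types need a rule_value and where each type goes. Following the examples in this document, MIN_ROW and MAX_ROW are placed under row_rules, while the other rule types appear in a field's field_value list. The field names and all bounds are hypothetical.

```hocon
rules {
  row_rules = [
    { rule_type = MIN_ROW, rule_value = 5 }
    { rule_type = MAX_ROW, rule_value = 10 }
  ]
  field_rules = [
    {
      field_name = name
      field_type = string
      field_value = [
        { rule_type = NOT_NULL }                    # no rule_value needed
        { rule_type = MIN_LENGTH, rule_value = 5 }  # minimum string length
        { rule_type = MAX_LENGTH, rule_value = 10 } # maximum string length
      ]
    }
    {
      field_name = age
      field_type = int
      field_value = [
        { rule_type = MIN, rule_value = 0 }    # hypothetical lower bound
        { rule_type = MAX, rule_value = 150 }  # hypothetical upper bound
      ]
    }
  ]
}
```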

equals_to [boolean | numeric | string | ConfigList | ConfigMap]

equals_to compares the field value with the configured expected value. It accepts values of all types; these types are detailed here. For instance, if a field is a row with three fields declared as {a = array<string>, b = map<string, decimal(30, 2)>, c = {c_0 = int, b = string}}, you can set equals_to to [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]].

Field values are defined the same way as in FakeSource.

equals_to cannot be applied to fields of null type. To verify such fields, use the rule type NULL instead, for example {rule_type = NULL}.
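Placing the row example from the equals_to description into a field rule gives roughly the following. This is a sketch: the field name `my_row` is hypothetical, and the row value is written as a list with one element per declared field, in the same way FakeSource rows are written.

```hocon
field_rules = [
  {
    field_name = my_row   # hypothetical field name
    field_type = {
      a = "array<string>"
      b = "map<string, decimal(30, 2)>"
      c = { c_0 = int, b = string }
    }
    field_value = [
      {
        rule_type = NOT_NULL
        # One element per declared field: a (array), b (map), c (nested row)
        equals_to = [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]]
      }
    ]
  }
]
```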

catalog_table_rule [ConfigMap]

Asserts that the catalog table is the same as the user-defined table.
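The first example in the Example section covers primary_key_rule, constraint_key_rule, and the name and type keys of column_rule. The sketch below, which sits inside the rules map, adds the remaining optional column_rule keys from the options table. The column and every value shown are hypothetical and would have to match the actual catalog table for the assertion to pass.

```hocon
catalog_table_rule {
  column_rule = [
    {
      name = "name"
      type = string
      column_length = 128         # hypothetical value
      nullable = true             # hypothetical value
      default_value = "unknown"   # hypothetical value
      comment = "user name"       # hypothetical value
    }
  ]
}
```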

table-names [ConfigList]

Asserts that the listed tables are present in the data.
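A minimal sketch of table-names. The table identifiers below, including their `database.table` form, are hypothetical; replace them with the names of the tables your source actually produces. Other rules are omitted for brevity.

```hocon
Assert {
  rules {
    # hypothetical table identifiers; replace them with the tables in your data
    table-names = ["fake.table1", "fake.table2"]
  }
}
```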

common options

Common parameters of sink plugins. Please refer to Sink Common Options for details.
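For example, the common option source_table_name, which also appears in the FakeSource example, selects which upstream table the Assert sink checks. A sketch with the rule definitions left empty:

```hocon
sink {
  Assert {
    # common sink option: check the table registered by the source as "fake"
    source_table_name = "fake"
    rules {
      # rule definitions as described in the sections above
    }
  }
}
```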

Example

The whole configuration follows the HOCON format.

  Assert {
    rules = {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 10
        },
        {
          rule_type = MIN_ROW
          rule_value = 5
        }
      ],
      field_rules = [
        {
          field_name = name
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
            },
            {
              rule_type = MIN_LENGTH
              rule_value = 5
            },
            {
              rule_type = MAX_LENGTH
              rule_value = 10
            }
          ]
        },
        {
          field_name = age
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 23
            },
            {
              rule_type = MIN
              rule_value = 32767
            },
            {
              rule_type = MAX
              rule_value = 2147483647
            }
          ]
        }
      ]
      catalog_table_rule {
        primary_key_rule = {
          primary_key_name = "primary key"
          primary_key_columns = ["id"]
        }
        constraint_key_rule = [
          {
            constraint_key_name = "unique_name"
            constraint_key_type = UNIQUE_KEY
            constraint_key_columns = [
              {
                constraint_key_column_name = "id"
                constraint_key_sort_type = ASC
              }
            ]
          }
        ]
        column_rule = [
          {
            name = "id"
            type = bigint
          },
          {
            name = "name"
            type = string
          },
          {
            name = "age"
            type = int
          }
        ]
      }
    }
  }

Here is a more complex example of equals_to that uses FakeSource. To learn more about FakeSource, please read this document.

  source {
    FakeSource {
      row.num = 1
      schema = {
        fields {
          c_null = "null"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_date = date
          c_timestamp = timestamp
          c_time = time
          c_bytes = bytes
          c_array = "array<int>"
          c_map = "map<time, string>"
          c_map_nest = "map<string, {c_int = int, c_string = string}>"
          c_row = {
            c_null = "null"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_decimal = "decimal(30, 8)"
            c_date = date
            c_timestamp = timestamp
            c_time = time
            c_bytes = bytes
            c_array = "array<int>"
            c_map = "map<string, string>"
          }
        }
      }
      rows = [
        {
          kind = INSERT
          fields = [
            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
            "bWlJWmo=",
            [0, 1, 2],
            "{ 12:01:26 = v0 }",
            { k1 = [123, "BBB-BB"] },
            [
              null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
              "bWlJWmo=",
              [0, 1, 2],
              { k0 = v0 }
            ]
          ]
        }
      ]
      result_table_name = "fake"
    }
  }
  sink {
    Assert {
      source_table_name = "fake"
      rules = {
        row_rules = [
          {
            rule_type = MAX_ROW
            rule_value = 1
          },
          {
            rule_type = MIN_ROW
            rule_value = 1
          }
        ],
        field_rules = [
          {
            field_name = c_null
            field_type = "null"
            field_value = [
              {
                rule_type = NULL
              }
            ]
          },
          {
            field_name = c_string
            field_type = string
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = "AAA"
              }
            ]
          },
          {
            field_name = c_boolean
            field_type = boolean
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = false
              }
            ]
          },
          {
            field_name = c_tinyint
            field_type = tinyint
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 1
              }
            ]
          },
          {
            field_name = c_smallint
            field_type = smallint
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 1
              }
            ]
          },
          {
            field_name = c_int
            field_type = int
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 333
              }
            ]
          },
          {
            field_name = c_bigint
            field_type = bigint
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 323232
              }
            ]
          },
          {
            field_name = c_float
            field_type = float
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 3.1
              }
            ]
          },
          {
            field_name = c_double
            field_type = double
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 9.33333
              }
            ]
          },
          {
            field_name = c_decimal
            field_type = "decimal(30, 8)"
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = 99999.99999999
              }
            ]
          },
          {
            field_name = c_date
            field_type = date
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = "2012-12-21"
              }
            ]
          },
          {
            field_name = c_timestamp
            field_type = timestamp
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = "2012-12-21T12:34:56"
              }
            ]
          },
          {
            field_name = c_time
            field_type = time
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = "12:34:56"
              }
            ]
          },
          {
            field_name = c_bytes
            field_type = bytes
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = "bWlJWmo="
              }
            ]
          },
          {
            field_name = c_array
            field_type = "array<int>"
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = [0, 1, 2]
              }
            ]
          },
          {
            field_name = c_map
            field_type = "map<time, string>"
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = "{ 12:01:26 = v0 }"
              }
            ]
          },
          {
            field_name = c_map_nest
            field_type = "map<string, {c_int = int, c_string = string}>"
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = { k1 = [123, "BBB-BB"] }
              }
            ]
          },
          {
            field_name = c_row
            field_type = {
              c_null = "null"
              c_string = string
              c_boolean = boolean
              c_tinyint = tinyint
              c_smallint = smallint
              c_int = int
              c_bigint = bigint
              c_float = float
              c_double = double
              c_decimal = "decimal(30, 8)"
              c_date = date
              c_timestamp = timestamp
              c_time = time
              c_bytes = bytes
              c_array = "array<int>"
              c_map = "map<string, string>"
            }
            field_value = [
              {
                rule_type = NOT_NULL
                equals_to = [
                  null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
                  "bWlJWmo=",
                  [0, 1, 2],
                  { k0 = v0 }
                ]
              }
            ]
          }
        ]
      }
    }
  }

Changelog

2.2.0-beta 2022-09-26

  • Add Assert Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] 1. Support checking the number of rows (2844) (3031):
    • check that rows are not empty
    • check the minimum number of rows
    • check the maximum number of rows
  • [Improve] 2. Support directly defining data values (row) (2844) (3031)
  • [Improve] 3. Support setting parallelism to 1 (2844) (3031)