SQL transform plugin

Description

Use SQL to transform the given input rows.

The SQL transform uses an in-memory SQL engine, so a transform task can be implemented with SQL functions and the other capabilities of the SQL engine.

Options

name               type    required  default value
source_table_name  string  yes       -
result_table_name  string  yes       -
query              string  yes       -

source_table_name [string]

The source table name. The table name used in the query SQL must match this field.

query [string]

The query SQL. Simple SQL with basic functions and filter criteria is supported. Complex SQL is not supported yet, including JOIN across multiple source tables or rows, AGGREGATE operations, and the like.
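
As a rough illustration of this boundary, the sketch below contrasts a query of the supported kind with two of the unsupported kinds. The table fake and its columns come from the example in this document; other_table is a hypothetical second table used only to show what a JOIN would look like.

```sql
-- Supported: basic functions plus a filter on a single source table.
select id, concat(name, '_') as name, age + 1 as age from fake where id > 0

-- Not supported yet: JOIN across multiple source tables
-- (other_table is hypothetical, for illustration only).
-- select f.id, o.name from fake f join other_table o on f.id = o.id

-- Not supported yet: AGGREGATE operations.
-- select age, count(*) as cnt from fake group by age
```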

The query expression can be select [table_name.]column_a to query the column named column_a; the table name is optional.
It can also be select c_row.c_inner_row.column_b to query the nested struct column named column_b, which sits inside the c_inner_row column of the c_row column. This form of query expression cannot include the table name.
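
A short sketch of the two forms, assuming a source table named fake with a top-level name column and a nested c_row.c_inner_row.c_inner_int column (as in the struct query schema later in this document). Each statement is a separate value for the query option.

```sql
-- Top-level column: the table name prefix is optional,
-- so these two queries select the same column.
select name from fake
select fake.name from fake

-- Nested struct column: the path starts at the outer column,
-- without the table name.
select c_row.c_inner_row.c_inner_int from fake
```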

Example

The data read from source is a table like this:

id  name      age
1   Joy Ding  20
2   May Ding  21
3   Kin Dom   24
4   Joy Dom   22

We use a SQL query to transform the source data like this:

  transform {
    Sql {
      source_table_name = "fake"
      result_table_name = "fake1"
      query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0"
    }
  }

Then the data in the result table fake1 will be:

id  name       age
1   Joy Ding_  21
2   May Ding_  22
3   Kin Dom_   25
4   Joy Dom_   23
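
To show how the where clause interacts with the projection, here is a variant of the same query with a tighter filter. This is only a sketch against the four sample rows above; under ordinary SQL semantics the filter is applied to the source id values, so only the rows with id 3 and 4 would be kept.

```sql
-- Same projection as above, but keep only rows whose id is greater than 2.
select id, concat(name, '_') as name, age + 1 as age from fake where id > 2

-- Expected rows for the sample data:
--   3   Kin Dom_   25
--   4   Joy Dom_   23
```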

Struct query

If your upstream data schema is like this:

  source {
    FakeSource {
      result_table_name = "fake"
      row.num = 100
      string.template = ["innerQuery"]
      schema = {
        fields {
          name = "string"
          c_date = "date"
          c_row = {
            c_inner_row = {
              c_inner_int = "int"
              c_inner_string = "string"
              c_inner_timestamp = "timestamp"
              c_map_1 = "map<string, string>"
              c_map_2 = "map<string, map<string,string>>"
            }
            c_string = "string"
          }
        }
      }
    }
  }

All of these query expressions are valid:

  select
    name,
    c_date,
    c_row,
    c_row.c_inner_row,
    c_row.c_string,
    c_row.c_inner_row.c_inner_int,
    c_row.c_inner_row.c_inner_string,
    c_row.c_inner_row.c_inner_timestamp,
    c_row.c_inner_row.c_map_1,
    c_row.c_inner_row.c_map_1.some_key

But this query expression is not valid:

  select
    c_row.c_inner_row.c_map_2.some_key.inner_map_key

A map must be the last struct level in the path, so a key lookup can go only one level deep; querying a key inside a nested map is not supported.
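
Putting these rules together, a complete struct query against the fake table defined above might look like the sketch below. The key some_key is the placeholder used in the expressions above, and the aliases inner_int and some_value are hypothetical names chosen for illustration; combining as with a nested path is an assumption based on the alias usage in the earlier example.

```sql
-- Sketch: select a top-level column, a nested struct field,
-- and one key from a map that is the last level of its path.
select
  name,
  c_row.c_inner_row.c_inner_int as inner_int,
  c_row.c_inner_row.c_map_1.some_key as some_value
from fake
```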

Job Config Example

  env {
    job.mode = "BATCH"
  }
  source {
    FakeSource {
      result_table_name = "fake"
      row.num = 100
      schema = {
        fields {
          id = "int"
          name = "string"
          age = "int"
        }
      }
    }
  }
  transform {
    Sql {
      source_table_name = "fake"
      result_table_name = "fake1"
      query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0"
    }
  }
  sink {
    Console {
      source_table_name = "fake1"
    }
  }

Changelog

  • Support struct query

new version

  • Add SQL Transform Connector