Http source connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Reads data from an Http endpoint.

Supported DataSource Info

In order to use the Http connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

Datasource | Supported Versions | Dependency
-----------|--------------------|-----------
Http       | universal          | Download

Source Options

Name | Type | Required | Default | Description
-----|------|----------|---------|------------
url | String | Yes | - | Http request URL.
schema | Config | No | - | Mapping between the Http response and the SeaTunnel data structure.
schema.fields | Config | No | - | The schema fields of the upstream data.
json_field | Config | No | - | Helps you configure the schema, so it must be used together with schema.
pageing | Config | No | - | Used for paging queries.
pageing.page_field | String | No | - | Name of the page field in the request parameters.
pageing.total_page_size | Int | No | - | Total number of pages to read.
pageing.batch_size | Int | No | - | Number of records returned per request; used to decide whether to continue when the total number of pages is unknown.
content_field | String | No | - | Extracts part of the JSON response. If you only need the data in the 'book' section, configure content_field = "$.store.book.*".
format | String | No | text | Format of the upstream data; json and text are supported.
method | String | No | get | Http request method; only GET and POST are supported.
headers | Map | No | - | Http headers.
params | Map | No | - | Http params; the program automatically adds the Http header application/x-www-form-urlencoded.
body | String | No | - | Http body, given as a JSON string; the program automatically adds the Http header application/json.
poll_interval_millis | Int | No | - | Interval (in milliseconds) between Http API requests in stream mode.
retry | Int | No | - | Maximum number of retries when the Http request throws an IOException.
retry_backoff_multiplier_ms | Int | No | 100 | Retry backoff multiplier (in milliseconds) applied when the Http request fails.
retry_backoff_max_ms | Int | No | 10000 | Maximum retry backoff time (in milliseconds) when the Http request fails.
enable_multi_lines | Boolean | No | false |
connect_timeout_ms | Int | No | 12000 | Connection timeout in milliseconds (default 12 s).
socket_timeout_ms | Int | No | 60000 | Socket timeout in milliseconds (default 60 s).
common-options | | No | - | Source plugin common parameters; see Source Common Options for details.
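
To give a feel for how retry, retry_backoff_multiplier_ms and retry_backoff_max_ms might interact, here is a rough Python sketch. It assumes the wait time doubles after each failed attempt and is capped at the maximum; this page does not confirm the exact schedule, so treat the doubling as an assumption. The function name backoff_schedule is hypothetical and not part of the connector.

```python
def backoff_schedule(retry, multiplier_ms=100, max_ms=10000):
    """Return the assumed wait (in ms) before each retry attempt.

    Assumption (not confirmed by this page): the wait starts at
    multiplier_ms, doubles after every failure, and never exceeds max_ms.
    """
    return [min(multiplier_ms * 2 ** attempt, max_ms) for attempt in range(retry)]


# Defaults from the options table: multiplier 100 ms, cap 10000 ms.
waits = backoff_schedule(retry=8)
print(waits)  # [100, 200, 400, 800, 1600, 3200, 6400, 10000]
```

Under this assumption, the cap only matters once the doubled value exceeds retry_backoff_max_ms, which with the defaults happens at the eighth retry.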

How to Create an Http Data Synchronization Job

    env {
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      Http {
        result_table_name = "http"
        url = "http://mockserver:1080/example/http"
        method = "GET"
        format = "json"
        schema = {
          fields {
            c_map = "map<string, string>"
            c_array = "array<int>"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_bytes = bytes
            c_date = date
            c_decimal = "decimal(38, 18)"
            c_timestamp = timestamp
            c_row = {
              C_MAP = "map<string, string>"
              C_ARRAY = "array<int>"
              C_STRING = string
              C_BOOLEAN = boolean
              C_TINYINT = tinyint
              C_SMALLINT = smallint
              C_INT = int
              C_BIGINT = bigint
              C_FLOAT = float
              C_DOUBLE = double
              C_BYTES = bytes
              C_DATE = date
              C_DECIMAL = "decimal(38, 18)"
              C_TIMESTAMP = timestamp
            }
          }
        }
      }
    }

    # Print the Http data that was read to the console
    sink {
      Console {
        parallelism = 1
      }
    }

Parameter Interpretation

format

When format is set to json, you must also set the schema option. For example, given the following upstream data:

    {
      "code": 200,
      "data": "get success",
      "success": true
    }

You should set schema as follows:

    schema {
      fields {
        code = int
        data = string
        success = boolean
      }
    }

The connector will generate the following data:

code | data        | success
-----|-------------|--------
200  | get success | true

When format is set to text, the connector passes the upstream data through unchanged. For example, given the following upstream data:

    {
      "code": 200,
      "data": "get success",
      "success": true
    }

The connector will generate the following data:

content
{"code": 200, "data": "get success", "success": true}
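
The difference between the two formats can be sketched in Python. This only illustrates the row shapes described above and is not the connector's implementation; the function names to_json_row and to_text_row and the CASTS type mapping are hypothetical.

```python
import json

# Hypothetical mapping from the schema type names used above to Python casts.
CASTS = {"int": int, "string": str, "boolean": bool}


def to_json_row(body, fields):
    """format = "json": parse the body and keep one column per schema field."""
    parsed = json.loads(body)
    return {name: CASTS[type_name](parsed[name]) for name, type_name in fields.items()}


def to_text_row(body):
    """format = "text": pass the body through unchanged in a single 'content' column."""
    return {"content": body}


body = '{"code": 200, "data": "get success", "success": true}'
schema_fields = {"code": "int", "data": "string", "success": "boolean"}

json_row = to_json_row(body, schema_fields)
text_row = to_text_row(body)
print(json_row)  # {'code': 200, 'data': 'get success', 'success': True}
print(text_row)
```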

content_field

This parameter extracts part of the JSON response using a JSONPath expression. If you only need the data in the ‘book’ section, configure content_field = "$.store.book.*".

Suppose the returned data looks like this:

    {
      "store": {
        "book": [
          {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
          },
          {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
          }
        ],
        "bicycle": {
          "color": "red",
          "price": 19.95
        }
      },
      "expensive": 10
    }

You can configure content_field = "$.store.book.*" and the result returned looks like this:

    [
      {
        "category": "reference",
        "author": "Nigel Rees",
        "title": "Sayings of the Century",
        "price": 8.95
      },
      {
        "category": "fiction",
        "author": "Evelyn Waugh",
        "title": "Sword of Honour",
        "price": 12.99
      }
    ]

Then you can get the desired result with a simpler schema, for example:

    Http {
      url = "http://mockserver:1080/contentjson/mock"
      method = "GET"
      format = "json"
      content_field = "$.store.book.*"
      schema = {
        fields {
          category = string
          author = string
          title = string
          price = string
        }
      }
    }
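
To make the effect of content_field concrete, the following Python sketch evaluates the path "$.store.book.*" against the sample response by hand. It is an illustration only: the helper select_children is hypothetical and supports just the dotted-keys-plus-wildcard form used here, whereas the connector relies on a full JSONPath implementation. Casting every value to a string mirrors the all-string schema in the configuration above.

```python
import json

response = json.loads("""
{
  "store": {
    "book": [
      {"category": "reference", "author": "Nigel Rees",
       "title": "Sayings of the Century", "price": 8.95},
      {"category": "fiction", "author": "Evelyn Waugh",
       "title": "Sword of Honour", "price": 12.99}
    ],
    "bicycle": {"color": "red", "price": 19.95}
  },
  "expensive": 10
}
""")


def select_children(document, path):
    """Evaluate a tiny JSONPath subset: '$.key.key.*' (dotted keys, trailing wildcard)."""
    steps = path.split(".")
    if steps[0] != "$" or steps[-1] != "*":
        raise ValueError("this sketch only supports paths of the form $.key.key.*")
    node = document
    for key in steps[1:-1]:
        node = node[key]
    # '*' selects every child: list elements, or the values of an object.
    return list(node.values()) if isinstance(node, dict) else list(node)


books = select_children(response, "$.store.book.*")
# Each selected element becomes one row; values are cast to string to match the schema.
fields = ("category", "author", "title", "price")
rows = [{field: str(book[field]) for field in fields} for book in books]
for row in rows:
    print(row)
```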

json_field

This parameter helps you configure the schema, so it must be used together with schema.

If your data looks something like this:

    {
      "store": {
        "book": [
          {
            "category": "reference",
            "author": "Nigel Rees",
            "title": "Sayings of the Century",
            "price": 8.95
          },
          {
            "category": "fiction",
            "author": "Evelyn Waugh",
            "title": "Sword of Honour",
            "price": 12.99
          }
        ],
        "bicycle": {
          "color": "red",
          "price": 19.95
        }
      },
      "expensive": 10
    }

You can get the contents of ‘book’ by configuring the task as follows:

    source {
      Http {
        url = "http://mockserver:1080/jsonpath/mock"
        method = "GET"
        format = "json"
        json_field = {
          category = "$.store.book[*].category"
          author = "$.store.book[*].author"
          title = "$.store.book[*].title"
          price = "$.store.book[*].price"
        }
        schema = {
          fields {
            category = string
            author = string
            title = string
            price = string
          }
        }
      }
    }
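
As a rough illustration of what the json_field configuration above produces, the Python sketch below resolves each path to a column of values and then combines the columns into rows. It assumes that values at the same position across fields form one row; this page does not state that explicitly. The variable names are hypothetical, and the list comprehension stands in for a real JSONPath evaluation of "$.store.book[*].<field>".

```python
import json

response = json.loads("""
{
  "store": {
    "book": [
      {"category": "reference", "author": "Nigel Rees",
       "title": "Sayings of the Century", "price": 8.95},
      {"category": "fiction", "author": "Evelyn Waugh",
       "title": "Sword of Honour", "price": 12.99}
    ]
  }
}
""")

fields = ("category", "author", "title", "price")
books = response["store"]["book"]

# "$.store.book[*].<field>" selects that field from every element of the book array,
# so each json_field entry yields one column of values (cast to string per the schema).
columns = {field: [str(book[field]) for book in books] for field in fields}

# Assumption: values at the same index across the columns belong to the same row.
rows = [dict(zip(columns.keys(), values)) for values in zip(*columns.values())]
for row in rows:
    print(row)
```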

pageing

    source {
      Http {
        url = "http://localhost:8080/mock/queryData"
        method = "GET"
        format = "json"
        params = {
          page: "${page}"
        }
        content_field = "$.data.*"
        pageing = {
          total_page_size = 20
          page_field = page
          # If total_page_size is unknown, use batch_size instead: reading finishes
          # when a request returns fewer than batch_size records, otherwise it continues.
          #batch_size = 10
        }
        schema = {
          fields {
            name = string
            age = string
          }
        }
      }
    }
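
The two stop conditions described for pageing can be sketched as a small Python loop. This is a simplified model, not the connector's code: read_all_pages, fake_fetch and DATA are hypothetical names, and starting at page 1 is an assumption.

```python
def read_all_pages(fetch_page, total_page_size=None, batch_size=None, start_page=1):
    """Collect records page by page until one of the two stop conditions is met."""
    if total_page_size is None and batch_size is None:
        raise ValueError("set total_page_size or batch_size, otherwise paging never ends")
    records = []
    page = start_page  # assumption: the first page is numbered 1
    while True:
        # Known page count: stop once every page has been requested.
        if total_page_size is not None and page > total_page_size:
            break
        batch = fetch_page(page)
        records.extend(batch)
        # Unknown page count: a short page means there is nothing left to read.
        if total_page_size is None and len(batch) < batch_size:
            break
        page += 1
    return records


# Stand-in for the Http API: 25 records served 10 per page.
DATA = [{"name": f"user{i}", "age": str(20 + i)} for i in range(25)]


def fake_fetch(page, size=10):
    start = (page - 1) * size
    return DATA[start:start + size]


unknown_total = read_all_pages(fake_fetch, batch_size=10)
known_total = read_all_pages(fake_fetch, total_page_size=2)
print(len(unknown_total))  # 25
print(len(known_total))    # 20
```

With batch_size = 10 the loop stops after the third page because it returns only 5 records; with total_page_size = 2 it stops after two pages regardless of how many records remain.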

Changelog

2.2.0-beta 2022-09-26

  • Add Http Source Connector

new version

  • [Feature][Connector-V2][HTTP] Use json-path parsing (3510)