Avro is a popular data format in streaming data pipelines. SeaTunnel now supports the Avro format in the Kafka connector.

# How To Use

## Kafka Usage Example

- This example generates data with the FakeSource connector and writes it to Kafka in Avro format.
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    row.num = 90
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
    result_table_name = "fake"
  }
}

sink {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_avro_topic_fake_source"
    format = avro
  }
}
```
- This example reads Avro-formatted data from Kafka and prints it to the console.
```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_avro_topic"
    result_table_name = "kafka_table"
    kafka.auto.offset.reset = "earliest"
    format = avro
    format_error_handle_way = skip
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  Console {
    source_table_name = "kafka_table"
  }
}
```