Some background services receive data through HTTP requests; in this scenario, Apache Flink can deliver its results by issuing HTTP requests. Apache Flink does not currently provide an official connector for writing data over HTTP. Apache StreamPark encapsulates HttpSink, based on asynchttpclient, to write data asynchronously in real time.

HttpSink writes are not transactional; writing data to the target service provides AT_LEAST_ONCE semantics. Records that still fail after the configured number of retries are written to an external component (Apache Kafka, MySQL, HDFS, Apache HBase), from which the data can be restored manually to reach eventual consistency.

HTTP asynchronous write

Asynchronous writing uses asynchttpclient as the HTTP client, so you need to add the asynchttpclient dependency first:

<dependency>
    <groupId>org.asynchttpclient</groupId>
    <artifactId>async-http-client</artifactId>
    <optional>true</optional>
</dependency>
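
To show what asynchronous writing with asynchttpclient looks like on its own, here is a minimal, self-contained sketch. It is not part of StreamPark; the endpoint URL and request body are placeholders.

Scala

import org.asynchttpclient.Dsl.asyncHttpClient

object AsyncHttpClientDemo extends App {
  // Reusable asynchronous HTTP client from async-http-client
  val client = asyncHttpClient()

  // The request is sent without blocking the calling thread;
  // the CompletableFuture completes when the response arrives.
  client.preparePost("http://127.0.0.1:8080/receive") // placeholder endpoint
    .setBody("""{"userId": 1, "siteId": 2}""")        // placeholder payload
    .execute()
    .toCompletableFuture
    .whenComplete { (response, error) =>
      if (error != null) error.printStackTrace()
      else println(s"status = ${response.getStatusCode}")
    }

  // Give the demo request time to finish, then release the client
  // (a real sink would close it in its close() hook).
  Thread.sleep(3000)
  client.close()
}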

Write with Apache StreamPark™

HTTP methods supported by asynchronous write

HttpSink supports the HTTP methods get, post, patch, put, delete, options, and trace, each exposed through a method of the same name on HttpSink. The details are as follows:

Scala

class HttpSink(@(transient@param) ctx: StreamingContext,
               header: Map[String, String] = Map.empty[String, String],
               parallelism: Int = 0,
               name: String = null,
               uid: String = null) extends Sink {

  def get(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpGet.METHOD_NAME)
  def post(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPost.METHOD_NAME)
  def patch(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPatch.METHOD_NAME)
  def put(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPut.METHOD_NAME)
  def delete(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpDelete.METHOD_NAME)
  def options(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpOptions.METHOD_NAME)
  def trace(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpTrace.METHOD_NAME)

  private[this] def sink(stream: DataStream[String], method: String): DataStreamSink[String] = {
    val params = ctx.parameter.toMap
      .filter(_._1.startsWith(HTTP_SINK_PREFIX))
      .map(x => x._1.drop(HTTP_SINK_PREFIX.length + 1) -> x._2)
    val sinkFun = new HttpSinkFunction(params, header, method)
    val sink = stream.addSink(sinkFun)
    afterSink(sink, parallelism, name, uid)
  }
}

Configuration list for HTTP asynchronous write

http.sink:
  threshold:
    numWriters: 3
    queueCapacity: 10000 # Maximum capacity of the cache queue; estimate a suitable size yourself based on the size of a single record. If the value is too large, the upstream source arrives too fast, and the downstream writes cannot keep up, an OOM may occur.
    timeout: 100         # Timeout for sending an HTTP request
    retries: 3           # Maximum number of retries when sending fails
    successCode: 200     # Status code that indicates a successful send
  failover:
    table: record
    storage: mysql       # kafka, hbase, hdfs
    jdbc:
      jdbcUrl: jdbc:mysql://localhost:3306/test
      username: root
      password: 123456
    kafka:
      topic: bigdata
      bootstrap.servers: localhost:9091,localhost:9092,localhost:9093
    hbase:
      zookeeper.quorum: localhost
      zookeeper.property.clientPort: 2181
    hdfs:
      namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g. hdfs://hadoop:8020, hdfs://hadoop:9000
      user: benjobs                   # user
      path: /http/failover            # save path
      format: yyyy-MM-dd
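
Every key under the http.sink prefix is read from the job configuration and passed to HttpSinkFunction with the prefix stripped, as in the sink method shown above. Below is a minimal sketch of that filtering, using a hypothetical properties map (the keys and values are examples, not a real job configuration).

Scala

object HttpSinkConfigDemo extends App {
  val HTTP_SINK_PREFIX = "http.sink"

  // Hypothetical flattened job configuration
  val allProps = Map(
    "http.sink.threshold.timeout" -> "100",
    "http.sink.threshold.retries" -> "3",
    "http.sink.failover.storage"  -> "mysql",
    "some.other.key"              -> "ignored"
  )

  // Keep only http.sink.* keys and drop the prefix plus the trailing dot,
  // mirroring the params expression in HttpSink.sink(...)
  val params = allProps
    .filter { case (k, _) => k.startsWith(HTTP_SINK_PREFIX) }
    .map { case (k, v) => k.drop(HTTP_SINK_PREFIX.length + 1) -> v }

  params.foreach { case (k, v) => println(s"$k = $v") }
  // prints: threshold.timeout = 100, threshold.retries = 3, failover.storage = mysql
}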

Writing data asynchronously over HTTP

The sample program is written in Scala:

Scala

import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.HttpSink
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream

object HttpSinkApp extends FlinkStreaming {

  override def handle(): Unit = {
    val source = context.addSource(new TestSource)
    val value: DataStream[String] = source.map(x => s"http://127.0.0.1:8080?userId=${x.userId}&siteId=${x.siteId}")
    HttpSink().post(value).setParallelism(1)
  }
}

Since HTTP can only write one record per request, latency is relatively high and this sink is not suitable for writing large amounts of data; set a reasonable threshold to improve performance. When an asynchronous write fails, the record is added back to the cache queue, so data belonging to the same window may be written in two batches; thorough testing is recommended for scenarios with strict real-time requirements. Once a record exceeds the maximum number of retries, it is backed up to the external component, and the connection to that component is initialized at that moment, so make sure the failover component is available.
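
For illustration only, the retry-then-failover control flow described above can be sketched as follows. The names and the simulated failure are hypothetical and do not reflect StreamPark's internal classes.

Scala

import scala.util.{Failure, Success, Try}

object RetryThenFailoverDemo extends App {
  val maxRetries = 3

  // Simulate an HTTP send that always fails (placeholder for the real request)
  def sendHttp(record: String): Try[Unit] =
    Failure(new RuntimeException("target service unavailable"))

  // Placeholder for writing the record to the failover storage (Kafka, MySQL, HDFS, HBase)
  def failover(record: String): Unit =
    println(s"backing up to failover storage: $record")

  // Retry up to maxRetries times, then hand the record to the failover writer
  def write(record: String, attempt: Int = 1): Unit =
    sendHttp(record) match {
      case Success(_)                          => println(s"sent: $record")
      case Failure(_) if attempt < maxRetries  => write(record, attempt + 1)
      case Failure(e) =>
        println(s"giving up after $maxRetries attempts: ${e.getMessage}")
        failover(record)
    }

  write("""{"userId": 1}""")
}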

Other configuration

All other configurations must comply with the StreamPark configuration. For the specific configurable items and the role of each parameter, please refer to Project configuration.