Some backend services receive data via HTTP requests. In this scenario Apache Flink can write its result data out through HTTP requests, but the Apache Flink project does not currently provide an official connector for writing data over HTTP. Apache StreamPark wraps asynchttpclient into an HttpSink that writes data asynchronously in real time.

HttpSink writes are non-transactional and provide at-least-once semantics when writing to the target service. Records whose asynchronous writes still fail after multiple retries are written to an external component, from which the data can eventually be recovered through manual intervention, achieving eventual consistency.
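The retry-then-failover behavior described above can be sketched as follows. This is a minimal illustration of the pattern, not the actual StreamPark implementation; `send` and `saveToFailover` are hypothetical stand-ins for the real HTTP call and the external failover storage:

```scala
// Minimal sketch of the at-least-once retry-then-failover pattern.
// `send` and `saveToFailover` are hypothetical stand-ins for the real
// HTTP request and the external failover component.
object RetryFailover {
  def writeWithRetry(record: String,
                     maxRetries: Int,
                     send: String => Boolean,
                     saveToFailover: String => Unit): Boolean = {
    var attempts = 0
    var ok = false
    // one initial attempt plus up to maxRetries retries
    while (!ok && attempts <= maxRetries) {
      ok = send(record)
      attempts += 1
    }
    // only after all retries are exhausted is the record backed up
    if (!ok) saveToFailover(record)
    ok
  }
}
```

Because the record is either acknowledged by the target service or persisted to the failover store, no record is silently dropped, which is exactly the at-least-once guarantee.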

Asynchronous HTTP Write

The asynchronous write uses asynchttpclient as its client, so the asynchttpclient dependency must be imported first:

<dependency>
    <groupId>org.asynchttpclient</groupId>
    <artifactId>async-http-client</artifactId>
    <optional>true</optional>
</dependency>

Writing with Apache StreamPark™

Supported HTTP methods

HttpSink supports the HTTP methods GET, POST, PATCH, PUT, DELETE, OPTIONS and TRACE, each mapped to an HttpSink method of the same name:

Scala

class HttpSink(@(transient @param) ctx: StreamingContext,
               header: Map[String, String] = Map.empty[String, String],
               parallelism: Int = 0,
               name: String = null,
               uid: String = null) extends Sink {

  def get(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpGet.METHOD_NAME)
  def post(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPost.METHOD_NAME)
  def patch(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPatch.METHOD_NAME)
  def put(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPut.METHOD_NAME)
  def delete(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpDelete.METHOD_NAME)
  def options(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpOptions.METHOD_NAME)
  def trace(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpTrace.METHOD_NAME)

  private[this] def sink(stream: DataStream[String], method: String): DataStreamSink[String] = {
    // keep only the "http.sink.*" entries and strip the prefix
    val params = ctx.parameter.toMap
      .filter(_._1.startsWith(HTTP_SINK_PREFIX))
      .map(x => x._1.drop(HTTP_SINK_PREFIX.length + 1) -> x._2)
    val sinkFun = new HttpSinkFunction(params, header, method)
    val sink = stream.addSink(sinkFun)
    afterSink(sink, parallelism, name, uid)
  }
}
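The key filtering inside `sink` can be shown in isolation. Assuming `HTTP_SINK_PREFIX` is `"http.sink"` (matching the configuration block below), keys such as `http.sink.threshold.timeout` are reduced to `threshold.timeout` before being passed to `HttpSinkFunction`:

```scala
// Standalone illustration of the key filtering done in HttpSink#sink,
// assuming HTTP_SINK_PREFIX is "http.sink".
object SinkParams {
  val HTTP_SINK_PREFIX = "http.sink"

  // keep only keys under the prefix, then drop the prefix and the dot
  def extract(all: Map[String, String]): Map[String, String] =
    all.filter(_._1.startsWith(HTTP_SINK_PREFIX))
       .map(x => x._1.drop(HTTP_SINK_PREFIX.length + 1) -> x._2)
}
```

For example, `extract(Map("http.sink.threshold.timeout" -> "100", "kafka.sink.topic" -> "t"))` keeps only the HTTP sink entry, returned as `Map("threshold.timeout" -> "100")`.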

Configuration parameters for asynchronous HTTP write

http.sink:
  threshold:
    numWriters: 3
    queueCapacity: 10000 # maximum queue capacity; size it according to the record size. If it is too large and the upstream produces data faster than the downstream can write, an OOM may occur.
    timeout: 100 # timeout for sending the HTTP request
    retries: 3 # maximum number of retries on failure
    successCode: 200 # status code(s) indicating success; multiple values may be given, separated by ","
  failover:
    table: record
    storage: mysql # kafka,hbase,hdfs
    jdbc: # storage type mysql: save failed data to MySQL
      jdbcUrl: jdbc:mysql://localhost:3306/test
      username: root
      password: 123456
    kafka:
      topic: bigdata
      bootstrap.servers: localhost:9091,localhost:9092,localhost:9093
    hbase:
      zookeeper.quorum: localhost
      zookeeper.property.clientPort: 2181
    hdfs:
      namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g: hdfs://hadoop:8020 , hdfs://hadoop:9000
      user: benjobs # user
      path: /http/failover # save path
      format: yyyy-MM-dd
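Since `successCode` may carry several comma-separated values, checking a response status against it can be sketched like this. This is a simplified illustration of the semantics, not the actual StreamPark parsing code:

```scala
// Parse a comma-separated successCode value and test a response status.
object SuccessCode {
  def parse(value: String): Set[Int] =
    value.split(",").map(_.trim.toInt).toSet

  def isSuccess(status: Int, value: String): Boolean =
    parse(value).contains(status)
}
```

With `successCode: "200,201"`, a 201 response counts as a success while a 500 triggers the retry path.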

Writing data asynchronously over HTTP

The sample program is written in Scala:

Scala

import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.HttpSink
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream

object HttpSinkApp extends FlinkStreaming {

  override def handle(): Unit = {
    // ingest data
    val source = context.addSource(new TestSource)
    val value: DataStream[String] = source.map(x => s"http://127.0.0.1:8080?userId=${x.userId}&siteId=${x.siteId}")
    // write data by calling the HttpSink method matching the desired HTTP verb
    HttpSink().post(value).setParallelism(1)
  }
}

Because HTTP writes only one record per request, latency is relatively high, making it unsuitable for large write volumes; set reasonable thresholds to improve performance.
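The OOM risk behind the `queueCapacity` threshold comes from pending records accumulating without limit when the writer cannot keep up. A bounded buffer makes the overflow explicit instead; the sketch below uses `java.util.concurrent.LinkedBlockingQueue` as a simplified stand-in for the sink's internal buffer:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Illustration of why queueCapacity matters: with a bounded buffer,
// offer() fails once the capacity is reached, instead of the buffer
// growing without limit until an OOM.
object BoundedBuffer {
  def fill(capacity: Int, records: Seq[String]): (Int, Int) = {
    val queue = new LinkedBlockingQueue[String](capacity)
    val accepted = records.count(r => queue.offer(r))
    (accepted, records.size - accepted) // (buffered, rejected)
  }
}
```

Filling a capacity-3 buffer with five records buffers three and rejects two; the real sink faces the same trade-off between buffering depth, memory use, and data loss or backpressure.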

Because HttpSink re-queues records whose asynchronous writes fail, records belonging to the same window may be written in two separate batches; thorough testing is recommended for scenarios with strict real-time requirements.

Once a record has reached the maximum number of retries, it is backed up to the external component; the connection to that component is only initialized at that moment, so make sure the failover component is available.

Other configuration

All other configuration must follow the StreamPark configuration conventions; see the project configuration documentation for the available options and the meaning of each parameter.