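Some backend services receive data through HTTP requests. In these scenarios Apache Flink can deliver its results by issuing HTTP requests, but Apache Flink does not currently ship an official connector for writing data over HTTP. Apache StreamPark wraps asynchttpclient in HttpSink, which writes data asynchronously in real time.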
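HttpSink writes are not transactional, so writes to the target service provide at-least-once processing semantics. Records that still fail after multiple asynchronous retries are written to an external component, from which they are recovered through manual intervention to reach eventual consistency.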
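Why retrying a non-transactional write yields at-least-once rather than exactly-once semantics can be shown with a small simulation. The sketch below is plain Scala and does not use StreamPark or asynchttpclient; `FakeServer`, `sendWithRetry`, and the "lost response" behavior are hypothetical stand-ins. The server stores the record, but the client sees a failure (for example a timeout) and retries, so the same record arrives twice.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical target service: it stores every record it receives.
final class FakeServer {
  val received: ListBuffer[String] = ListBuffer.empty
  private var attempts = 0

  // Stores the record, but reports failure on the first call to simulate
  // a response that is lost or times out after the write already happened.
  def handle(record: String): Boolean = {
    attempts += 1
    received += record
    attempts > 1
  }
}

object AtLeastOnceDemo {
  // Makes up to 1 + retries attempts; returns true once any attempt is acknowledged.
  def sendWithRetry(record: String, retries: Int, send: String => Boolean): Boolean = {
    var attempt = 0
    var ok = false
    while (!ok && attempt <= retries) {
      ok = send(record)
      attempt += 1
    }
    ok
  }

  // Returns everything the server stored for a single logical record.
  def run(): List[String] = {
    val server = new FakeServer
    val acked = sendWithRetry("userId=1&siteId=2", retries = 3, server.handle)
    assert(acked)
    server.received.toList
  }
}
```

`AtLeastOnceDemo.run()` returns the record twice: no data is lost, but without transactions or idempotent handling on the receiving side, duplicates are possible.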
HTTP asynchronous writes
Asynchronous writes use asynchttpclient as the client, so first add the asynchttpclient dependency:
```xml
<dependency>
    <groupId>org.asynchttpclient</groupId>
    <artifactId>async-http-client</artifactId>
    <optional>true</optional>
</dependency>
```
Writing with Apache StreamPark™
Supported HTTP methods
HttpSink supports the HTTP methods GET, POST, PATCH, PUT, DELETE, OPTIONS, and TRACE, each mapped to an HttpSink method of the same name:
```scala
class HttpSink(@(transient @param) ctx: StreamingContext,
               header: Map[String, String] = Map.empty[String, String],
               parallelism: Int = 0,
               name: String = null,
               uid: String = null) extends Sink {

  // One public method per HTTP method; each delegates to sink() with the method name.
  def get(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpGet.METHOD_NAME)

  def post(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPost.METHOD_NAME)

  def patch(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPatch.METHOD_NAME)

  def put(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpPut.METHOD_NAME)

  def delete(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpDelete.METHOD_NAME)

  def options(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpOptions.METHOD_NAME)

  def trace(stream: DataStream[String]): DataStreamSink[String] = sink(stream, HttpTrace.METHOD_NAME)

  private[this] def sink(stream: DataStream[String], method: String): DataStreamSink[String] = {
    // Keep only the parameters under HTTP_SINK_PREFIX and strip the prefix plus its separator.
    val params = ctx.parameter.toMap
      .filter(_._1.startsWith(HTTP_SINK_PREFIX))
      .map(x => x._1.drop(HTTP_SINK_PREFIX.length + 1) -> x._2)
    val sinkFun = new HttpSinkFunction(params, header, method)
    val sink = stream.addSink(sinkFun)
    // afterSink receives the parallelism, name and uid passed to the constructor.
    afterSink(sink, parallelism, name, uid)
  }
}
```
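The `sink` method above keeps only the job parameters whose keys start with `HTTP_SINK_PREFIX` and drops the prefix plus the following separator character (`length + 1`). The sketch below replays that transformation on a plain `Map`. It assumes, hypothetically, that `HTTP_SINK_PREFIX` is `"http.sink"` and that the YAML configuration has already been flattened into dotted keys; neither detail is stated in the source.

```scala
object HttpSinkParams {
  // Assumed value for illustration only; check the constant in StreamPark's source.
  val HttpSinkPrefix = "http.sink"

  // Same filter/map as HttpSink.sink: keep "http.sink.*" keys and strip
  // "http.sink." (prefix length + 1 for the dot) from each key.
  def extract(parameters: Map[String, String]): Map[String, String] =
    parameters
      .filter { case (key, _) => key.startsWith(HttpSinkPrefix) }
      .map { case (key, value) => key.drop(HttpSinkPrefix.length + 1) -> value }
}
```

For example, `HttpSinkParams.extract(Map("http.sink.threshold.retries" -> "3", "http.sink.failover.storage" -> "mysql", "kafka.sink.topic" -> "other"))` yields `Map("threshold.retries" -> "3", "failover.storage" -> "mysql")`; keys belonging to other sinks are ignored.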
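Configuration parameters for asynchronous HTTP writes
```yaml
http.sink:
  threshold:
    numWriters: 3
    # Maximum queue capacity; estimate it from the size of a single record.
    # If it is too large and the upstream source produces data faster than
    # the sink can write it, the job may run out of memory (OOM).
    queueCapacity: 10000
    timeout: 100 # timeout for sending an HTTP request
    retries: 3 # maximum number of retries when sending fails
    successCode: 200 # status code(s) indicating success; separate multiple values with ","
  failover:
    table: record
    storage: mysql # or kafka, hbase, hdfs
    jdbc: # used when storage is mysql: failed records are saved to MySQL
      jdbcUrl: jdbc:mysql://localhost:3306/test
      username: root
      password: 123456
    kafka:
      topic: bigdata
      bootstrap.servers: localhost:9091,localhost:9092,localhost:9093
    hbase:
      zookeeper.quorum: localhost
      zookeeper.property.clientPort: 2181
    hdfs:
      namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g: hdfs://hadoop:8020 , hdfs://hadoop:9000
      user: benjobs # user
      path: /http/failover # save path
      format: yyyy-MM-dd
```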
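`successCode` accepts several status codes separated by `,`. The sketch below shows one way such a value could be parsed and checked. `SuccessCodes`, `parse`, and `isSuccess` are hypothetical helpers, not StreamPark's API, and trimming whitespace around each code is an assumption.

```scala
object SuccessCodes {
  // Parses a value such as "200" or "200,201, 204" into a set of status codes.
  def parse(raw: String): Set[Int] =
    raw.split(",").iterator.map(_.trim).filter(_.nonEmpty).map(_.toInt).toSet

  // A response counts as successful only if its status is in the configured set.
  def isSuccess(status: Int, codes: Set[Int]): Boolean = codes.contains(status)
}
```

With `successCode: 200`, a `201 Created` response would be treated as a failure and retried, so services that answer with other 2xx codes need them listed explicitly.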
Writing data asynchronously over HTTP
The example program is written in Scala:
```scala
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.HttpSink
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream

object HttpSinkApp extends FlinkStreaming {

  override def handle(): Unit = {
    // Ingest data; TestSource is a user-defined test source (not shown)
    // whose records carry userId and siteId fields.
    val source = context.addSource(new TestSource)
    // Turn each record into a request URL with its fields as query parameters.
    val value: DataStream[String] = source.map(x => s"http://127.0.0.1:8080?userId=${x.userId}&siteId=${x.siteId}")
    // Write the data by calling the HttpSink method that matches the HTTP method (POST here).
    HttpSink().post(value).setParallelism(1)
  }
}
```
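In this example each stream element is the full request URL, with the record's fields in the query string. If field values can contain characters such as `&`, `=` or spaces, they can be percent-encoded before the URL is assembled. The sketch uses only the JDK's `URLEncoder`; the `Record` case class is a hypothetical stand-in for the records produced by the user-defined `TestSource`, and the base URL is the one from the example.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Hypothetical record type standing in for TestSource's output.
final case class Record(userId: String, siteId: String)

object UrlBuilder {
  // Percent-encodes a query value (URLEncoder turns spaces into "+").
  private def enc(value: String): String =
    URLEncoder.encode(value, StandardCharsets.UTF_8.name())

  // Builds "<base>?userId=...&siteId=..." with encoded values.
  def toUrl(base: String, r: Record): String =
    s"$base?userId=${enc(r.userId)}&siteId=${enc(r.siteId)}"
}
```

Encoding each value separately keeps a stray `&` inside a field from being read as a new query parameter by the receiving service.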
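Because each HTTP request writes only one record, latency is relatively high and HttpSink is not suited to high-volume writes; tune the threshold settings (for example numWriters and queueCapacity) to improve performance.
Because HttpSink puts records whose asynchronous write failed back into the buffer queue, data from the same window may be written in two separate batches; test thoroughly in scenarios with strict real-time requirements.
Once a record reaches the maximum number of retries, it is backed up to the external failover component, and the connection to that component is only initialized at that moment; make sure the failover component is available.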
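The caveats above can be illustrated with a single-threaded simulation: a failed record is put back at the tail of a bounded buffer queue, so it is written after records that arrived later, and the failover store is created lazily, only when some record exhausts its retries. This is a hypothetical sketch of the described behavior, not StreamPark's `HttpSinkFunction`; `Pending`, `QueuedWriter`, the `send` function, and the in-memory failover buffer are all assumptions.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ListBuffer

// A queued record plus how many times it has already been retried.
final case class Pending(record: String, retried: Int)

final class QueuedWriter(queueCapacity: Int, retries: Int, send: String => Boolean) {
  private val queue = new ArrayBlockingQueue[Pending](queueCapacity)
  val written: ListBuffer[String] = ListBuffer.empty
  var failoverInits = 0

  // Created only on first use, mirroring the lazily opened failover connection.
  lazy val failover: ListBuffer[String] = { failoverInits += 1; ListBuffer.empty[String] }

  // offer returns false instead of growing the queue when it is full.
  def enqueue(record: String): Boolean = queue.offer(Pending(record, 0))

  def drain(): Unit = {
    var next = queue.poll()
    while (next != null) {
      if (send(next.record)) written += next.record
      // Back to the tail; poll just freed a slot, so offer succeeds here.
      else if (next.retried < retries) queue.offer(next.copy(retried = next.retried + 1))
      // Retries exhausted: back the record up for manual recovery.
      else failover += next.record
      next = queue.poll()
    }
  }
}

object QueueDemo {
  // Returns (written order, failover contents, inits before drain, inits after drain).
  def run(): (List[String], List[String], Int, Int) = {
    var bFailed = false
    // "b" fails once, "dead" always fails; everything else succeeds.
    val send: String => Boolean = {
      case "dead"          => false
      case "b" if !bFailed => bFailed = true; false
      case _               => true
    }
    val writer = new QueuedWriter(queueCapacity = 10, retries = 2, send)
    Seq("a", "b", "c", "dead").foreach(writer.enqueue)
    val initsBefore = writer.failoverInits
    writer.drain()
    val initsAfter = writer.failoverInits
    (writer.written.toList, writer.failover.toList, initsBefore, initsAfter)
  }
}
```

`QueueDemo.run()` writes `a, c, b`: the retried `b` lands after `c`, which is how one window's data can end up in two batches. The failover store is initialized only once `dead` exhausts its retries, which is why an unavailable failover component is discovered late.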
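Other configuration
All other configuration must follow StreamPark's configuration conventions; for the available options and the purpose of each parameter, see Project Configuration.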
