Elasticsearch is a distributed, RESTful search and analytics engine. Apache Flink officially provides a connector for writing data to Elasticsearch with at-least-once semantics.
ElasticsearchSink uses a TransportClient (before 6.x) or a RestHighLevelClient (starting with 6.x) to communicate with the Elasticsearch cluster. Apache StreamPark further encapsulates flink-connector-elasticsearch6, hiding the development details and simplifying write operations for Elasticsearch 6 and above.
Because different versions of the Flink Connector Elasticsearch conflict with each other, StreamPark currently only supports write operations for Elasticsearch 6 and above. If you want to use Elasticsearch 5, you need to exclude the flink-connector-elasticsearch6 dependency, introduce the flink-connector-elasticsearch5 dependency, and create an org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink instance to write the data.
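For reference, the exclusion could be declared in Maven as sketched below; streampark-flink-core stands in for whichever StreamPark module your job actually depends on:

```xml
<dependency>
    <groupId>org.apache.streampark</groupId>
    <!-- placeholder: use the StreamPark module your job actually depends on -->
    <artifactId>streampark-flink-core</artifactId>
    <version>${streampark.version}</version>
    <exclusions>
        <!-- exclude the Elasticsearch 6 connector bundled by StreamPark -->
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- then introduce the Elasticsearch 5 connector directly -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
```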
Dependencies for writing to Elasticsearch
The Flink Connector Elasticsearch dependency is not interchangeable across Elasticsearch versions. The following information comes from the flink-docs-release-1.14 documentation:
Elasticsearch 5.x Maven dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
```
Elasticsearch 6.x Maven dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
```
Elasticsearch 7.x and above Maven dependencies
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
```
Writing data to Elasticsearch with the official connector
The following code is taken from the official Flink documentation:
Java, Elasticsearch 6.x and above
```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// create an ElasticsearchSink using ElasticsearchSink.Builder
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for batch requests; the setting below causes the sink
// to commit immediately after each element that would otherwise be buffered
esSinkBuilder.setBulkFlushMaxActions(1);

// a RestClientFactory that provides custom configuration for the internally created REST client
esSinkBuilder.setRestClientFactory(restClientBuilder -> {
    restClientBuilder.setDefaultHeaders(...);
    restClientBuilder.setMaxRetryTimeoutMillis(...);
    restClientBuilder.setPathPrefix(...);
    restClientBuilder.setHttpClientConfigCallback(...);
});

// finally, build and add the sink to the job pipeline
input.addSink(esSinkBuilder.build());
```
Scala, Elasticsearch 6.x and above
```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}

import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      val rqst: IndexRequest = Requests.indexRequest
        .index("my-index")
        .`type`("my-type")
        .source(json)

      indexer.add(rqst)
    }
  }
)

// configuration for batch requests; the setting below causes the sink
// to commit immediately after each element that would otherwise be buffered
esSinkBuilder.setBulkFlushMaxActions(1)

// a RestClientFactory that provides custom configuration for the internally created REST client
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
})

// finally, build and add the sink to the job pipeline
input.addSink(esSinkBuilder.build)
```
An ElasticsearchSink created this way is inflexible: every parameter is hard-coded in the job.
StreamPark follows the principle of convention over configuration and automatic configuration:
users only need to configure the Elasticsearch connection parameters and the Flink runtime parameters, and StreamPark automatically assembles the source and sink,
which greatly simplifies the development logic and improves development efficiency and maintainability.
Writing to Elasticsearch with Apache StreamPark™
Note that with Flink checkpointing enabled, ESSink guarantees that operation requests are delivered to the Elasticsearch cluster at least once.
1. Configure the policy and connection information
```yaml
# elasticsearch sink configuration
# required parameter; for multiple nodes use host1:port,host2:port
host: localhost:9200
# optional parameters
#es:
#  disableFlushOnCheckpoint: true # defaults to false
#  auth:
#    user:
#    password:
#  rest:
#    max.retry.timeout:
#    path.prefix:
#    content.type:
#  connect:
#    request.timeout:
#    timeout:
#  cluster.name: elasticsearch
#  client.transport.sniff:
#  bulk.flush.:
```
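For example, a minimal filled-in configuration for a secured two-node cluster could look like this (the hosts and credentials are illustrative):

```yaml
host: 10.0.0.1:9200,10.0.0.2:9200
es:
  auth:
    user: elastic
    password: changeme
```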
2. Write to Elasticsearch
Use StreamPark to write to Elasticsearch:
Scala
```scala
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.ESSink
import org.apache.streampark.flink.core.scala.util.ElasticSearchUtils
import org.apache.flink.api.scala._
import org.elasticsearch.action.index.IndexRequest
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

import java.util.Date

object ConnectorApp extends FlinkStreaming {

  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

  override def handle(): Unit = {

    val ds = context.fromCollection(List(
      OrderEntity(1, 1, 11.3d, 3.1d, new Date()),
      OrderEntity(2, 1, 12.3d, 3.2d, new Date()),
      OrderEntity(3, 1, 13.3d, 3.3d, new Date()),
      OrderEntity(4, 1, 14.3d, 3.4d, new Date()),
      OrderEntity(5, 1, 15.3d, 7.5d, new Date()),
      OrderEntity(6, 1, 16.3d, 3.6d, new Date()),
      OrderEntity(7, 1, 17.3d, 3.7d, new Date())
    ))

    // 1) define the index write rule
    implicit def indexReq(x: OrderEntity): IndexRequest = ElasticSearchUtils.indexRequest(
      "flink_order",
      "_doc",
      s"${x.id}_${x.time.getTime}",
      Serialization.write(x)
    )

    // 2) create the ESSink and write the data
    ESSink().sink6[OrderEntity](ds)
  }

  case class OrderEntity(id: Int, num: Int, price: Double, gmv: Double, time: Date) extends Serializable

}
```
Flink's ElasticsearchSinkFunction can execute requests of several types, such as DeleteRequest, UpdateRequest, and IndexRequest. StreamPark supports all of these operations through the following methods:
```scala
import org.apache.streampark.flink.core.scala.StreamingContext
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateRequest

import java.util.Properties
import scala.annotation.meta.param

class ESSink(@(transient @param) context: StreamingContext,
             property: Properties = new Properties(),
             parallelism: Int = 0,
             name: String = null,
             uid: String = null) {

  /**
   * for Elasticsearch 6
   *
   * @param stream
   * @param suffix
   * @param restClientFactory
   * @param failureHandler
   * @param f
   * @tparam T
   * @return
   */
  def sink6[T](stream: DataStream[T],
               suffix: String = "",
               restClientFactory: RestClientFactory = null,
               failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
              (implicit f: T => IndexRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid)
      .sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

  def update6[T](stream: DataStream[T],
                 suffix: String = "",
                 restClientFactory: RestClientFactory = null,
                 failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
                (f: T => UpdateRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid)
      .sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

  def delete6[T](stream: DataStream[T],
                 suffix: String = "",
                 restClientFactory: RestClientFactory = null,
                 failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
                (f: T => DeleteRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid)
      .sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }
}
```
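Updates and deletes follow the same pattern as sink6: supply a function that turns each element into the corresponding request. Below is a hedged sketch that reuses the ds stream and OrderEntity case class from the example above; the document ids follow the same "${id}_${time}" scheme, and the UpdateRequest/DeleteRequest constructors and UpdateRequest.doc(String, XContentType) are standard Elasticsearch 6 client API, not StreamPark-specific calls:

```scala
import org.apache.streampark.flink.core.scala.sink.ESSink
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

// partial update: overwrite the stored document with the latest state of each element
ESSink().update6[OrderEntity](ds) { x =>
  new UpdateRequest("flink_order", "_doc", s"${x.id}_${x.time.getTime}")
    .doc(Serialization.write(x), XContentType.JSON)
}

// delete: remove the document whose id matches the element
ESSink().delete6[OrderEntity](ds) { x =>
  new DeleteRequest("flink_order", "_doc", s"${x.id}_${x.time.getTime}")
}
```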
When Flink checkpointing is enabled, the Flink Elasticsearch Sink guarantees that operation requests are delivered to the Elasticsearch cluster at least once. It does this by waiting for all pending operation requests in the BulkProcessor when a checkpoint is taken, which guarantees that Elasticsearch has successfully acknowledged all requests before the checkpoint is triggered and the sink proceeds to process more records. If you want to disable flushing, set disableFlushOnCheckpoint to true; this essentially means the sink no longer provides any reliable delivery guarantees, even if checkpointing of the job topology is enabled.
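For example, based on the configuration template above, disabling flush-on-checkpoint would look like this:

```yaml
es:
  disableFlushOnCheckpoint: true # default is false; true trades the at-least-once guarantee for lower checkpoint latency
```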
Other configuration
Handling failed Elasticsearch requests
An Elasticsearch operation request may fail for a variety of reasons. You can define the failure-handling logic by implementing an ActionRequestFailureHandler and passing it through the failureHandler parameter of sink6/update6/delete6 shown above; see the sketch below and Official Documentation - Handling Failed Elasticsearch Requests.
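As a sketch of what such a handler can look like (modeled on the example in the Flink documentation; the class name RetryOrDropFailureHandler is our own, not part of StreamPark or Flink):

```scala
import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

class RetryOrDropFailureHandler extends ActionRequestFailureHandler {

  override def onFailure(action: ActionRequest,
                         failure: Throwable,
                         restStatusCode: Int,
                         indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
      // the bulk queue was full: re-add the request so it is retried
      indexer.add(action)
    } else if (restStatusCode == 400) {
      // malformed document: drop the request (log it in a real job)
    } else {
      // unknown failure: rethrow to fail the sink and restart the job
      throw failure
    }
  }
}
```

The handler is then passed via the failureHandler parameter, e.g. `ESSink().sink6[OrderEntity](ds, failureHandler = new RetryOrDropFailureHandler)`.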
Configuring the internal bulk processor
The internal BulkProcessor can be further configured to control how it flushes buffered action requests; for details, see Official Documentation - Configuring the Internal Bulk Processor. A configuration sketch follows.
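The configuration template above exposes these settings under the es.bulk.flush. prefix. Assuming StreamPark forwards them to the connector unchanged (an assumption based on that template; the key names themselves are the Flink connector's own bulk.flush.* options), a tuned bulk processor might look like:

```yaml
es:
  bulk.flush.max.actions: 1000    # flush once this many actions are buffered
  bulk.flush.max.size.mb: 5       # flush once the buffered data reaches this size
  bulk.flush.interval.ms: 5000    # flush at least this often, regardless of volume
  bulk.flush.backoff.enable: true # retry temporarily failed actions with backoff
  bulk.flush.backoff.type: EXPONENTIAL
  bulk.flush.backoff.delay: 100
  bulk.flush.backoff.retries: 3
```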
Apache StreamPark™ configuration
All other settings must comply with the StreamPark configuration conventions. For the specific configurable items and the role of each parameter, please refer to the project configuration documentation.
