Elasticsearch is a distributed, RESTful search and analytics engine. Apache Flink officially provides a connector for writing data to Elasticsearch, which can provide at-least-once semantics.

ElasticsearchSink uses a TransportClient (before 6.x) or a RestHighLevelClient (starting with 6.x) to communicate with the Elasticsearch cluster. Apache StreamPark further encapsulates flink-connector-elasticsearch6, shielding development details and simplifying write operations for Elasticsearch 6 and above.

Because different versions of Flink Connector Elasticsearch conflict with each other, StreamPark currently only supports write operations for Elasticsearch 6 and above. If you want to use Elasticsearch 5, you need to exclude the flink-connector-elasticsearch6 dependency, add the flink-connector-elasticsearch5 dependency, and create an org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink instance to write data.

Dependencies for writing to Elasticsearch

The Flink Connector Elasticsearch dependency differs for each Elasticsearch version and is not interchangeable. The following information comes from the flink-docs-release-1.14 documentation:

Elasticsearch 5.x Maven dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

Elasticsearch 6.x Maven dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

Elasticsearch 7.x and above Maven dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

Write data to Elasticsearch with the official connector

The following code is taken from the official Flink documentation:

Java, Elasticsearch 6.x and above

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

DataStream<String> input = ...;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

// use the ElasticsearchSink.Builder to create an ElasticsearchSink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<String>() {
        public IndexRequest createIndexRequest(String element) {
            Map<String, String> json = new HashMap<>();
            json.put("data", element);

            return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
        }

        @Override
        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            indexer.add(createIndexRequest(element));
        }
    }
);

// configuration for bulk requests; the setting below causes the sink to commit each element
// immediately instead of buffering it
esSinkBuilder.setBulkFlushMaxActions(1);

// provide a RestClientFactory for custom configuration of the internally created REST client
esSinkBuilder.setRestClientFactory(
    restClientBuilder -> {
        restClientBuilder.setDefaultHeaders(...)
        restClientBuilder.setMaxRetryTimeoutMillis(...)
        restClientBuilder.setPathPrefix(...)
        restClientBuilder.setHttpClientConfigCallback(...)
    }
);

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build());

Scala, Elasticsearch 6.x and above

import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.apache.flink.streaming.connectors.elasticsearch6.{ElasticsearchSink, RestClientFactory}
import org.apache.http.HttpHost
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}

import java.util.ArrayList
import java.util.List

val input: DataStream[String] = ...

val httpHosts = new java.util.ArrayList[HttpHost]
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"))

val esSinkBuilder = new ElasticsearchSink.Builder[String](
  httpHosts,
  new ElasticsearchSinkFunction[String] {
    def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) {
      val json = new java.util.HashMap[String, String]
      json.put("data", element)

      val rqst: IndexRequest = Requests.indexRequest
        .index("my-index")
        .`type`("my-type")
        .source(json)

      indexer.add(rqst)
    }
  }
)

// configuration for bulk requests; the setting below causes the sink to commit each element
// immediately instead of buffering it
esSinkBuilder.setBulkFlushMaxActions(1)

// provide a RestClientFactory for custom configuration of the internally created REST client
esSinkBuilder.setRestClientFactory(new RestClientFactory {
  override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
    restClientBuilder.setDefaultHeaders(...)
    restClientBuilder.setMaxRetryTimeoutMillis(...)
    restClientBuilder.setPathPrefix(...)
    restClientBuilder.setHttpClientConfigCallback(...)
  }
})

// finally, build and add the sink to the job's pipeline
input.addSink(esSinkBuilder.build)

Creating an ElasticsearchSink this way makes adding parameters inflexible. StreamPark follows the concept of convention over configuration and automatic configuration: users only need to configure the Elasticsearch connection parameters and the Flink runtime parameters, and StreamPark automatically assembles the source and sink. This greatly simplifies the development logic and improves development efficiency and maintainability.

Write to Elasticsearch using Apache StreamPark™

With Flink checkpointing enabled, ESSink guarantees that operation requests are sent to the Elasticsearch cluster at least once.

1. Configure the policy and connection information

#es sink configuration
host: localhost:9200  # required; for multiple nodes use host1:port,host2:port
# optional parameters
#es:
#  disableFlushOnCheckpoint: true # default is false
#  auth:
#    user:
#    password:
#  rest:
#    max.retry.timeout:
#    path.prefix:
#    content.type:
#  connect:
#    request.timeout:
#    timeout:
#  cluster.name: elasticsearch
#  client.transport.sniff:
#  bulk.flush.:

2. Write to Elasticsearch

Use StreamPark to write to Elasticsearch:

Scala

import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.core.scala.sink.ESSink
import org.apache.streampark.flink.core.scala.util.ElasticSearchUtils
import org.apache.flink.api.scala._
import org.elasticsearch.action.index.IndexRequest
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

import java.util.Date

object ConnectorApp extends FlinkStreaming {

  implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats

  override def handle(): Unit = {

    val ds = context.fromCollection(List(
      OrderEntity(1, 1, 11.3d, 3.1d, new Date()),
      OrderEntity(2, 1, 12.3d, 3.2d, new Date()),
      OrderEntity(3, 1, 13.3d, 3.3d, new Date()),
      OrderEntity(4, 1, 14.3d, 3.4d, new Date()),
      OrderEntity(5, 1, 15.3d, 7.5d, new Date()),
      OrderEntity(6, 1, 16.3d, 3.6d, new Date()),
      OrderEntity(7, 1, 17.3d, 3.7d, new Date())
    ))

    // es sink

    // 1) define the rules for writing to the index
    implicit def indexReq(x: OrderEntity): IndexRequest = ElasticSearchUtils.indexRequest(
      "flink_order",
      "_doc",
      s"${x.id}_${x.time.getTime}",
      Serialization.write(x)
    )

    // 2) define the ESSink and sink the data. done
    ESSink().sink6[OrderEntity](ds)
  }

  case class OrderEntity(id: Int, num: Int, price: Double, gmv: Double, time: Date) extends Serializable

}

Flink's ElasticsearchSinkFunction can execute requests of several types, such as DeleteRequest, UpdateRequest, and IndexRequest. StreamPark supports these operations as well; the corresponding methods are listed below, followed by a short usage sketch:

import org.apache.streampark.flink.core.scala.StreamingContext
import org.apache.flink.streaming.api.datastream.DataStreamSink
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.update.UpdateRequest

import java.util.Properties
import scala.annotation.meta.param

class ESSink(@(transient@param) context: StreamingContext,
             property: Properties = new Properties(),
             parallelism: Int = 0,
             name: String = null,
             uid: String = null) {

  /**
   * for ElasticSearch6
   *
   * @param stream
   * @param suffix
   * @param restClientFactory
   * @param failureHandler
   * @param f
   * @tparam T
   * @return
   */
  def sink6[T](stream: DataStream[T],
               suffix: String = "",
               restClientFactory: RestClientFactory = null,
               failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
              (implicit f: T => IndexRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid).sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

  def update6[T](stream: DataStream[T],
                 suffix: String = "",
                 restClientFactory: RestClientFactory = null,
                 failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
                (f: T => UpdateRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid).sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

  def delete6[T](stream: DataStream[T],
                 suffix: String = "",
                 restClientFactory: RestClientFactory = null,
                 failureHandler: ActionRequestFailureHandler = new RetryRejectedExecutionFailureHandler)
                (f: T => DeleteRequest): DataStreamSink[T] = {
    new ES6Sink(context, property, parallelism, name, uid).sink[T](stream, suffix, restClientFactory, failureHandler, f)
  }

}
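To illustrate how the update and delete variants might be called, here is a minimal sketch following the same pattern as the sink6 example above. It reuses the ds stream, the implicit json4s formats, and the OrderEntity case class from ConnectorApp; the way the UpdateRequest and DeleteRequest are built (index flink_order, type _doc, id ${id}_${time}) is an illustrative assumption, not code taken from the StreamPark sources.

import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.common.xcontent.XContentType
import org.json4s.jackson.Serialization

// hypothetical usage inside ConnectorApp.handle(), after `ds` has been defined

// partial update: the function passed to update6 builds an UpdateRequest per element
ESSink().update6[OrderEntity](ds) { x =>
  new UpdateRequest("flink_order", "_doc", s"${x.id}_${x.time.getTime}")
    .doc(Serialization.write(x), XContentType.JSON)
}

// delete: the function passed to delete6 builds a DeleteRequest per element
ESSink().delete6[OrderEntity](ds) { x =>
  new DeleteRequest("flink_order", "_doc", s"${x.id}_${x.time.getTime}")
}

Both calls return a DataStreamSink, just like sink6.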

When Flink checkpointing is enabled, the Flink Elasticsearch Sink guarantees that operation requests are delivered to the Elasticsearch cluster at least once. It does this by waiting for all pending operation requests in the BulkProcessor while checkpointing, which guarantees that all requests sent to the sink before the checkpoint have been successfully acknowledged by Elasticsearch before the checkpoint is triggered and processing continues. If you want to disable flushing, set disableFlushOnCheckpoint to true; this essentially means that the sink no longer provides any reliable delivery guarantees, even if checkpointing of the job topology is enabled.

Other configuration

Handling failed Elasticsearch requests

An Elasticsearch operation request may fail for a variety of reasons. You can specify the failure handling logic by implementing an ActionRequestFailureHandler; see the official documentation - Handling Failed Elasticsearch Requests.
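As a sketch of what such a handler can look like, the Scala version below mirrors the Java example from the official Flink documentation: it retries requests that were rejected because the bulk queue was full, drops malformed documents, and fails the sink for anything else. The class name CustomFailureHandler and the wiring into sink6 at the end are illustrative assumptions; the wiring also assumes the implicit IndexRequest conversion from the earlier example is in scope.

import org.apache.flink.streaming.connectors.elasticsearch.{ActionRequestFailureHandler, RequestIndexer}
import org.apache.flink.util.ExceptionUtils
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException

class CustomFailureHandler extends ActionRequestFailureHandler {

  @throws[Throwable]
  override def onFailure(action: ActionRequest,
                         failure: Throwable,
                         restStatusCode: Int,
                         indexer: RequestIndexer): Unit = {
    if (ExceptionUtils.findThrowable(failure, classOf[EsRejectedExecutionException]).isPresent) {
      // the bulk queue was full: re-queue the request so it is retried
      indexer.add(action)
    } else if (ExceptionUtils.findThrowable(failure, classOf[ElasticsearchParseException]).isPresent) {
      // the document was malformed: drop it and continue
    } else {
      // any other failure: rethrow and fail the sink
      throw failure
    }
  }
}

// hypothetical wiring into the earlier StreamPark example:
// ESSink().sink6[OrderEntity](ds, failureHandler = new CustomFailureHandler)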

Configuring the internal bulk processor

The internal BulkProcessor can be further configured to control how buffered action requests are flushed; for details, see the official documentation - Configuring the Internal Bulk Processor.
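For reference, when using the official ElasticsearchSink.Builder directly (as in the first example above), the bulk behaviour is tuned with the builder's setters. The sketch below reuses the esSinkBuilder from that example; the concrete values are placeholders, not recommendations, and these options appear to correspond to the keys under the bulk.flush. prefix in the StreamPark configuration example.

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

// illustrative bulk settings on the esSinkBuilder from the first example
esSinkBuilder.setBulkFlushMaxActions(1000)   // flush after 1000 buffered actions ...
esSinkBuilder.setBulkFlushMaxSizeMb(5)       // ... or after 5 MB of buffered data ...
esSinkBuilder.setBulkFlushInterval(60000)    // ... or at the latest every 60 seconds
esSinkBuilder.setBulkFlushBackoff(true)      // retry bulk requests rejected by Elasticsearch
esSinkBuilder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL)
esSinkBuilder.setBulkFlushBackoffRetries(3)
esSinkBuilder.setBulkFlushBackoffDelay(1000) // initial delay between retries, in milliseconds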

Apache StreamPark™ configuration

All other settings must comply with the StreamPark configuration conventions. For the specific configurable items and the role of each parameter, please refer to the project configuration.