This connector provides sinks that can request document actions against an Elasticsearch index. To use this connector, add one of the following dependencies to your project, depending on the version of your Elasticsearch installation:
Maven Dependency | Supported since | Elasticsearch version |
---|---|---|
flink-connector-elasticsearch_2.10 | 1.0.0 | 1.x |
flink-connector-elasticsearch2_2.10 | 1.0.0 | 2.x |
flink-connector-elasticsearch5_2.10 | 1.3.0 | 5.x |
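For example, a build targeting an Elasticsearch 5.x cluster could declare the dependency roughly as follows; the version shown is only illustrative, so use the Flink release you are actually building against:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.10</artifactId>
    <version>1.3.0</version>
</dependency>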
Note that the streaming connectors are currently not part of the binary distribution. See here for information about how to package the program with the libraries for cluster execution.
Instructions for setting up an Elasticsearch cluster can be found here. Make sure to set and remember a cluster name. This must be set when creating an ElasticsearchSink for requesting document actions against your cluster.

The ElasticsearchSink uses a TransportClient to communicate with an Elasticsearch cluster.
The examples below show how to configure and create a sink for each supported Elasticsearch version, in Java and Scala:
Java, Elasticsearch 1.x:

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
Java, Elasticsearch 2.x and 5.x:

DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
Scala, Elasticsearch 1.x:

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
Scala, Elasticsearch 2.x and 5.x:

val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
Note how a Map of Strings is used to configure the ElasticsearchSink. The configuration keys are documented in the Elasticsearch documentation here. Especially important is the cluster.name parameter, which must correspond to the name of your cluster.
Also note that the example only demonstrates performing a single index request for each incoming element. Generally, the ElasticsearchSinkFunction can be used to perform multiple requests of different types (e.g., DeleteRequest, UpdateRequest, etc.).
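As a rough sketch (not taken from the connector documentation), an ElasticsearchSinkFunction that issues an upsert per element could look like the following. The "id,value" element format, the index and type names, and the map-based UpdateRequest doc()/upsert() builders are assumptions for illustration only:

ElasticsearchSinkFunction<String> upsertFunction = new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // illustrative parsing: treat each element as an "id,value" pair
        String[] parts = element.split(",", 2);

        Map<String, Object> json = new HashMap<>();
        json.put("data", parts[1]);

        UpdateRequest request = new UpdateRequest("my-index", "my-type", parts[0])
                .doc(json)      // partial document applied if the id already exists
                .upsert(json);  // document indexed if the id does not exist yet

        indexer.add(request);
    }
};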
Internally, each parallel instance of the Flink Elasticsearch Sink uses a BulkProcessor to send action requests to the cluster. This buffers elements before sending them in bulk to the cluster. The BulkProcessor executes bulk requests one at a time, i.e. there are never two concurrent flushes of the buffered actions in progress.
With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees at-least-once delivery of action requests to Elasticsearch clusters. It does so by waiting for all pending action requests in the BulkProcessor at the time of checkpoints. This effectively assures that all requests issued before the checkpoint was triggered have been successfully acknowledged by Elasticsearch, before proceeding to process more records sent to the sink.
More details on checkpoints and fault tolerance are in the fault tolerance docs.
To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
NOTE: Users can disable flushing if they wish to do so, by calling disableFlushOnCheckpoint() on the created ElasticsearchSink. Be aware that this essentially means the sink will not provide any strong delivery guarantees anymore, even with checkpointing enabled for the topology.
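As a sketch, assuming a sinkFunction variable holding an ElasticsearchSinkFunction defined as in the earlier examples, disabling flushing could look like this:

ElasticsearchSink<String> sink =
    new ElasticsearchSink<>(config, transportAddresses, sinkFunction);

// Opt out of flushing pending requests on checkpoints;
// the sink then no longer provides at-least-once guarantees.
sink.disableFlushOnCheckpoint();

input.addSink(sink);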
For Elasticsearch versions 1.x, communication using an embedded node is also supported. See here for information about the differences between communicating with Elasticsearch with an embedded node and with a TransportClient.

Below is an example of how to create an ElasticsearchSink that uses an embedded node instead of a TransportClient:
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
The difference is that now we do not need to provide a list of addresses of Elasticsearch nodes.
Elasticsearch action requests may fail due to a variety of reasons, including temporarily saturated node queue capacity or malformed documents to be indexed. The Flink Elasticsearch Sink allows the user to specify how request failures are handled by implementing an ActionRequestFailureHandler and providing it to the constructor.
Below is an example:
DataStream<String> input = ...;

input.addSink(new ElasticsearchSink<>(
    config, transportAddresses,
    new ElasticsearchSinkFunction<String>() {...},
    new ActionRequestFailureHandler() {
        @Override
        public void onFailure(ActionRequest action,
                Throwable failure,
                int restStatusCode,
                RequestIndexer indexer) throws Throwable {

            if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
                // full queue; re-add document for indexing
                indexer.add(action);
            } else if (ExceptionUtils.containsThrowable(failure, ElasticsearchParseException.class)) {
                // malformed document; simply drop request without failing sink
            } else {
                // for all other failures, fail the sink
                // here the failure is simply rethrown, but users can also choose to throw custom exceptions
                throw failure;
            }
        }
}));
val input: DataStream[String] = ...

input.addSink(new ElasticsearchSink(
  config, transportAddresses,
  new ElasticsearchSinkFunction[String] {...},
  new ActionRequestFailureHandler {
    @throws(classOf[Throwable])
    override def onFailure(action: ActionRequest,
                           failure: Throwable,
                           restStatusCode: Int,
                           indexer: RequestIndexer): Unit = {

      if (ExceptionUtils.containsThrowable(failure, classOf[EsRejectedExecutionException])) {
        // full queue; re-add document for indexing
        indexer.add(action)
      } else if (ExceptionUtils.containsThrowable(failure, classOf[ElasticsearchParseException])) {
        // malformed document; simply drop request without failing sink
      } else {
        // for all other failures, fail the sink
        // here the failure is simply rethrown, but users can also choose to throw custom exceptions
        throw failure
      }
    }
}))
The above example will let the sink re-add requests that failed due to queue capacity saturation and drop requests with malformed documents, without failing the sink. For all other failures, the sink will fail. If an ActionRequestFailureHandler is not provided to the constructor, the sink will fail for any kind of error.

Note that onFailure is called only for failures that still occur after the BulkProcessor internally finishes all backoff retry attempts. By default, the BulkProcessor retries up to a maximum of 8 attempts with exponential backoff. For more information on the behaviour of the internal BulkProcessor and how to configure it, please see the following section.
By default, if a failure handler is not provided, the sink uses a NoOpFailureHandler that simply fails for all kinds of exceptions. The connector also provides a RetryRejectedExecutionFailureHandler implementation that always re-adds requests that have failed due to queue capacity saturation.
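For example, as a sketch that reuses the config and transportAddresses from the earlier examples and an assumed sinkFunction variable holding an ElasticsearchSinkFunction:

input.addSink(new ElasticsearchSink<>(
    config,
    transportAddresses,
    sinkFunction,
    new RetryRejectedExecutionFailureHandler()));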
IMPORTANT: Re-adding requests back to the internal BulkProcessor on failures will lead to longer checkpoints, as the sink will also need to wait for the re-added requests to be flushed when checkpointing. For example, when using RetryRejectedExecutionFailureHandler, checkpoints will need to wait until Elasticsearch node queues have enough capacity for all the pending requests. This also means that if re-added requests never succeed, the checkpoint will never finish.
Failure handling for Elasticsearch 1.x: For Elasticsearch 1.x, it is not feasible to match on the type of the failure, because the exact type cannot be retrieved through the older Java client APIs (thus, the types will be general Exceptions that only differ in the failure message). In this case, it is recommended to match on the provided REST status code.
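A sketch of such a handler is shown below; using status code 429 as the "queue is full" signal is purely illustrative and should be adapted to the failures you actually observe:

new ActionRequestFailureHandler() {
    @Override
    public void onFailure(ActionRequest action,
            Throwable failure,
            int restStatusCode,
            RequestIndexer indexer) throws Throwable {

        if (restStatusCode == 429) {
            // treated as a temporary rejection; re-add the request for indexing
            indexer.add(action);
        } else {
            // fail the sink for anything else
            throw failure;
        }
    }
}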
The internal BulkProcessor can be further configured for its behaviour on how buffered action requests are flushed, by setting the following values in the provided Map<String, String>:

* bulk.flush.max.actions: Maximum number of actions to buffer before flushing.
* bulk.flush.max.size.mb: Maximum size of data (in megabytes) to buffer before flushing.
* bulk.flush.interval.ms: Interval at which to flush regardless of the number or size of buffered actions.

For versions 2.x and above, configuring how temporary request errors are retried is also supported:

* bulk.flush.backoff.enable: Whether or not to perform retries with backoff delay for a flush if one or more of its actions failed due to a temporary EsRejectedExecutionException.
* bulk.flush.backoff.type: The type of backoff delay, either CONSTANT or EXPONENTIAL.
* bulk.flush.backoff.delay: The amount of delay for backoff. For constant backoff, this is simply the delay between each retry. For exponential backoff, this is the initial base delay.
* bulk.flush.backoff.retries: The number of backoff retries to attempt.
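As a sketch, a configuration map tuning these settings could look like the following; the concrete values are illustrative, not recommendations:

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");

// flush after 500 buffered actions, 5 MB of buffered data, or 10 seconds, whichever comes first
config.put("bulk.flush.max.actions", "500");
config.put("bulk.flush.max.size.mb", "5");
config.put("bulk.flush.interval.ms", "10000");

// retry temporarily rejected bulk requests with exponential backoff (2.x and above)
config.put("bulk.flush.backoff.enable", "true");
config.put("bulk.flush.backoff.type", "EXPONENTIAL");
config.put("bulk.flush.backoff.delay", "50");
config.put("bulk.flush.backoff.retries", "8");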
More information about Elasticsearch can be found here.
For the execution of your Flink program, it is recommended to build a so-called uber-jar (executable jar) containing all your dependencies (see here for further information).
However, when an uber-jar containing an Elasticsearch sink is executed, an IllegalArgumentException may occur, which is caused by conflicting files of Elasticsearch and its dependencies in META-INF/services:
IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]]
If the uber-jar is built using Maven, this issue can be avoided by adding the following to the Maven POM file in the plugins section:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>