diff --git a/pom.xml b/pom.xml
index 4579486f94803..9d28e848fcc2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -156,6 +156,7 @@ flexible messaging model and an intuitive client API.
3.3.1
2.4.7
1.2.4
+ 8.1.0
334
2.13
2.13.6
@@ -1121,6 +1122,12 @@ flexible messaging model and an intuitive client API.
${opensearch.version}
+
+ co.elastic.clients
+ elasticsearch-java
+ ${elasticsearch-java.version}
+
+
joda-time
joda-time
diff --git a/pulsar-io/elastic-search/pom.xml b/pulsar-io/elastic-search/pom.xml
index 378936d3ba67a..d4fc7fcf6ed5d 100644
--- a/pulsar-io/elastic-search/pom.xml
+++ b/pulsar-io/elastic-search/pom.xml
@@ -87,6 +87,11 @@
opensearch-rest-high-level-client
+
+ co.elastic.clients
+ elasticsearch-java
+
+
org.testcontainers
elasticsearch
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 9a797212fa128..8ac43a59afb23 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -19,80 +19,30 @@
package org.apache.pulsar.io.elasticsearch;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.http.HttpHost;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.config.Registry;
-import org.apache.http.config.RegistryBuilder;
-import org.apache.http.conn.ssl.NoopHostnameVerifier;
-import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
-import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
-import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
-import org.apache.http.impl.nio.reactor.IOReactorConfig;
-import org.apache.http.nio.conn.NHttpClientConnectionManager;
-import org.apache.http.nio.conn.NoopIOSessionStrategy;
-import org.apache.http.nio.conn.SchemeIOSessionStrategy;
-import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
-import org.apache.http.nio.reactor.ConnectingIOReactor;
-import org.apache.http.ssl.SSLContextBuilder;
-import org.apache.http.ssl.SSLContexts;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
-import org.opensearch.action.DocWriteRequest;
-import org.opensearch.action.DocWriteResponse;
-import org.opensearch.action.admin.indices.create.CreateIndexRequest;
-import org.opensearch.action.admin.indices.create.CreateIndexResponse;
-import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.opensearch.action.admin.indices.refresh.RefreshRequest;
-import org.opensearch.action.bulk.BulkItemResponse;
-import org.opensearch.action.bulk.BulkProcessor;
-import org.opensearch.action.bulk.BulkRequest;
-import org.opensearch.action.bulk.BulkResponse;
-import org.opensearch.action.delete.DeleteRequest;
-import org.opensearch.action.delete.DeleteResponse;
-import org.opensearch.action.index.IndexRequest;
-import org.opensearch.action.index.IndexResponse;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.action.support.master.AcknowledgedResponse;
-import org.opensearch.client.Node;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.Requests;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
-import org.opensearch.client.RestHighLevelClient;
-import org.opensearch.client.indices.GetIndexRequest;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.ByteSizeUnit;
-import org.opensearch.common.unit.ByteSizeValue;
-import org.opensearch.common.unit.TimeValue;
-import org.opensearch.common.xcontent.XContentType;
-import org.opensearch.index.query.QueryBuilders;
-import org.opensearch.search.SearchHit;
-import org.opensearch.search.builder.SearchSourceBuilder;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.SSLContext;
-import java.io.File;
+import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
+import org.apache.pulsar.io.elasticsearch.client.RestClient;
+import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
+import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
+import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
+
import java.io.IOException;
import java.net.MalformedURLException;
-import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@Slf4j
@@ -105,114 +55,56 @@ public class ElasticSearchClient implements AutoCloseable {
};
private ElasticSearchConfig config;
- private ConfigCallback configCallback;
- private RestHighLevelClient client;
+ private RestClient client;
+ private final RandomExponentialRetry backoffRetry;
final Set indexCache = new HashSet<>();
final Map topicToIndexCache = new HashMap<>();
- final RandomExponentialRetry backoffRetry;
- final BulkProcessor bulkProcessor;
- final ConcurrentMap, Record> records = new ConcurrentHashMap<>();
+ final ConcurrentMap records = new ConcurrentHashMap<>();
final AtomicReference irrecoverableError = new AtomicReference<>();
- final ScheduledExecutorService executorService;
+ final AtomicLong bulkOperationIdGenerator = new AtomicLong();
- ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) throws MalformedURLException {
+ public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) throws MalformedURLException {
this.config = elasticSearchConfig;
- this.configCallback = new ConfigCallback();
- this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
- if (config.isBulkEnabled() == false) {
- bulkProcessor = null;
- } else {
- BulkProcessor.Builder builder = BulkProcessor.builder(
- (bulkRequest, bulkResponseActionListener) -> client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
- new BulkProcessor.Listener() {
- @Override
- public void beforeBulk(long l, BulkRequest bulkRequest) {
- }
-
- @Override
- public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
- log.trace("Bulk request id={} size={}:", l, bulkRequest.requests().size());
- for (int i = 0; i < bulkResponse.getItems().length; i++) {
- DocWriteRequest> request = bulkRequest.requests().get(i);
- Record record = records.get(request);
- BulkItemResponse bulkItemResponse = bulkResponse.getItems()[i];
- if (bulkItemResponse.isFailed()) {
- record.fail();
- try {
- hasIrrecoverableError(bulkItemResponse);
- } catch(Exception e) {
- log.warn("Unrecoverable error:", e);
- }
- } else {
- record.ack();
- }
- records.remove(request);
- }
- }
-
- @Override
- public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
- log.warn("Bulk request id={} failed:", l, throwable);
- for (DocWriteRequest> request : bulkRequest.requests()) {
- Record record = records.remove(request);
- record.fail();
- }
- }
+ final BulkProcessor.Listener bulkListener = new BulkProcessor.Listener() {
+
+ private Record removeAndGetRecordForOperation(BulkProcessor.BulkOperationRequest operation) {
+ return records.remove(operation.getOperationId());
+
+ }
+ @Override
+ public void afterBulk(long executionId, List bulkOperationList,
+ List results) {
+ if (log.isTraceEnabled()) {
+ log.trace("Bulk request id={} size={}:", executionId, bulkOperationList.size());
+ }
+ int index = 0;
+ for (BulkProcessor.BulkOperationResult result: results) {
+ final Record record = removeAndGetRecordForOperation(bulkOperationList.get(index++));
+ if (result.isError()) {
+ record.fail();
+ checkForIrrecoverableError(result);
+ } else {
+ record.ack();
}
- )
- .setBulkActions(config.getBulkActions())
- .setBulkSize(new ByteSizeValue(config.getBulkSizeInMb(), ByteSizeUnit.MB))
- .setConcurrentRequests(config.getBulkConcurrentRequests())
- .setBackoffPolicy(new RandomExponentialBackoffPolicy(backoffRetry,
- config.getRetryBackoffInMs(),
- config.getMaxRetries()
- ));
- if (config.getBulkFlushIntervalInMs() > 0) {
- builder.setFlushInterval(new TimeValue(config.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS));
+ }
}
- this.bulkProcessor = builder.build();
- }
- // idle+expired connection evictor thread
- this.executorService = Executors.newSingleThreadScheduledExecutor();
- this.executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- configCallback.connectionManager.closeExpiredConnections();
- configCallback.connectionManager.closeIdleConnections(
- config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
- }
- },
- config.getConnectionIdleTimeoutInMs(),
- config.getConnectionIdleTimeoutInMs(),
- TimeUnit.MILLISECONDS
- );
-
- URL url = new URL(config.getElasticSearchUrl());
- log.info("ElasticSearch URL {}", url);
- RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()))
- .setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
- @Override
- public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
- return builder
- .setContentCompressionEnabled(config.isCompressionEnabled())
- .setConnectionRequestTimeout(config.getConnectionRequestTimeoutInMs())
- .setConnectTimeout(config.getConnectTimeoutInMs())
- .setSocketTimeout(config.getSocketTimeoutInMs());
- }
- })
- .setHttpClientConfigCallback(this.configCallback)
- .setFailureListener(new RestClient.FailureListener() {
- public void onFailure(Node node) {
- log.warn("Node host={} failed", node.getHost());
- }
- });
- this.client = new RestHighLevelClient(builder);
+ @Override
+ public void afterBulk(long executionId, List bulkOperationList, Throwable throwable) {
+ log.warn("Bulk request id={} failed:", executionId, throwable);
+ for (BulkProcessor.BulkOperationRequest operation: bulkOperationList) {
+ final Record record = removeAndGetRecordForOperation(operation);
+ record.fail();
+ }
+ }
+ };
+ this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
+ this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
}
- void failed(Exception e) throws Exception {
+ void failed(Exception e) {
if (irrecoverableError.compareAndSet(null, e)) {
log.error("Irrecoverable error:", e);
}
@@ -222,24 +114,28 @@ boolean isFailed() {
return irrecoverableError.get() != null;
}
- void hasIrrecoverableError(BulkItemResponse bulkItemResponse) throws Exception {
+ void checkForIrrecoverableError(BulkProcessor.BulkOperationResult result) {
+ if (!result.isError()) {
+ return;
+ }
+ final String errorCause = result.getError();
for (String error : malformedErrors) {
- if (bulkItemResponse.getFailureMessage().contains(error)) {
+ if (errorCause.contains(error)) {
switch (config.getMalformedDocAction()) {
case IGNORE:
break;
case WARN:
log.warn("Ignoring malformed document index={} id={}",
- bulkItemResponse.getIndex(),
- bulkItemResponse.getId(),
- bulkItemResponse.getFailure().getCause());
+ result.getIndex(),
+ result.getDocumentId(),
+ error);
break;
case FAIL:
log.error("Failure due to the malformed document index={} id={}",
- bulkItemResponse.getIndex(),
- bulkItemResponse.getId(),
- bulkItemResponse.getFailure().getCause());
- failed(bulkItemResponse.getFailure().getCause());
+ result.getIndex(),
+ result.getDocumentId(),
+ error);
+ failed(new Exception(error));
break;
}
}
@@ -250,14 +146,19 @@ public void bulkIndex(Record record, Pair idAndDoc) throws Excep
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- IndexRequest indexRequest = Requests.indexRequest(config.getIndexName());
- if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
- indexRequest.id(idAndDoc.getLeft());
- indexRequest.type(config.getTypeName());
- indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
-
- records.put(indexRequest, record);
- bulkProcessor.add(indexRequest);
+ final String documentId = idAndDoc.getLeft();
+ final String documentSource = idAndDoc.getRight();
+
+ final long operationId = bulkOperationIdGenerator.incrementAndGet();
+ final BulkProcessor.BulkIndexRequest bulkIndexRequest = BulkProcessor.BulkIndexRequest.builder()
+ .index(config.getIndexName())
+ .documentId(documentId)
+ .documentSource(documentSource)
+ .requestId(operationId)
+ .build();
+
+ records.put(operationId, record);
+ client.getBulkProcessor().appendIndexRequest(bulkIndexRequest);
} catch(Exception e) {
log.debug("index failed id=" + idAndDoc.getLeft(), e);
record.fail();
@@ -276,20 +177,18 @@ public boolean indexDocument(Record record, Pair
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- IndexRequest indexRequest = Requests.indexRequest(config.getIndexName());
- if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
- indexRequest.id(idAndDoc.getLeft());
- indexRequest.type(config.getTypeName());
- indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
- IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
- if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
- indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
+
+ final String indexName = config.getIndexName();
+ final String documentId = idAndDoc.getLeft();
+ final String documentSource = idAndDoc.getRight();
+
+ final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource);
+ if (createdOrUpdated) {
record.ack();
- return true;
} else {
record.fail();
- return false;
}
+ return createdOrUpdated;
} catch (final Exception ex) {
log.error("index failed id=" + idAndDoc.getLeft(), ex);
record.fail();
@@ -301,14 +200,18 @@ public void bulkDelete(Record record, String id) throws Exception
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName());
- deleteRequest.id(id);
- deleteRequest.type(config.getTypeName());
- records.put(deleteRequest, record);
- bulkProcessor.add(deleteRequest);
+ final long operationId = bulkOperationIdGenerator.incrementAndGet();
+ final BulkProcessor.BulkDeleteRequest bulkDeleteRequest = BulkProcessor.BulkDeleteRequest.builder()
+ .index(config.getIndexName())
+ .documentId(id)
+ .requestId(operationId)
+ .build();
+
+ records.put(operationId, record);
+ client.getBulkProcessor().appendDeleteRequest(bulkDeleteRequest);
} catch(Exception e) {
- log.debug("delete failed id=" + id, e);
+ log.debug("delete failed id: {}", id, e);
record.fail();
throw e;
}
@@ -325,20 +228,17 @@ public boolean deleteDocument(Record record, String id) throws Ex
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName());
- deleteRequest.id(id);
- deleteRequest.type(config.getTypeName());
- DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT);
- log.debug("delete result=" + deleteResponse.getResult());
- if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
- deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
+
+
+ final boolean deleted = client.deleteDocument(config.getIndexName(), id);
+ if (deleted) {
record.ack();
- return true;
+ } else {
+ record.fail();
}
- record.fail();
- return false;
+ return deleted;
} catch (final Exception ex) {
- log.debug("index failed id=" + id, ex);
+ log.debug("index failed id: {}", id, ex);
record.fail();
throw ex;
}
@@ -348,25 +248,14 @@ public boolean deleteDocument(Record record, String id) throws Ex
* Flushes the bulk processor.
*/
public void flush() {
- bulkProcessor.flush();
+ client.getBulkProcessor().flush();
}
@Override
public void close() {
- try {
- if (bulkProcessor != null) {
- bulkProcessor.awaitClose(5000L, TimeUnit.MILLISECONDS);
- }
- } catch (InterruptedException e) {
- log.warn("Elasticsearch bulk processor close error:", e);
- }
- try {
- this.executorService.shutdown();
- if (this.client != null) {
- this.client.close();
- }
- } catch (IOException e) {
- log.warn("Elasticsearch client close error:", e);
+ if (client != null) {
+ client.close();
+ client = null;
}
}
@@ -429,154 +318,33 @@ public String topicToIndexName(String topicName) {
}
@VisibleForTesting
- public boolean createIndexIfNeeded(String indexName) throws IOException {
+ public boolean createIndexIfNeeded(String indexName) {
if (indexExists(indexName)) {
return false;
}
- final CreateIndexRequest cireq = new CreateIndexRequest(indexName);
- cireq.settings(Settings.builder()
- .put("index.number_of_shards", config.getIndexNumberOfShards())
- .put("index.number_of_replicas", config.getIndexNumberOfReplicas()));
- return retry(() -> {
- CreateIndexResponse resp = client.indices().create(cireq, RequestOptions.DEFAULT);
- if (!resp.isAcknowledged() || !resp.isShardsAcknowledged()) {
- throw new IOException("Unable to create index.");
- }
- return true;
- }, "create index");
+ return retry(() -> client.createIndex(indexName), "create index");
}
- public boolean indexExists(final String indexName) throws IOException {
- final GetIndexRequest request = new GetIndexRequest(indexName);
- return retry(() -> client.indices().exists(request, RequestOptions.DEFAULT), "index exists");
+ public boolean indexExists(final String indexName) {
+ return retry(() -> client.indexExists(indexName), "index exists");
}
- @VisibleForTesting
- protected long totalHits(String indexName) throws IOException {
- client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
- SearchResponse response = client.search(
- new SearchRequest()
- .indices(indexName)
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
- for(SearchHit searchHit : response.getHits()) {
- System.out.println(searchHit.getId()+": "+searchHit.getFields());
- }
- return response.getHits().getTotalHits().value;
- }
-
- @VisibleForTesting
- protected SearchResponse search(String indexName) throws IOException {
- client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
- return client.search(
- new SearchRequest()
- .indices(indexName)
- .source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
- RequestOptions.DEFAULT);
- }
-
- @VisibleForTesting
- protected AcknowledgedResponse delete(String indexName) throws IOException {
- return client.indices().delete(new DeleteIndexRequest(indexName), RequestOptions.DEFAULT);
+ private T retry(Callable callable, String source) {
+ return retry(callable, config.getMaxRetries(), source);
}
- private T retry(Callable callable, String source) {
+ private T retry(Callable callable, int maxRetries, String source) {
try {
- return backoffRetry.retry(callable, config.getMaxRetries(), config.getRetryBackoffInMs(), source);
+ return backoffRetry.retry(callable, maxRetries, config.getRetryBackoffInMs(), source);
} catch (Exception e) {
log.error("error in command {} wth retry", source, e);
throw new ElasticSearchConnectionException(source + " failed", e);
}
}
- public class ConfigCallback implements RestClientBuilder.HttpClientConfigCallback {
- final NHttpClientConnectionManager connectionManager;
- final CredentialsProvider credentialsProvider;
-
- public ConfigCallback() {
- this.connectionManager = buildConnectionManager(ElasticSearchClient.this.config);
- this.credentialsProvider = buildCredentialsProvider(ElasticSearchClient.this.config);
- }
-
- @Override
- public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder builder) {
- builder.setMaxConnPerRoute(config.getBulkConcurrentRequests());
- builder.setMaxConnTotal(config.getBulkConcurrentRequests());
- builder.setConnectionManager(connectionManager);
-
- if (this.credentialsProvider != null) {
- builder.setDefaultCredentialsProvider(credentialsProvider);
- }
- return builder;
- }
-
- public NHttpClientConnectionManager buildConnectionManager(ElasticSearchConfig config) {
- try {
- IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
- .setConnectTimeout(config.getConnectTimeoutInMs())
- .setSoTimeout(config.getSocketTimeoutInMs())
- .build();
- ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
- PoolingNHttpClientConnectionManager connManager;
- if (config.getSsl().isEnabled()) {
- ElasticSearchSslConfig sslConfig = config.getSsl();
- HostnameVerifier hostnameVerifier = config.getSsl().isHostnameVerification()
- ? SSLConnectionSocketFactory.getDefaultHostnameVerifier()
- : new NoopHostnameVerifier();
- String[] cipherSuites = null;
- if (!Strings.isNullOrEmpty(sslConfig.getCipherSuites())) {
- cipherSuites = sslConfig.getCipherSuites().split(",");
- }
- String[] protocols = null;
- if (!Strings.isNullOrEmpty(sslConfig.getProtocols())) {
- protocols = sslConfig.getProtocols().split(",");
- }
- Registry registry = RegistryBuilder.create()
- .register("http", NoopIOSessionStrategy.INSTANCE)
- .register("https", new SSLIOSessionStrategy(
- buildSslContext(config),
- protocols,
- cipherSuites,
- hostnameVerifier))
- .build();
- connManager = new PoolingNHttpClientConnectionManager(ioReactor, registry);
- } else {
- connManager = new PoolingNHttpClientConnectionManager(ioReactor);
- }
- return connManager;
- } catch(Exception e) {
- throw new ElasticSearchConnectionException(e);
- }
- }
+ RestClient getRestClient() {
+ return client;
+ }
- private SSLContext buildSslContext(ElasticSearchConfig config) throws NoSuchAlgorithmException, KeyManagementException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
- ElasticSearchSslConfig sslConfig = config.getSsl();
- SSLContextBuilder sslContextBuilder = SSLContexts.custom();
- if (!Strings.isNullOrEmpty(sslConfig.getProvider())) {
- sslContextBuilder.setProvider(sslConfig.getProvider());
- }
- if (!Strings.isNullOrEmpty(sslConfig.getProtocols())) {
- sslContextBuilder.setProtocol(sslConfig.getProtocols());
- }
- if (!Strings.isNullOrEmpty(sslConfig.getTruststorePath()) && !Strings.isNullOrEmpty(sslConfig.getTruststorePassword())) {
- sslContextBuilder.loadTrustMaterial(new File(sslConfig.getTruststorePath()), sslConfig.getTruststorePassword().toCharArray());
- }
- if (!Strings.isNullOrEmpty(sslConfig.getKeystorePath()) && !Strings.isNullOrEmpty(sslConfig.getKeystorePassword())) {
- sslContextBuilder.loadKeyMaterial(new File(sslConfig.getKeystorePath()),
- sslConfig.getKeystorePassword().toCharArray(),
- sslConfig.getKeystorePassword().toCharArray());
- }
- return sslContextBuilder.build();
- }
- private CredentialsProvider buildCredentialsProvider(ElasticSearchConfig config) {
- if (StringUtils.isEmpty(config.getUsername()) || StringUtils.isEmpty(config.getPassword())) {
- return null;
- }
- CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
- return credentialsProvider;
- }
- }
}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
index 2a2710f6bfb0a..7d7e9f921f03a 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java
@@ -258,6 +258,18 @@ public class ElasticSearchConfig implements Serializable {
)
private boolean ignoreUnsupportedFields = false;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "AUTO",
+ help = "Specify compatibility mode with the ElasticSearch cluster. " +
+ "'AUTO' value will try to auto detect the correct compatibility mode to use. " +
+ "Use 'ELASTICSEARCH_7' if the target cluster is running ElasticSearch 7 or prior. " +
+ "Use 'ELASTICSEARCH' if the target cluster is running ElasticSearch 8 or higher. " +
+ "Use 'OPENSEARCH' if the target cluster is running OpenSearch."
+ )
+ private CompatibilityMode compatibilityMode = CompatibilityMode.AUTO;
+
public enum MalformedDocAction {
IGNORE,
WARN,
@@ -270,6 +282,13 @@ public enum NullValueAction {
FAIL
}
+ public enum CompatibilityMode {
+ AUTO,
+ ELASTICSEARCH_7,
+ ELASTICSEARCH,
+ OPENSEARCH
+ }
+
public static ElasticSearchConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialRetry.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialRetry.java
index f8c43798493ab..d51f6442bb4be 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialRetry.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/RandomExponentialRetry.java
@@ -58,11 +58,11 @@ public long randomWaitInMs(int attempt, long backoffInMs) {
return ThreadLocalRandom.current().nextLong(0, waitInMs(attempt, backoffInMs));
}
- protected T retry(Callable function, int maxAttempts, long initialBackoff, String source) throws Exception {
+ public T retry(Callable function, int maxAttempts, long initialBackoff, String source) throws Exception {
return retry(function, maxAttempts, initialBackoff, source, new Time());
}
- protected T retry(Callable function, int maxAttempts, long initialBackoff, String source, Time clock) throws Exception {
+ public T retry(Callable function, int maxAttempts, long initialBackoff, String source, Time clock) throws Exception {
Exception lastException = null;
for(int i = 0; i < maxAttempts || maxAttempts == -1; i++) {
try {
@@ -70,7 +70,8 @@ protected T retry(Callable function, int maxAttempts, long initialBackoff
} catch (Exception e) {
lastException = e;
long backoff = randomWaitInMs(i, initialBackoff);
- log.info("Trying source={} attempt {}/{} failed, waiting {}ms", source, i, maxAttempts, backoff);
+ log.info("Executing '{}', attempt {}/{}, next retry in {} ms, caused by: {}", source, i,
+ maxAttempts, backoff, e.getMessage());
clock.sleep(backoff);
}
}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
new file mode 100644
index 0000000000000..c3adcb7e1abd5
--- /dev/null
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch.client;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Processor for "bulk" call to the Elastic REST Endpoint.
+ */
+public interface BulkProcessor extends Closeable {
+
+ @Builder
+ @Getter
+ class BulkOperationRequest {
+ private long operationId;
+ }
+
+ @Builder
+ @Getter
+ class BulkOperationResult {
+ private String error;
+ private String index;
+ private String documentId;
+ public boolean isError() {
+ return error != null;
+ }
+ }
+
+ interface Listener {
+
+ void afterBulk(long executionId, List bulkOperationList, List results);
+
+ void afterBulk(long executionId, List bulkOperationList, Throwable throwable);
+ }
+
+ @Builder
+ @Getter
+ class BulkIndexRequest {
+ private long requestId;
+ private String index;
+ private String documentId;
+ private String documentSource;
+ }
+
+ @Builder
+ @Getter
+ class BulkDeleteRequest {
+ private long requestId;
+ private String index;
+ private String documentId;
+ }
+
+
+ void appendIndexRequest(BulkIndexRequest request) throws IOException;
+
+ void appendDeleteRequest(BulkDeleteRequest request) throws IOException;
+
+ void flush();
+
+ void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;
+
+}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
new file mode 100644
index 0000000000000..5711e72420774
--- /dev/null
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch.client;
+
+import com.google.common.base.Strings;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.config.Registry;
+import org.apache.http.config.RegistryBuilder;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
+import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.nio.conn.NHttpClientConnectionManager;
+import org.apache.http.nio.conn.NoopIOSessionStrategy;
+import org.apache.http.nio.conn.SchemeIOSessionStrategy;
+import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
+import org.apache.http.nio.reactor.ConnectingIOReactor;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchConnectionException;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchSslConfig;
+import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
+import org.elasticsearch.client.RestClientBuilder;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public abstract class RestClient implements Closeable {
+
+ protected final ElasticSearchConfig config;
+ protected final ConfigCallback configCallback;
+ private final ScheduledExecutorService executorService;
+
+ public RestClient(ElasticSearchConfig elasticSearchConfig, BulkProcessor.Listener bulkProcessorListener) throws MalformedURLException {
+ this.config = elasticSearchConfig;
+ this.configCallback = new ConfigCallback();
+
+ // idle+expired connection evictor thread
+ this.executorService = Executors.newSingleThreadScheduledExecutor();
+ this.executorService.scheduleAtFixedRate(() -> {
+ configCallback.connectionManager.closeExpiredConnections();
+ configCallback.connectionManager.closeIdleConnections(
+ config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
+ },
+ config.getConnectionIdleTimeoutInMs(),
+ config.getConnectionIdleTimeoutInMs(),
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ public abstract boolean indexExists(String index) throws IOException;
+ public abstract boolean createIndex(String index) throws IOException;
+ public abstract boolean deleteIndex(String index) throws IOException;
+
+ public abstract boolean indexDocument(String index, String documentId, String documentSource) throws IOException;
+ public abstract boolean deleteDocument(String index, String documentId) throws IOException;
+
+ public abstract long totalHits(String index) throws IOException;
+
+ public abstract BulkProcessor getBulkProcessor();
+
+ public class ConfigCallback implements RestClientBuilder.HttpClientConfigCallback,
+ org.opensearch.client.RestClientBuilder.HttpClientConfigCallback {
+ final NHttpClientConnectionManager connectionManager;
+ final CredentialsProvider credentialsProvider;
+
+ public ConfigCallback() {
+ this.connectionManager = buildConnectionManager(RestClient.this.config);
+ this.credentialsProvider = buildCredentialsProvider(RestClient.this.config);
+ }
+
+ @Override
+ public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder builder) {
+ builder.setMaxConnPerRoute(config.getBulkConcurrentRequests());
+ builder.setMaxConnTotal(config.getBulkConcurrentRequests());
+ builder.setConnectionManager(connectionManager);
+
+ if (this.credentialsProvider != null) {
+ builder.setDefaultCredentialsProvider(credentialsProvider);
+ }
+ return builder;
+ }
+
+ public NHttpClientConnectionManager buildConnectionManager(ElasticSearchConfig config) {
+ try {
+ IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
+ .setConnectTimeout(config.getConnectTimeoutInMs())
+ .setSoTimeout(config.getSocketTimeoutInMs())
+ .build();
+ ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
+ PoolingNHttpClientConnectionManager connManager;
+ if (config.getSsl().isEnabled()) {
+ ElasticSearchSslConfig sslConfig = config.getSsl();
+ HostnameVerifier hostnameVerifier = config.getSsl().isHostnameVerification()
+ ? SSLConnectionSocketFactory.getDefaultHostnameVerifier()
+ : new NoopHostnameVerifier();
+ String[] cipherSuites = null;
+ if (!Strings.isNullOrEmpty(sslConfig.getCipherSuites())) {
+ cipherSuites = sslConfig.getCipherSuites().split(",");
+ }
+ String[] protocols = null;
+ if (!Strings.isNullOrEmpty(sslConfig.getProtocols())) {
+ protocols = sslConfig.getProtocols().split(",");
+ }
+ Registry registry = RegistryBuilder.create()
+ .register("http", NoopIOSessionStrategy.INSTANCE)
+ .register("https", new SSLIOSessionStrategy(
+ buildSslContext(config),
+ protocols,
+ cipherSuites,
+ hostnameVerifier))
+ .build();
+ connManager = new PoolingNHttpClientConnectionManager(ioReactor, registry);
+ } else {
+ connManager = new PoolingNHttpClientConnectionManager(ioReactor);
+ }
+ return connManager;
+ } catch(Exception e) {
+ throw new ElasticSearchConnectionException(e);
+ }
+ }
+
+ private SSLContext buildSslContext(ElasticSearchConfig config) throws NoSuchAlgorithmException, KeyManagementException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
+ ElasticSearchSslConfig sslConfig = config.getSsl();
+ SSLContextBuilder sslContextBuilder = SSLContexts.custom();
+ if (!Strings.isNullOrEmpty(sslConfig.getProvider())) {
+ sslContextBuilder.setProvider(sslConfig.getProvider());
+ }
+ if (!Strings.isNullOrEmpty(sslConfig.getProtocols())) {
+ sslContextBuilder.setProtocol(sslConfig.getProtocols());
+ }
+ if (!Strings.isNullOrEmpty(sslConfig.getTruststorePath()) && !Strings.isNullOrEmpty(sslConfig.getTruststorePassword())) {
+ sslContextBuilder.loadTrustMaterial(new File(sslConfig.getTruststorePath()), sslConfig.getTruststorePassword().toCharArray());
+ }
+ if (!Strings.isNullOrEmpty(sslConfig.getKeystorePath()) && !Strings.isNullOrEmpty(sslConfig.getKeystorePassword())) {
+ sslContextBuilder.loadKeyMaterial(new File(sslConfig.getKeystorePath()),
+ sslConfig.getKeystorePassword().toCharArray(),
+ sslConfig.getKeystorePassword().toCharArray());
+ }
+ return sslContextBuilder.build();
+ }
+
+ private CredentialsProvider buildCredentialsProvider(ElasticSearchConfig config) {
+ if (StringUtils.isEmpty(config.getUsername()) || StringUtils.isEmpty(config.getPassword())) {
+ return null;
+ }
+ CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
+ return credentialsProvider;
+ }
+ }
+
+ @Override
+ public void close() {
+ executorService.shutdown();
+ }
+}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClientFactory.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClientFactory.java
new file mode 100644
index 0000000000000..4edef71564eb2
--- /dev/null
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClientFactory.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch.client;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
+import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
+import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
+import org.opensearch.client.Request;
+import org.opensearch.client.Response;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Map;
+
+@Slf4j
+public class RestClientFactory {
+
+ private static final ObjectMapper MAPPER = new ObjectMapper();
+
+ public static RestClient createClient(ElasticSearchConfig config, BulkProcessor.Listener bulkListener) throws IOException {
+ if (config.getCompatibilityMode() == ElasticSearchConfig.CompatibilityMode.ELASTICSEARCH) {
+ log.info("Found compatibilityMode set to '{}', using the ElasticSearch Java client.", config.getCompatibilityMode());
+ return new ElasticSearchJavaRestClient(config, bulkListener);
+ } else if (config.getCompatibilityMode() == ElasticSearchConfig.CompatibilityMode.ELASTICSEARCH_7 ||
+ config.getCompatibilityMode() == ElasticSearchConfig.CompatibilityMode.OPENSEARCH) {
+ log.info("Found compatibilityMode set to '{}', using the OpenSearch High Level Rest API Client.", config.getCompatibilityMode());
+ return new OpenSearchHighLevelRestClient(config, bulkListener);
+ }
+ log.info("Found compatibilityMode set to '{}', will try to auto detect the best client to use.", config.getCompatibilityMode());
+ try {
+ final Map jsonResponse = requestInfo(config);
+ final boolean useOpenSearchHighLevelClient = useOpenSearchHighLevelClient(jsonResponse);
+ log.info("useOpenSearchHighLevelClient={}, got info response: {}", useOpenSearchHighLevelClient, jsonResponse);
+ if (useOpenSearchHighLevelClient) {
+ return new OpenSearchHighLevelRestClient(config, bulkListener);
+ }
+ return new ElasticSearchJavaRestClient(config, bulkListener);
+ } catch (IOException ioException) {
+ log.warn("Got error while performing info request to detect Elastic version: {}",
+ ioException.getMessage());
+ throw ioException;
+ }
+ }
+
+ private static Map requestInfo(ElasticSearchConfig config) throws IOException {
+ try (final OpenSearchHighLevelRestClient openSearchHighLevelRestClient =
+ new OpenSearchHighLevelRestClient(config, null)) {
+ final Response response = openSearchHighLevelRestClient.getClient().getLowLevelClient()
+ .performRequest(new Request(HttpGet.METHOD_NAME, "/"));
+
+ return (Map) MAPPER.readValue(response.getEntity().getContent(), Map.class);
+ }
+ }
+
+ private static boolean useOpenSearchHighLevelClient(Map jsonResponse) {
+ final Map versionMap = (Map) jsonResponse.get("version");
+ final String distribution = (String) versionMap.get("distribution");
+ if (!StringUtils.isBlank(distribution)) {
+ if (distribution.equals("opensearch")) {
+ return true;
+ }
+ }
+ final String version = (String) versionMap.get("number");
+ if (StringUtils.isBlank(version)) {
+ return true;
+ }
+ final String mainVersion = version.substring(0, version.indexOf("."));
+ try {
+ final int numVersion = Integer.parseInt(mainVersion);
+ if (numVersion <= 7) {
+ return true;
+ }
+ // For Elastic 8+ use Elastic Java client
+ return false;
+ } catch (NumberFormatException nfe) {
+ log.warn("Not able to parse version: {}", mainVersion, nfe);
+ return true;
+ }
+ }
+
+
+
+}
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
new file mode 100644
index 0000000000000..cf72aca4be1f8
--- /dev/null
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.elasticsearch.client.elastic;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.BulkRequest;
+import co.elastic.clients.elasticsearch.core.BulkResponse;
+import co.elastic.clients.elasticsearch.core.IndexRequest;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
+import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
+import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
+import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
+import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
+import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class ElasticBulkProcessor implements BulkProcessor {
+ private final ElasticSearchConfig config;
+ private final ElasticsearchClient client;
+
+ private final AtomicLong executionIdGen = new AtomicLong();
+ private final int bulkActions;
+ private final long bulkSize;
+ private final List pendingOperations = new ArrayList<>();
+ private final BulkRequestHandler bulkRequestHandler;
+ private volatile boolean closed = false;
+ private final ReentrantLock lock;
+ private final ExecutorService internalExecutorService;
+ private ScheduledFuture> futureFlushTask;
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ public ElasticBulkProcessor(ElasticSearchConfig config, ElasticsearchClient client, Listener listener) {
+ this.config = config;
+ this.client = client;
+ this.lock = new ReentrantLock();
+ this.bulkActions = config.getBulkActions();
+ this.bulkSize = config.getBulkSizeInMb() * 1024 * 1024;
+ this.internalExecutorService = Executors.newFixedThreadPool(Math.max(1, config.getBulkConcurrentRequests()),
+ new DefaultThreadFactory("elastic-bulk-executor"));
+ this.bulkRequestHandler = new BulkRequestHandler(new RandomExponentialRetry(config.getMaxRetryTimeInSec()),
+ config.getBulkConcurrentRequests(), listener);
+
+ ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+ new DefaultThreadFactory("elastic-flush-task"));
+ if (config.getBulkFlushIntervalInMs() > 0) {
+ futureFlushTask = executor.scheduleWithFixedDelay(new Flush(),
+ config.getBulkFlushIntervalInMs(),
+ config.getBulkFlushIntervalInMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void appendIndexRequest(BulkIndexRequest request) throws IOException {
+ final Map mapped = mapper.readValue(request.getDocumentSource(), Map.class);
+
+ final IndexOperation
+
+
+
+ co.elastic.clients
+ elasticsearch-java
+ test
+
+
+
+ org.testcontainers
+ elasticsearch
+ test
+
+
com.rabbitmq
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
deleted file mode 100644
index aa743e77b9a4a..0000000000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/ElasticSearchContainer.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.containers;
-
-import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
-
-public class ElasticSearchContainer extends ChaosContainer {
-
- public static final String NAME = "ElasticSearch";
- static final Integer[] PORTS = { 9200, 9300 };
-
- private static final String IMAGE_NAME = "docker.elastic.co/elasticsearch/elasticsearch:7.13.3";
-
- public ElasticSearchContainer(String clusterName) {
- super(clusterName, IMAGE_NAME);
- }
-
- @Override
- protected void configure() {
- super.configure();
- this.withNetworkAliases(NAME)
- .withExposedPorts(PORTS)
- .withEnv("discovery.type", "single-node")
- .withCreateContainerCmdModifier(createContainerCmd -> {
- createContainerCmd.withHostName(NAME);
- createContainerCmd.withName(clusterName + "-" + NAME);
- })
- .waitingFor(new HostPortWaitStrategy());
- }
-
-}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
new file mode 100644
index 0000000000000..699f99d491350
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
+
+ public ElasticSearch7SinkTester(boolean schemaEnable) {
+ super(schemaEnable);
+ }
+
+ @Override
+ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ return new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64")
+ .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
new file mode 100644
index 0000000000000..e24f23fac89ec
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
+
+ public ElasticSearch8SinkTester(boolean schemaEnable) {
+ super(schemaEnable);
+ }
+
+ @Override
+ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ return new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.1.0")
+ .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
+ .withEnv("xpack.security.enabled", "false")
+ .withEnv("xpack.security.http.ssl.enabled", "false");
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
index 490e7bdcf95bf..06aa4174b8aab 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearchSinkTester.java
@@ -25,6 +25,13 @@
import java.util.LinkedHashMap;
import java.util.Map;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import io.vertx.core.http.RequestOptions;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
@@ -35,19 +42,17 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.tests.integration.containers.ElasticSearchContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
-import org.opensearch.action.search.SearchRequest;
-import org.opensearch.action.search.SearchResponse;
-import org.opensearch.client.RequestOptions;
-import org.opensearch.client.RestClient;
-import org.opensearch.client.RestClientBuilder;
-import org.opensearch.client.RestHighLevelClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
-public class ElasticSearchSinkTester extends SinkTester {
+public abstract class ElasticSearchSinkTester extends SinkTester {
- private RestHighLevelClient elasticClient;
+ private static final String NAME = "elastic-search";
+
+ private ElasticsearchClient elasticClient;
private boolean schemaEnable;
private final Schema> kvSchema;
@@ -73,9 +78,9 @@ public Schema> getInputTopicSchema() {
}
public ElasticSearchSinkTester(boolean schemaEnable) {
- super(ElasticSearchContainer.NAME, SinkType.ELASTIC_SEARCH);
+ super(NAME, SinkType.ELASTIC_SEARCH);
- sinkConfig.put("elasticSearchUrl", "http://" + ElasticSearchContainer.NAME + ":9200");
+ sinkConfig.put("elasticSearchUrl", "http://" + NAME + ":9200");
sinkConfig.put("indexName", "test-index");
this.schemaEnable = schemaEnable;
if (schemaEnable) {
@@ -89,11 +94,6 @@ public ElasticSearchSinkTester(boolean schemaEnable) {
}
- @Override
- protected ElasticSearchContainer createSinkService(PulsarCluster cluster) {
- return new ElasticSearchContainer(cluster.getClusterName());
- }
-
@Override
public void prepareSink() throws Exception {
RestClientBuilder builder = RestClient.builder(
@@ -101,16 +101,18 @@ public void prepareSink() throws Exception {
"localhost",
serviceContainer.getMappedPort(9200),
"http"));
- elasticClient = new RestHighLevelClient(builder);
+ ElasticsearchTransport transport = new RestClientTransport(builder.build(),
+ new JacksonJsonpMapper());
+ elasticClient = new ElasticsearchClient(transport);
}
@Override
public void validateSinkResult(Map kvs) {
- SearchRequest searchRequest = new SearchRequest("test-index");
-
Awaitility.await().untilAsserted(() -> {
- SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
- assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
+ SearchResponse searchResult = elasticClient.search(new SearchRequest.Builder().index("test-index")
+ .q("*:*")
+ .build(), Map.class);
+ assertTrue(searchResult.hits().total().value() > 0, searchResult.toString());
});
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
new file mode 100644
index 0000000000000..5710a438a8152
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io.sinks;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.HttpHost;
+import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.awaitility.Awaitility;
+import org.opensearch.action.search.SearchRequest;
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.client.RequestOptions;
+import org.opensearch.client.RestClient;
+import org.opensearch.client.RestClientBuilder;
+import org.opensearch.client.RestHighLevelClient;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.Map;
+
+import static org.testng.Assert.assertTrue;
+
+public class OpenSearchSinkTester extends ElasticSearchSinkTester {
+
+ private RestHighLevelClient elasticClient;
+
+
+ public OpenSearchSinkTester(boolean schemaEnable) {
+ super(schemaEnable);
+ }
+
+ @Override
+ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
+ DockerImageName dockerImageName = DockerImageName.parse("opensearchproject/opensearch:1.2.4")
+ .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+ return new ElasticsearchContainer(dockerImageName)
+ .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
+ .withEnv("bootstrap.memory_lock", "true")
+ .withEnv("plugins.security.disabled", "true");
+ }
+
+ @Override
+ public void prepareSink() throws Exception {
+ RestClientBuilder builder = RestClient.builder(
+ new HttpHost(
+ "localhost",
+ serviceContainer.getMappedPort(9200),
+ "http"));
+ elasticClient = new RestHighLevelClient(builder);
+ }
+
+ @Override
+ public void validateSinkResult(Map kvs) {
+ org.opensearch.action.search.SearchRequest searchRequest = new SearchRequest("test-index");
+
+ Awaitility.await().untilAsserted(() -> {
+ SearchResponse searchResult = elasticClient.search(searchRequest, RequestOptions.DEFAULT);
+ assertTrue(searchResult.getHits().getTotalHits().value > 0, searchResult.toString());
+ });
+ }
+
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index f0ca3155abc95..8dbef586665b5 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -55,13 +55,33 @@ public void testJdbcSink() throws Exception {
}
@Test(groups = "sink")
- public void testElasticSearchSinkRawData() throws Exception {
- testSink(new ElasticSearchSinkTester(false), true);
+ public void testElasticSearch7SinkRawData() throws Exception {
+ testSink(new ElasticSearch7SinkTester(false), true);
}
@Test(groups = "sink")
- public void testElasticSearchSinkSchemaEnabled() throws Exception {
- testSink(new ElasticSearchSinkTester(true), true);
+ public void testElasticSearchSink7SchemaEnabled() throws Exception {
+ testSink(new ElasticSearch7SinkTester(true), true);
+ }
+
+ @Test(groups = "sink")
+ public void testElasticSearch8SinkRawData() throws Exception {
+ testSink(new ElasticSearch8SinkTester(false), true);
+ }
+
+ @Test(groups = "sink")
+ public void testElasticSearch8SinkSchemaEnabled() throws Exception {
+ testSink(new ElasticSearch8SinkTester(true), true);
+ }
+
+ @Test(groups = "sink")
+ public void testOpenSearchSinkRawData() throws Exception {
+ testSink(new OpenSearchSinkTester(false), true);
+ }
+
+ @Test(groups = "sink")
+ public void testOpenSearchSinkSchemaEnabled() throws Exception {
+ testSink(new OpenSearchSinkTester(true), true);
}
@Test(enabled = false, groups = "sink")