Skip to content

Commit

Permalink
[pulsar-io] Elasticsearch sink support for Elastic 8 - switch to java…
Browse files Browse the repository at this point in the history
…-client (#35)

[pulsar-io] Elasticsearch sink support for Elastic 8 - switch to java-client

(cherry picked from commit d781147)
  • Loading branch information
nicoloboschi committed Mar 17, 2022
1 parent 31f066c commit b8c0fbc
Show file tree
Hide file tree
Showing 41 changed files with 2,300 additions and 467 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ flexible messaging model and an intuitive client API.</description>
<hdfs-offload-version3>3.3.1</hdfs-offload-version3>
<json-smart.version>2.4.7</json-smart.version>
<opensearch.version>1.2.4</opensearch.version>
<elasticsearch-java.version>8.1.0</elasticsearch-java.version>
<presto.version>334</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.6</scala-library.version>
Expand Down Expand Up @@ -1121,6 +1122,12 @@ flexible messaging model and an intuitive client API.</description>
<version>${opensearch.version}</version>
</dependency>

<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch-java.version}</version>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-io/elastic-search/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
<artifactId>opensearch-rest-high-level-client</artifactId>
</dependency>

<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ public class ElasticSearchConfig implements Serializable {
)
private boolean ignoreUnsupportedFields = false;


@FieldDoc(
required = false,
defaultValue = "AUTO",
help = "Specify compatibility mode with the ElasticSearch cluster. " +
"'AUTO' value will try to auto detect the correct compatibility mode to use. " +
"Use 'ELASTICSEARCH_7' if the target cluster is running ElasticSearch 7 or prior. " +
"Use 'ELASTICSEARCH' if the target cluster is running ElasticSearch 8 or higher. " +
"Use 'OPENSEARCH' if the target cluster is running OpenSearch."
)
private CompatibilityMode compatibilityMode = CompatibilityMode.AUTO;

public enum MalformedDocAction {
IGNORE,
WARN,
Expand All @@ -270,6 +282,13 @@ public enum NullValueAction {
FAIL
}

/**
 * Compatibility mode used to talk to the target search cluster.
 */
public enum CompatibilityMode {
// Try to auto detect the correct compatibility mode to use.
AUTO,
// The target cluster is running ElasticSearch 7 or prior.
ELASTICSEARCH_7,
// The target cluster is running ElasticSearch 8 or higher.
ELASTICSEARCH,
// The target cluster is running OpenSearch.
OPENSEARCH
}

public static ElasticSearchConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,20 @@ public long randomWaitInMs(int attempt, long backoffInMs) {
return ThreadLocalRandom.current().nextLong(0, waitInMs(attempt, backoffInMs));
}

protected <T> T retry(Callable<T> function, int maxAttempts, long initialBackoff, String source) throws Exception {
public <T> T retry(Callable<T> function, int maxAttempts, long initialBackoff, String source) throws Exception {
return retry(function, maxAttempts, initialBackoff, source, new Time());
}

protected <T> T retry(Callable<T> function, int maxAttempts, long initialBackoff, String source, Time clock) throws Exception {
public <T> T retry(Callable<T> function, int maxAttempts, long initialBackoff, String source, Time clock) throws Exception {
Exception lastException = null;
for(int i = 0; i < maxAttempts || maxAttempts == -1; i++) {
try {
return function.call();
} catch (Exception e) {
lastException = e;
long backoff = randomWaitInMs(i, initialBackoff);
log.info("Trying source={} attempt {}/{} failed, waiting {}ms", source, i, maxAttempts, backoff);
log.info("Executing '{}', attempt {}/{}, next retry in {} ms, caused by: {}", source, i,
maxAttempts, backoff, e.getMessage());
clock.sleep(backoff);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch.client;

import lombok.Builder;
import lombok.Getter;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Contract for a component that accumulates index/delete operations and submits them
 * through the "bulk" call of the Elastic REST endpoint.
 *
 * <p>The nested types are plain value holders; their accessors and builders are
 * generated by Lombok.
 */
public interface BulkProcessor extends Closeable {

    /**
     * Appends an index operation to this processor.
     *
     * @param request the document to index
     * @throws IOException if the request cannot be appended
     */
    void appendIndexRequest(BulkIndexRequest request) throws IOException;

    /**
     * Appends a delete operation to this processor.
     *
     * @param request the document to delete
     * @throws IOException if the request cannot be appended
     */
    void appendDeleteRequest(BulkDeleteRequest request) throws IOException;

    /**
     * Flushes this processor.
     * NOTE(review): presumably submits the operations appended so far - confirm against implementations.
     */
    void flush();

    /**
     * Closes this processor, waiting up to the given timeout.
     * NOTE(review): presumably waits for in-flight bulk calls to complete - confirm against implementations.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    void awaitClose(long timeout, TimeUnit unit) throws InterruptedException;

    /**
     * Callback notified once a bulk execution has finished.
     */
    interface Listener {

        /**
         * Reports a bulk execution together with its list of operation results.
         *
         * @param executionId       identifier of the bulk execution
         * @param bulkOperationList operations that were part of the bulk execution
         * @param results           operation results; see {@link BulkOperationResult#isError()}
         */
        void afterBulk(long executionId, List<BulkOperationRequest> bulkOperationList, List<BulkOperationResult> results);

        /**
         * Reports a bulk execution together with a {@link Throwable}.
         *
         * @param executionId       identifier of the bulk execution
         * @param bulkOperationList operations that were part of the bulk execution
         * @param throwable         the failure reported for the execution
         */
        void afterBulk(long executionId, List<BulkOperationRequest> bulkOperationList, Throwable throwable);
    }

    /**
     * Index operation: the target index, the document id and the document source.
     */
    @Getter
    @Builder
    class BulkIndexRequest {
        private long requestId;
        private String index;
        private String documentId;
        private String documentSource;
    }

    /**
     * Delete operation: the target index and the id of the document to remove.
     */
    @Getter
    @Builder
    class BulkDeleteRequest {
        private long requestId;
        private String index;
        private String documentId;
    }

    /**
     * Handle on a single operation of a bulk execution, identified by its operation id.
     */
    @Getter
    @Builder
    class BulkOperationRequest {
        private long operationId;
    }

    /**
     * Outcome of a single operation of a bulk execution.
     */
    @Getter
    @Builder
    class BulkOperationResult {
        private String error;
        private String index;
        private String documentId;

        /**
         * @return {@code true} when an error is set on this result
         */
        public boolean isError() {
            return null != error;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch.client;

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.conn.NHttpClientConnectionManager;
import org.apache.http.nio.conn.NoopIOSessionStrategy;
import org.apache.http.nio.conn.SchemeIOSessionStrategy;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConnectionException;
import org.apache.pulsar.io.elasticsearch.ElasticSearchSslConfig;
import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
import org.elasticsearch.client.RestClientBuilder;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Base class for the REST clients used by the sink.
 *
 * <p>It owns the HTTP client customization shared by implementations (connection
 * manager, SSL setup, basic credentials) through {@link ConfigCallback}, and runs a
 * background task that periodically evicts expired and idle pooled connections.
 */
public abstract class RestClient implements Closeable {

    protected final ElasticSearchConfig config;
    protected final ConfigCallback configCallback;
    private final ScheduledExecutorService executorService;

    /**
     * Stores the configuration, builds the {@link ConfigCallback} and starts the
     * connection evictor.
     *
     * @param elasticSearchConfig   sink configuration
     * @param bulkProcessorListener not used by this constructor; NOTE(review): presumably
     *                              consumed by subclasses - confirm
     * @throws MalformedURLException declared for subclasses; nothing in this constructor throws it
     */
    public RestClient(ElasticSearchConfig elasticSearchConfig, BulkProcessor.Listener bulkProcessorListener) throws MalformedURLException {
        this.config = elasticSearchConfig;
        // 'config' must be assigned first: ConfigCallback reads RestClient.this.config.
        this.configCallback = new ConfigCallback();

        // idle+expired connection evictor thread
        this.executorService = Executors.newSingleThreadScheduledExecutor();
        this.executorService.scheduleAtFixedRate(() -> {
                    configCallback.connectionManager.closeExpiredConnections();
                    configCallback.connectionManager.closeIdleConnections(
                            config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS);
                },
                config.getConnectionIdleTimeoutInMs(),
                config.getConnectionIdleTimeoutInMs(),
                TimeUnit.MILLISECONDS
        );
    }

    public abstract boolean indexExists(String index) throws IOException;
    public abstract boolean createIndex(String index) throws IOException;
    public abstract boolean deleteIndex(String index) throws IOException;

    public abstract boolean indexDocument(String index, String documentId, String documentSource) throws IOException;
    public abstract boolean deleteDocument(String index, String documentId) throws IOException;

    public abstract long totalHits(String index) throws IOException;

    public abstract BulkProcessor getBulkProcessor();

    /**
     * Splits a comma separated configuration value into its trimmed entries.
     *
     * @param value the raw configuration value, may be {@code null} or empty
     * @return the trimmed entries, or {@code null} when {@code value} is {@code null} or empty
     */
    private static String[] splitCommaSeparated(String value) {
        if (Strings.isNullOrEmpty(value)) {
            return null;
        }
        String[] entries = value.split(",");
        for (int i = 0; i < entries.length; i++) {
            // "TLSv1.2, TLSv1.3" must not yield " TLSv1.3", which the SSL engine would reject.
            entries[i] = entries[i].trim();
        }
        return entries;
    }

    /**
     * HTTP client customization usable with both the Elasticsearch and the OpenSearch
     * low level REST client builders.
     */
    public class ConfigCallback implements RestClientBuilder.HttpClientConfigCallback,
            org.opensearch.client.RestClientBuilder.HttpClientConfigCallback {
        final NHttpClientConnectionManager connectionManager;
        final CredentialsProvider credentialsProvider;

        public ConfigCallback() {
            this.connectionManager = buildConnectionManager(RestClient.this.config);
            this.credentialsProvider = buildCredentialsProvider(RestClient.this.config);
        }

        /**
         * Caps the connection pool at {@code bulkConcurrentRequests}, installs the
         * connection manager and, when configured, the basic credentials.
         */
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder builder) {
            builder.setMaxConnPerRoute(config.getBulkConcurrentRequests());
            builder.setMaxConnTotal(config.getBulkConcurrentRequests());
            builder.setConnectionManager(connectionManager);

            if (this.credentialsProvider != null) {
                builder.setDefaultCredentialsProvider(credentialsProvider);
            }
            return builder;
        }

        /**
         * Builds the pooling connection manager, with an SSL session strategy for
         * {@code https} when SSL is enabled in the configuration.
         *
         * @param config sink configuration
         * @return the connection manager
         * @throws ElasticSearchConnectionException wrapping any failure while building it
         */
        public NHttpClientConnectionManager buildConnectionManager(ElasticSearchConfig config) {
            try {
                IOReactorConfig ioReactorConfig = IOReactorConfig.custom()
                        .setConnectTimeout(config.getConnectTimeoutInMs())
                        .setSoTimeout(config.getSocketTimeoutInMs())
                        .build();
                ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
                PoolingNHttpClientConnectionManager connManager;
                if (config.getSsl().isEnabled()) {
                    ElasticSearchSslConfig sslConfig = config.getSsl();
                    HostnameVerifier hostnameVerifier = config.getSsl().isHostnameVerification()
                            ? SSLConnectionSocketFactory.getDefaultHostnameVerifier()
                            : new NoopHostnameVerifier();
                    // Both settings are comma separated lists; null means "use the defaults".
                    String[] cipherSuites = splitCommaSeparated(sslConfig.getCipherSuites());
                    String[] protocols = splitCommaSeparated(sslConfig.getProtocols());
                    Registry<SchemeIOSessionStrategy> registry = RegistryBuilder.<SchemeIOSessionStrategy>create()
                            .register("http", NoopIOSessionStrategy.INSTANCE)
                            .register("https", new SSLIOSessionStrategy(
                                    buildSslContext(config),
                                    protocols,
                                    cipherSuites,
                                    hostnameVerifier))
                            .build();
                    connManager = new PoolingNHttpClientConnectionManager(ioReactor, registry);
                } else {
                    connManager = new PoolingNHttpClientConnectionManager(ioReactor);
                }
                return connManager;
            } catch(Exception e) {
                throw new ElasticSearchConnectionException(e);
            }
        }

        /**
         * Builds the SSL context from the provider, protocol, truststore and keystore
         * settings of the SSL configuration.
         */
        private SSLContext buildSslContext(ElasticSearchConfig config) throws NoSuchAlgorithmException, KeyManagementException, CertificateException, KeyStoreException, IOException, UnrecoverableKeyException {
            ElasticSearchSslConfig sslConfig = config.getSsl();
            SSLContextBuilder sslContextBuilder = SSLContexts.custom();
            if (!Strings.isNullOrEmpty(sslConfig.getProvider())) {
                sslContextBuilder.setProvider(sslConfig.getProvider());
            }
            // The context protocol is a single algorithm name. Passing a comma separated
            // list (e.g. "TLSv1.2,TLSv1.3") makes the context creation fail, so the protocol
            // is only set when exactly one is configured. With several protocols the builder
            // default is kept and the list is enforced by the SSLIOSessionStrategy instead.
            String[] protocols = splitCommaSeparated(sslConfig.getProtocols());
            if (protocols != null && protocols.length == 1 && !protocols[0].isEmpty()) {
                sslContextBuilder.setProtocol(protocols[0]);
            }
            if (!Strings.isNullOrEmpty(sslConfig.getTruststorePath()) && !Strings.isNullOrEmpty(sslConfig.getTruststorePassword())) {
                sslContextBuilder.loadTrustMaterial(new File(sslConfig.getTruststorePath()), sslConfig.getTruststorePassword().toCharArray());
            }
            if (!Strings.isNullOrEmpty(sslConfig.getKeystorePath()) && !Strings.isNullOrEmpty(sslConfig.getKeystorePassword())) {
                // The keystore password is used both as store password and as key password.
                sslContextBuilder.loadKeyMaterial(new File(sslConfig.getKeystorePath()),
                        sslConfig.getKeystorePassword().toCharArray(),
                        sslConfig.getKeystorePassword().toCharArray());
            }
            return sslContextBuilder.build();
        }

        /**
         * @return a basic credentials provider valid for any auth scope, or {@code null}
         *         when the username or the password is empty
         */
        private CredentialsProvider buildCredentialsProvider(ElasticSearchConfig config) {
            if (StringUtils.isEmpty(config.getUsername()) || StringUtils.isEmpty(config.getPassword())) {
                return null;
            }
            CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(config.getUsername(), config.getPassword()));
            return credentialsProvider;
        }
    }

    /**
     * Stops the connection evictor.
     * NOTE(review): the connection manager itself is not shut down here; presumably it is
     * released when the subclass closes its HTTP client - confirm.
     */
    @Override
    public void close() {
        executorService.shutdown();
    }
}
Loading

0 comments on commit b8c0fbc

Please sign in to comment.