Skip to content

Commit

Permalink
Merge pull request #94 from Aiven-Open/jjaakola-aiven-add-es8-compati…
Browse files Browse the repository at this point in the history
…bility

feat: add ElasticSearch 8 compatibility.

This breaks compatibility with ElasticSearch <= 6.
  • Loading branch information
eliax1996 authored Apr 11, 2024
2 parents 2104e7f + af53041 commit dac60cd
Show file tree
Hide file tree
Showing 17 changed files with 1,016 additions and 980 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ The project originates from Confluent [kafka-connect-elasticsearch](https://gith

# Documentation

TBD
Supported Elasticsearch versions are 7.17.0+

# Contribute

[Source Code](https://github.com/aiven/elasticsearch-connector-for-apache-kafka)
[Source Code](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka)

[Issue Tracker](https://github.com/aiven/elasticsearch-connector-for-apache-kafka)
[Issue Tracker](https://github.com/Aiven-Open/elasticsearch-connector-for-apache-kafka)

# License

Expand Down
59 changes: 25 additions & 34 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ java {
}

wrapper {
distributionType = 'ALL'
distributionType = "ALL"
doLast {
def sha256Sum = new String(new URL("${distributionUrl}.sha256").bytes)
propertiesFile << "distributionSha256Sum=${sha256Sum}\n"
Expand All @@ -58,14 +58,14 @@ wrapper {

ext {
kafkaVersion = "2.2.0"
slf4jVersion = "1.7.36"
elasticSearchVersion = "2.4.1"
luceneVersion = "5.5.2"
jestVersion = "6.3.1"
slf4jVersion = "2.0.12"
elasticJavaClientVersion = "7.17.18"
testContainersElasticVersion = "1.19.6"
carrotsearchVersion = "2.8.1"
}

compileJava {
options.compilerArgs = ['-Xlint:all', '-Werror']
options.compilerArgs = ["-Xlint:all", "-Werror"]
}

checkstyle {
Expand All @@ -78,29 +78,23 @@ jacoco {
}

dependencies {
compileOnly "org.apache.kafka:connect-api:$kafkaVersion"
compileOnly "org.apache.kafka:connect-json:$kafkaVersion"
compileOnly("org.apache.kafka:connect-api:$kafkaVersion")
compileOnly("org.apache.kafka:connect-json:$kafkaVersion")

implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "io.searchbox:jest:$jestVersion"
implementation("org.slf4j:slf4j-api:$slf4jVersion")
implementation("co.elastic.clients:elasticsearch-java:$elasticJavaClientVersion")

testImplementation("junit:junit:4.13.2") {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
exclude(group: "org.hamcrest", module: "hamcrest-core")
}
testImplementation "org.hamcrest:hamcrest-all:1.3"
testImplementation "org.mockito:mockito-core:5.11.0"
testImplementation "org.mockito:mockito-all:1.10.19"

testImplementation "org.apache.kafka:connect-api:$kafkaVersion"
testImplementation "org.apache.kafka:connect-json:$kafkaVersion"
testImplementation "org.apache.lucene:lucene-test-framework:$luceneVersion"
testImplementation "com.fasterxml.jackson.core:jackson-core:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.17.0"
testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion:tests"
testImplementation "org.elasticsearch:elasticsearch:$elasticSearchVersion"
testImplementation "org.apache.lucene:lucene-expressions:$luceneVersion"
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"
testImplementation("org.hamcrest:hamcrest-all:1.3")
testImplementation("org.mockito:mockito-core:5.11.0")
testImplementation("org.mockito:mockito-all:1.10.19")

testImplementation("org.apache.kafka:connect-json:$kafkaVersion")
testImplementation("org.testcontainers:elasticsearch:$testContainersElasticVersion")
testImplementation("com.carrotsearch.randomizedtesting:randomizedtesting-runner:$carrotsearchVersion")
testRuntimeOnly("org.slf4j:slf4j-log4j12:$slf4jVersion")
}

distributions {
Expand Down Expand Up @@ -154,7 +148,7 @@ publishing {
}

processResources {
filesMatching('elasticsearch-connector-for-apache-kafka-version.properties') {
filesMatching("elasticsearch-connector-for-apache-kafka-version.properties") {
expand(version: version)
}
}
Expand All @@ -167,13 +161,10 @@ jar {
}
}

def elasticsearch7Test = tasks.register("elasticsearch7Test", Test) {
environment("ELASTIC_TEST_CONTAINER_VERSION", "7.17.0")
}

test {

//we do not need to check classpath hell for testing
systemProperty "tests.jarhell.check", "false"

//tests.security.manager is true all ElasticsearchSinkTestBase aware test hang
systemProperty "tests.security.manager", "false"

tasks.named("check") {
dependsOn elasticsearch7Test
}
6 changes: 3 additions & 3 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
<!-- switch statements on types exceed maximum complexity -->
<suppress
checks="(CyclomaticComplexity)"
files="(JestElasticsearchClient|Mapping|JestElasticsearchClient).java"
files="(Mapping).java"
/>

<!-- TODO: Undecided if this is too much -->
<suppress
checks="(ClassDataAbstractionCoupling)"
files="(BulkProcessor|JestElasticsearchClient).java"
files="(BulkProcessor).java"
/>

<!-- TODO: Pass some parameters in common config object? -->
Expand All @@ -25,7 +25,7 @@
/>

<suppress
checks="(ClassFanOutComplexity)" files="(JestElasticsearchClient|JestElasticsearchClientTest).java"
checks="(ClassFanOutComplexity)" files="(ElasticsearchClientWrapper|ElasticsearchClientWrapperTest|BulkProcessor|ElasticsearchSinkTask|DataConverter).java"
/>

</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import io.aiven.connect.elasticsearch.bulk.BulkRequest;
import io.aiven.connect.elasticsearch.bulk.BulkResponse;

import com.google.gson.JsonObject;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import com.fasterxml.jackson.databind.JsonNode;

public interface ElasticsearchClient extends AutoCloseable {

enum Version {
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7
ES_V1, ES_V2, ES_V5, ES_V6, ES_V7, ES_V8
}

/**
Expand Down Expand Up @@ -65,7 +67,7 @@ enum Version {
* @param type the type
* @throws IOException if the client cannot execute the request
*/
JsonObject getMapping(String index, String type) throws IOException;
Property getMapping(String index, String type) throws IOException;

/**
* Creates a bulk request for the list of {@link IndexableRecord} records.
Expand All @@ -87,13 +89,18 @@ enum Version {
/**
* Executes a search.
*
* @param query the search query
* @param index the index to search
* @param type the type to search
* @return the search result
* @throws IOException if the client cannot execute the request
*/
JsonObject search(String query, String index, String type) throws IOException;
SearchResponse<JsonNode> search(String index) throws IOException;

/**
* Refreshes the index.
*
* @param index the index to refresh
*/
void refreshIndex(String index) throws IOException;

/**
* Shuts down the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.connect.sink.SinkTask;

import io.aiven.connect.elasticsearch.bulk.BulkProcessor;
import io.aiven.connect.elasticsearch.jest.JestElasticsearchClient;
import io.aiven.connect.elasticsearch.clientwrapper.ElasticsearchClientWrapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,7 +55,7 @@ public void start(final Map<String, String> props) {
start(props, null);
}

@SuppressWarnings("deprecation")
@SuppressWarnings("deprecation") //TOPIC_INDEX_MAP_CONFIG
// public for testing
public void start(final Map<String, String> props, final ElasticsearchClient client) {
try {
Expand Down Expand Up @@ -121,7 +121,7 @@ public void start(final Map<String, String> props, final ElasticsearchClient cli
if (client != null) {
this.client = client;
} else {
this.client = new JestElasticsearchClient(props);
this.client = new ElasticsearchClientWrapper(config);
}

final ElasticsearchWriter.Builder builder = new ElasticsearchWriter.Builder(this.client)
Expand Down Expand Up @@ -178,6 +178,10 @@ public void close(final Collection<TopicPartition> partitions) {
log.debug("Closing the task for topic partitions: {}", partitions);
}

public void refresh(final String index) throws IOException {
client.refreshIndex(index);
}

@Override
public void stop() throws ConnectException {
log.info("Stopping ElasticsearchSinkTask.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand All @@ -31,7 +32,6 @@

import io.aiven.connect.elasticsearch.bulk.BulkProcessor;

import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,7 +103,7 @@ public class ElasticsearchWriter {
behaviorOnMalformedDoc
);

existingMappings = Sets.newHashSet();
existingMappings = new HashSet<>();
}

public static class Builder {
Expand Down Expand Up @@ -349,7 +349,7 @@ public void createIndicesForTopics(final Set<String> assignedTopics) {
}

private Set<String> indicesForTopics(final Set<String> assignedTopics) {
final Set<String> indices = Sets.newHashSet();
final Set<String> indices = new HashSet<>();
for (final String topic : assignedTopics) {
indices.add(convertTopicToIndexName(topic));
}
Expand Down
22 changes: 11 additions & 11 deletions src/main/java/io/aiven/connect/elasticsearch/Mapping.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;

import co.elastic.clients.elasticsearch._types.mapping.Property;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.JsonObject;

public class Mapping {

Expand All @@ -43,7 +43,7 @@ public class Mapping {
* @param index The index to write to Elasticsearch.
* @param type The type to create mapping for.
* @param schema The schema used to infer mapping.
* @throws IOException from underlying JestClient
* @throws IOException from underlying client
*/
public static void createMapping(
final ElasticsearchClient client,
Expand All @@ -57,7 +57,7 @@ public static void createMapping(
/**
* Get the JSON mapping for given index and type. Returns {@code null} if it does not exist.
*/
public static JsonObject getMapping(final ElasticsearchClient client, final String index, final String type)
public static Property getMapping(final ElasticsearchClient client, final String index, final String type)
throws IOException {
return client.getMapping(index, type);
}
Expand All @@ -67,7 +67,7 @@ public static JsonObject getMapping(final ElasticsearchClient client, final Stri
*
* @param schema The schema used to infer mapping.
*/
public static JsonNode inferMapping(final ElasticsearchClient client, final Schema schema) {
public static JsonNode inferMapping(final ElasticsearchClient.Version version, final Schema schema) {
if (schema == null) {
throw new DataException("Cannot infer mapping without schema.");
}
Expand All @@ -83,26 +83,26 @@ public static JsonNode inferMapping(final ElasticsearchClient client, final Sche
final ObjectNode fields = JsonNodeFactory.instance.objectNode();
switch (schemaType) {
case ARRAY:
return inferMapping(client, schema.valueSchema());
return inferMapping(version, schema.valueSchema());
case MAP:
properties.set("properties", fields);
fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(client, schema.keySchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(client, schema.valueSchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_KEY, inferMapping(version, schema.keySchema()));
fields.set(ElasticsearchSinkConnectorConstants.MAP_VALUE, inferMapping(version, schema.valueSchema()));
return properties;
case STRUCT:
properties.set("properties", fields);
for (final Field field : schema.fields()) {
fields.set(field.name(), inferMapping(client, field.schema()));
fields.set(field.name(), inferMapping(version, field.schema()));
}
return properties;
default:
final String esType = getElasticsearchType(client, schemaType);
final String esType = getElasticsearchType(version, schemaType);
return inferPrimitive(esType, schema.defaultValue());
}
}

// visible for testing
protected static String getElasticsearchType(final ElasticsearchClient client,
protected static String getElasticsearchType(final ElasticsearchClient.Version version,
final Schema.Type schemaType) {
switch (schemaType) {
case BOOLEAN:
Expand All @@ -120,7 +120,7 @@ protected static String getElasticsearchType(final ElasticsearchClient client,
case FLOAT64:
return ElasticsearchSinkConnectorConstants.DOUBLE_TYPE;
case STRING:
switch (client.getVersion()) {
switch (version) {
case ES_V1:
case ES_V2:
return ElasticsearchSinkConnectorConstants.STRING_TYPE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@
* limitations under the License.
*/

package io.aiven.connect.elasticsearch.jest;
package io.aiven.connect.elasticsearch.clientwrapper;

import io.aiven.connect.elasticsearch.bulk.BulkRequest;

import io.searchbox.core.Bulk;
public class ElasticsearchBulkRequest implements BulkRequest {
private final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest;

public class JestBulkRequest implements BulkRequest {

private final Bulk bulk;

public JestBulkRequest(final Bulk bulk) {
this.bulk = bulk;
public ElasticsearchBulkRequest(final co.elastic.clients.elasticsearch.core.BulkRequest bulkRequest) {
this.bulkRequest = bulkRequest;
}

public Bulk getBulk() {
return bulk;
public co.elastic.clients.elasticsearch.core.BulkRequest getBulkRequest() {
return bulkRequest;
}
}
Loading

0 comments on commit dac60cd

Please sign in to comment.