Skip to content

Commit

Permalink
[Feature][Connector-V2][Elasticsearce]fix jackson conflict in spark a…
Browse files Browse the repository at this point in the history
…nd remove jackson-datatype-jsr310(apache#2553)
  • Loading branch information
iture123 committed Oct 22, 2022
1 parent d6f9282 commit 4a3b560
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 34 deletions.
18 changes: 5 additions & 13 deletions seatunnel-connectors-v2/connector-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
<artifactId>connector-elasticsearch</artifactId>
<properties>
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
<jackson.databind.version>2.12.6</jackson.databind.version>
<jackson-datatype-jsr310.version>2.13.3</jackson-datatype-jsr310.version>
</properties>

<dependencies>
Expand All @@ -40,16 +38,6 @@
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch-rest-client.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.databind.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson-datatype-jsr310.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
Expand All @@ -62,6 +50,10 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.temporal.Temporal;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -39,8 +38,7 @@
*/
public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer {
private final SeaTunnelRowType seaTunnelRowType;
private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
private final ObjectMapper objectMapper = new ObjectMapper();

private final IndexSerializer indexSerializer;

Expand All @@ -58,7 +56,13 @@ public String serializeRow(SeaTunnelRow row){
Map<String, Object> doc = new HashMap<>(fieldNames.length);
Object[] fields = row.getFields();
for (int i = 0; i < fieldNames.length; i++) {
doc.put(fieldNames[i], fields[i]);
Object value = fields[i];
if (value instanceof Temporal){
//jackson not support jdk8 new time api
doc.put(fieldNames[i], value.toString());
} else {
doc.put(fieldNames[i], value);
}
}

StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -73,9 +71,7 @@ public void startMongoContainer() throws Exception {
LOGGER.info("Elasticsearch container started");
esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", "");
testDataset = generateTestDataSet();
Thread.sleep(5000L);
createIndexDocs();

}

/**
Expand Down Expand Up @@ -129,8 +125,7 @@ private List<String> generateTestDataSet() throws JsonProcessingException, Unkno
};

List<String> documents = new ArrayList<>();
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
ObjectMapper objectMapper = new ObjectMapper();
for (int i = 0; i < 100; i++) {
Map<String, Object> doc = new HashMap<>();
Object[] values = new Object[]{
Expand All @@ -146,8 +141,8 @@ private List<String> generateTestDataSet() throws JsonProcessingException, Unkno
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.now(),
LocalDateTime.now()
LocalDate.now().toString(),
LocalDateTime.now().toString()
};
for (int j = 0; j < fiels.length; j++){
doc.put(fiels[j], values[j]);
Expand All @@ -157,7 +152,9 @@ private List<String> generateTestDataSet() throws JsonProcessingException, Unkno
return documents;
}

private List<String> readSinkData() {
private List<String> readSinkData() throws InterruptedException {
//wait for index refresh
Thread.sleep(2000);
List<String> source = Lists.newArrayList("c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp");
ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000);
scrollResult.getDocs().forEach(x -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.seatunnel.e2e.spark.v2.elasticsearch;

import lombok.SneakyThrows;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.spark.SparkContainer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -37,11 +36,13 @@
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.shaded.org.apache.commons.lang3.ThreadUtils;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.math.BigDecimal;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
Expand Down Expand Up @@ -73,7 +74,6 @@ public void startMongoContainer() throws Exception {
LOGGER.info("Elasticsearch container started");
esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", "");
testDataset = generateTestDataSet();
Thread.sleep(5000L);
createIndexDocs();

}
Expand Down Expand Up @@ -129,8 +129,7 @@ private List<String> generateTestDataSet() throws JsonProcessingException, Unkno
};

List<String> documents = new ArrayList<>();
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
ObjectMapper objectMapper = new ObjectMapper();
for (int i = 0; i < 100; i++) {
Map<String, Object> doc = new HashMap<>();
Object[] values = new Object[]{
Expand All @@ -146,8 +145,8 @@ private List<String> generateTestDataSet() throws JsonProcessingException, Unkno
Double.parseDouble("1.1"),
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.now(),
LocalDateTime.now()
LocalDate.now().toString(),
LocalDateTime.now().toString()
};
for (int j = 0; j < fiels.length; j++){
doc.put(fiels[j], values[j]);
Expand All @@ -157,7 +156,10 @@ private List<String> generateTestDataSet() throws JsonProcessingException, Unkno
return documents;
}

@SneakyThrows
private List<String> readSinkData() {
//wait for index refresh
Thread.sleep(2000);
List<String> source = Lists.newArrayList("c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp");
ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000);
scrollResult.getDocs().forEach(x -> {
Expand Down

0 comments on commit 4a3b560

Please sign in to comment.