diff --git a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml index 02a6633d2fc..04ecf214522 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/pom.xml +++ b/seatunnel-connectors-v2/connector-elasticsearch/pom.xml @@ -30,8 +30,6 @@ connector-elasticsearch 7.5.1 - 2.12.6 - 2.13.3 @@ -40,16 +38,6 @@ elasticsearch-rest-client ${elasticsearch-rest-client.version} - - com.fasterxml.jackson.core - jackson-databind - ${jackson.databind.version} - - - com.fasterxml.jackson.datatype - jackson-datatype-jsr310 - ${jackson-datatype-jsr310.version} - org.apache.seatunnel connector-common @@ -62,6 +50,10 @@ ${project.version} compile + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + - diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java index 3c1ca1e00da..fd7fe4a1f46 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/serialize/ElasticsearchRowSerializer.java @@ -28,9 +28,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import java.time.temporal.Temporal; import java.util.HashMap; import java.util.Map; @@ -39,8 +38,7 @@ */ public class ElasticsearchRowSerializer implements SeaTunnelRowSerializer { private final SeaTunnelRowType seaTunnelRowType; - private final ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + private final ObjectMapper objectMapper = new ObjectMapper(); private final IndexSerializer indexSerializer; @@ -58,7 +56,13 @@ public String serializeRow(SeaTunnelRow row){ Map doc = new HashMap<>(fieldNames.length); Object[] fields = row.getFields(); for (int i = 0; i < fieldNames.length; i++) { - doc.put(fieldNames[i], fields[i]); + Object value = fields[i]; + if (value instanceof Temporal){ + //jackson not support jdk8 new time api + doc.put(fieldNames[i], value.toString()); + } else { + doc.put(fieldNames[i], value); + } } StringBuilder sb = new StringBuilder(); diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java index 25239625933..af248c6c57c 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-elasticsearch-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/elasticsearch/ElasticsearchIT.java @@ -24,8 +24,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; @@ -73,9 +71,7 @@ public void startMongoContainer() throws Exception { LOGGER.info("Elasticsearch container started"); esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", ""); testDataset = generateTestDataSet(); - Thread.sleep(5000L); createIndexDocs(); - } /** @@ -129,8 +125,7 @@ private List generateTestDataSet() throws JsonProcessingException, Unkno }; List documents = new ArrayList<>(); - ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + ObjectMapper objectMapper = new ObjectMapper(); for (int i = 0; i < 100; i++) { Map doc = new HashMap<>(); Object[] values = new Object[]{ @@ -146,8 +141,8 @@ private List generateTestDataSet() throws JsonProcessingException, Unkno Double.parseDouble("1.1"), BigDecimal.valueOf(11, 1), "test".getBytes(), - LocalDate.now(), - LocalDateTime.now() + LocalDate.now().toString(), + LocalDateTime.now().toString() }; for (int j = 0; j < fiels.length; j++){ doc.put(fiels[j], values[j]); @@ -157,7 +152,9 @@ private List generateTestDataSet() throws JsonProcessingException, Unkno return documents; } - private List readSinkData() { + private List readSinkData() throws InterruptedException { + //wait for index refresh + Thread.sleep(2000); List source = Lists.newArrayList("c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp"); ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000); scrollResult.getDocs().forEach(x -> { diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java index 51a730c8229..c56a3db482d 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-elasticsearch-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchIT.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.e2e.spark.v2.elasticsearch; +import lombok.SneakyThrows; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult; @@ -24,8 +25,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterEach; @@ -37,11 +36,13 @@ import org.testcontainers.containers.Container; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.shaded.org.apache.commons.lang3.ThreadUtils; import org.testcontainers.utility.DockerImageName; import java.io.IOException; import java.math.BigDecimal; import java.net.UnknownHostException; +import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; import java.util.ArrayList; @@ -73,7 +74,6 @@ public void startMongoContainer() throws Exception { LOGGER.info("Elasticsearch container started"); esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", ""); testDataset = generateTestDataSet(); - Thread.sleep(5000L); createIndexDocs(); } @@ -129,8 +129,7 @@ private List generateTestDataSet() throws JsonProcessingException, Unkno }; List documents = new ArrayList<>(); - ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule()) - .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + ObjectMapper objectMapper = new ObjectMapper(); for (int i = 0; i < 100; i++) { Map doc = new HashMap<>(); Object[] values = new Object[]{ @@ -146,8 +145,8 @@ private List generateTestDataSet() throws JsonProcessingException, Unkno Double.parseDouble("1.1"), BigDecimal.valueOf(11, 1), "test".getBytes(), - LocalDate.now(), - LocalDateTime.now() + LocalDate.now().toString(), + LocalDateTime.now().toString() }; for (int j = 0; j < fiels.length; j++){ doc.put(fiels[j], values[j]); @@ -157,7 +156,10 @@ private List generateTestDataSet() throws JsonProcessingException, Unkno return documents; } + @SneakyThrows private List readSinkData() { + //wait for index refresh + Thread.sleep(2000); List source = Lists.newArrayList("c_map", "c_array", "c_string", "c_boolean", "c_tinyint", "c_smallint", "c_int", "c_bigint", "c_float", "c_double", "c_decimal", "c_bytes", "c_date", "c_timestamp"); ScrollResult scrollResult = esRestClient.searchByScroll("st_index2", source, "1m", 1000); scrollResult.getDocs().forEach(x -> {