[Feature][Connector-V2] add Elasticsearch e2e spark test (apache#2553)
iture123 committed Sep 4, 2022
1 parent 28e821f commit cafea1f
Showing 5 changed files with 292 additions and 0 deletions.
7 changes: 7 additions & 0 deletions seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -42,6 +42,13 @@
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.17.3</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
88 changes: 88 additions & 0 deletions seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/ElasticsearchSourceToConsoleIT.java
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.spark.v2.elasticsearch;

import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.e2e.spark.SparkContainer;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;

/**
 * This test case verifies that the Elasticsearch source is able to send data to the console,
 * and makes sure the SeaTunnel job can be submitted successfully on the Spark engine.
 */
public class ElasticsearchSourceToConsoleIT extends SparkContainer {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSourceToConsoleIT.class);

private ElasticsearchContainer container;

@SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"})
@BeforeEach
public void startElasticsearchContainer() throws InterruptedException {
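// asCompatibleSubstituteFor tells Testcontainers that the plain "elasticsearch" image
// may stand in for the official docker.elastic.co image the module expects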
container = new ElasticsearchContainer(
        DockerImageName.parse("elasticsearch:6.8.23")
                .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
        .withNetwork(NETWORK)
        .withNetworkAliases("elasticsearch")
        .withLogConsumer(new Slf4jLogConsumer(LOGGER));
container.start();
LOGGER.info("Elasticsearch container started");
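// give the single-node cluster a few seconds to settle before creating the index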
Thread.sleep(5000L);
createIndexDocs();
}

/**
 * Create an index and bulk-insert some documents.
 */
private void createIndexDocs() {
EsRestClient esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", "");
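// the bulk API body is newline-delimited JSON: each {"index": ...} action line is followed by the document source line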
String requestBody = "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" +
"{\"name\":\"EbvYoFkXtS\",\"age\":18}\n" +
"{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" +
"{\"name\":\"LjFMprGLJZ\",\"age\":19}\n" +
"{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" +
"{\"name\":\"uJTtAVuSyI\",\"age\":20}\n";
esRestClient.bulk(requestBody);
try {
esRestClient.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
public void testElasticsearchSourceToConsoleSink() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelSparkJob("/elasticsearch/elasticsearch_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@AfterEach
public void closeContainer() {
if (container != null) {
container.stop();
}
}
}
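Not part of this commit: a minimal sketch of how the seeded data could be double-checked before the job runs, using only the JDK's HttpURLConnection plus Elasticsearch's standard _refresh and _count endpoints. The EsIndexProbe name is hypothetical; the index name st_index and container.getHttpHostAddress() come from the test above.

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Scanner;

final class EsIndexProbe {

    // Issues a plain GET and returns the response body as a string (Java 8 friendly).
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        try (InputStream in = conn.getInputStream();
             Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
            return scanner.hasNext() ? scanner.next() : "";
        } finally {
            conn.disconnect();
        }
    }

    // host is e.g. "http://" + container.getHttpHostAddress()
    static long countDocs(String host) throws IOException {
        get(host + "/st_index/_refresh");             // make the bulk-written docs searchable
        String body = get(host + "/st_index/_count"); // body looks like {"count":3,"_shards":{...}}
        return Long.parseLong(body.replaceAll(".*?\"count\":(\\d+).*", "$1"));
    }
}

Calling Assertions.assertEquals(3, EsIndexProbe.countDocs("http://" + container.getHttpHostAddress())) at the end of createIndexDocs() would fail fast if the seed data never landed.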
66 changes: 66 additions & 0 deletions seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/elasticsearch/FakeSourceToElasticsearchIT.java
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.e2e.spark.v2.elasticsearch;

import org.apache.seatunnel.e2e.spark.SparkContainer;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;

/**
 * This test case verifies that the fake source is able to send data to Elasticsearch,
 * and makes sure the SeaTunnel job can be submitted successfully on the Spark engine.
 */
public class FakeSourceToElasticsearchIT extends SparkContainer {

private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToElasticsearchIT.class);

private ElasticsearchContainer container;

@SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"})
@BeforeEach
public void startElasticsearchContainer() throws InterruptedException {
container = new ElasticsearchContainer(
        DockerImageName.parse("elasticsearch:6.8.23")
                .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
        .withNetwork(NETWORK)
        .withNetworkAliases("elasticsearch")
        .withLogConsumer(new Slf4jLogConsumer(LOGGER));
container.start();
LOGGER.info("Elasticsearch container started");
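// give the single-node cluster a few seconds to settle before the job runs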
Thread.sleep(5000L);
}

@Test
public void testFakeSourceToElasticsearchSink() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelSparkJob("/elasticsearch/fakesource_to_elasticsearch.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@AfterEach
public void closeContainer() {
if (container != null) {
container.stop();
}
}
}
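Also not part of this commit: both tests pause with a fixed Thread.sleep(5000L). A sketch of an alternative, assuming Testcontainers' standard wait-strategy API (shipped with the 1.17.3 artifact added to the pom above), which blocks container.start() until the cluster reports healthy:

import org.testcontainers.containers.wait.strategy.Wait;

import java.time.Duration;

// inside startElasticsearchContainer(), configured before container.start():
// start() then blocks until GET /_cluster/health on mapped port 9200 returns HTTP 200
container.setWaitStrategy(
        Wait.forHttp("/_cluster/health")
                .forPort(9200)
                .forStatusCode(200)
                .withStartupTimeout(Duration.ofSeconds(90)));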
69 changes: 69 additions & 0 deletions seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/elasticsearch/elasticsearch_to_console.conf
@@ -0,0 +1,69 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
# You can set SeaTunnel environment configuration here
execution.parallelism = 1
job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
Elasticsearch {
hosts = ["elasticsearch:9200"]
index = "st_index*"
source = ["_id","name","age"]
result_table_name = "fake"
scroll_size = 100
scroll_time = "1m"
}
# If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/Elasticsearch
}

transform {
sql {
sql = "select _id as doc_id,name,age from fake"
}

# If you would like to get more information about how to configure SeaTunnel and see the full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
Console {}
Assert {
rules = [
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
# If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
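Not part of this commit: the Assert rules above only guard name against null. Assuming the Assert sink's documented numeric rule types (MIN/MAX with a rule_value), the check could also bound the age column, which the seeded documents keep between 18 and 20 — a sketch:

Assert {
  rules = [
    {
      field_name = name
      field_type = string
      field_value = [
        {rule_type = NOT_NULL}
      ]
    },
    {
      field_name = age
      field_type = int
      field_value = [
        {rule_type = MIN, rule_value = 18},
        {rule_type = MAX, rule_value = 20}
      ]
    }
  ]
}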
62 changes: 62 additions & 0 deletions seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/elasticsearch/fakesource_to_elasticsearch.conf
@@ -0,0 +1,62 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in SeaTunnel config
######

env {
# You can set SeaTunnel environment configuration here
execution.parallelism = 1
job.mode = "BATCH"
#execution.checkpoint.interval = 10000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
# This is an example source plugin **only for testing and demonstrating the source plugin feature**
FakeSource {
result_table_name = "fake"
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}

transform {
sql {
sql = "select name,age from fake"
}

# If you would like to get more information about how to configure SeaTunnel and see the full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
Elasticsearch {
hosts = ["elasticsearch:9200"]
index = "st_index"
index_type = "st"
}

# If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}
