Skip to content

Commit

Permalink
[Feature][Connector-V2]remove duplicate testcase,add Assert sink(apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
iture123 committed Sep 4, 2022
1 parent 19a118d commit 28e821f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.seatunnel.e2e.flink.v2.elasticsearch;

import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.e2e.flink.FlinkContainer;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -40,23 +42,40 @@ public class ElasticsearchSourceToConsoleIT extends FlinkContainer {

@SuppressWarnings({"checkstyle:MagicNumber", "checkstyle:Indentation"})
@BeforeEach
public void startElasticsearchContainer() throws InterruptedException{
public void startElasticsearchContainer() throws InterruptedException {
container = new ElasticsearchContainer(DockerImageName.parse("elasticsearch:6.8.23").asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"))
.withNetwork(NETWORK)
.withNetworkAliases("elasticsearch")
.withLogConsumer(new Slf4jLogConsumer(LOGGER));
container.start();
LOGGER.info("Elasticsearch container started");
Thread.sleep(5000L);
createIndexDocs();
}

/**
* create a index,and bulk some documents
*/
private void createIndexDocs() {
EsRestClient esRestClient = EsRestClient.createInstance(Lists.newArrayList(container.getHttpHostAddress()), "", "");
String requestBody = "{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" +
"{\"name\":\"EbvYoFkXtS\",\"age\":18}\n" +
"{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" +
"{\"name\":\"LjFMprGLJZ\",\"age\":19}\n" +
"{\"index\":{\"_index\":\"st_index\",\"_type\":\"st\"}}\n" +
"{\"name\":\"uJTtAVuSyI\",\"age\":20}\n";
esRestClient.bulk(requestBody);
try {
esRestClient.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Test
public void testElasticsearchSourceToConsoleSink() throws IOException, InterruptedException {
Container.ExecResult sinkEsResult = executeSeaTunnelFlinkJob("/elasticsearch/fakesource_to_elasticsearch.conf");
Assertions.assertEquals(0, sinkEsResult.getExitCode());
Container.ExecResult sourceEsResult = executeSeaTunnelFlinkJob("/elasticsearch/elasticsearch_to_console.conf");
Assertions.assertEquals(0, sourceEsResult.getExitCode());
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/elasticsearch/elasticsearch_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,19 @@ transform {

sink {
Console {}

Assert {
rules = [
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}

0 comments on commit 28e821f

Please sign in to comment.