Skip to content

Commit

Permalink
[Feature][Connector-V2]new connector of Elasticsearch source(apache#2553
Browse files Browse the repository at this point in the history
)
  • Loading branch information
iture123 committed Sep 22, 2022
1 parent 66f9ad9 commit b6688c2
Show file tree
Hide file tree
Showing 30 changed files with 1,661 additions and 43 deletions.
64 changes: 64 additions & 0 deletions docs/en/connector-v2/source/Elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Elasticsearch

> Elasticsearch source connector
## Description

Used to read data from Elasticsearch.

support version >= 2.x and < 8.x.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [x] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-------------|--------| -------- |---------------|
| hosts | array | yes | - |
| username | string | no | - |
| password | string | no | - |
| index | string | yes | - |
| source | array | yes | - |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |



### hosts [array]
Elasticsearch cluster http address, the format is `host:port`, allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.

### username [string]
x-pack username.

### password [string]
x-pack password.

### index [string]
Elasticsearch index name, support * fuzzy matching.

### source [array]
The fields of index.
You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit.

### scroll_time [String]
Amount of time Elasticsearch will keep the search context alive for scroll requests.

### scroll_size [int]
Maximum number of hits to be returned with each Elasticsearch scroll request.

## Examples
simple
```hocon
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-*"
source = ["_id","name","age"]
}
```
3 changes: 2 additions & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ seatunnel.sink.OssFile = connector-file-oss
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
seatunnel.source.Elasticsearch = connector-elasticsearch
seatunnel.sink.Elasticsearch = connector-elasticsearch
seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
seatunnel.sink.Neo4j = connector-neo4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,38 +17,75 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.client;

import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.Asserts;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class EsRestClient {

private static EsRestClient ES_REST_CLIENT;
private static RestClient REST_CLIENT;
private static final int CONNECTION_REQUEST_TIMEOUT = 10 * 1000;

private static final int SOCKET_TIMEOUT = 5 * 60 * 1000;

private EsRestClient() {
private final RestClient restClient;

private final ObjectMapper mapper = new ObjectMapper();

private EsRestClient(RestClient restClient) {
this.restClient = restClient;
}

public static EsRestClient createInstance(Config pluginConfig) {
List<String> hosts = pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS);
String username = null;
String password = null;
if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) {
username = pluginConfig.getString(EsClusterConnectionConfig.USERNAME);
if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) {
password = pluginConfig.getString(EsClusterConnectionConfig.PASSWORD);
}
}
return createInstance(hosts, username, password);
}

public static EsRestClient createInstance(List<String> hosts, String username, String password) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
return new EsRestClient(restClientBuilder.build());
}

@SuppressWarnings("checkstyle:MagicNumber")
private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
Expand All @@ -58,8 +95,8 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String

RestClientBuilder builder = RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectionRequestTimeout(10 * 1000)
.setSocketTimeout(5 * 60 * 1000));
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(SOCKET_TIMEOUT));

if (StringUtils.isNotEmpty(username)) {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
Expand All @@ -69,20 +106,11 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String
return builder;
}

public static EsRestClient getInstance(List<String> hosts, String username, String password) {
if (REST_CLIENT == null) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
REST_CLIENT = restClientBuilder.build();
ES_REST_CLIENT = new EsRestClient();
}
return ES_REST_CLIENT;
}

public BulkResponse bulk(String requestBody) {
Request request = new Request("POST", "_bulk");
request.setJsonEntity(requestBody);
try {
Response response = REST_CLIENT.performRequest(request);
Response response = restClient.performRequest(request);
if (response == null) {
throw new BulkElasticsearchException("bulk es Response is null");
}
Expand All @@ -104,10 +132,10 @@ public BulkResponse bulk(String requestBody) {
/**
* @return version.number, example:2.0.0
*/
public static String getClusterVersion() {
public String getClusterVersion() {
Request request = new Request("GET", "/");
try {
Response response = REST_CLIENT.performRequest(request);
Response response = restClient.performRequest(request);
String result = EntityUtils.toString(response.getEntity());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(result);
Expand All @@ -119,7 +147,114 @@ public static String getClusterVersion() {
}

public void close() throws IOException {
REST_CLIENT.close();
restClient.close();
}

/**
* first time to request search documents by scroll
* call /${index}/_search?scroll=${scroll}
*
* @param index index name
* @param source select fields
* @param scrollTime such as:1m
* @param scrollSize fetch documents count in one request
*/
public ScrollResult searchByScroll(String index, List<String> source, String scrollTime, int scrollSize) {
Map<String, Object> param = new HashMap<>();
Map<String, Object> query = new HashMap<>();
query.put("match_all", new HashMap<String, String>());
param.put("query", query);
param.put("_source", source);
param.put("sort", new String[]{"_doc"});
param.put("size", scrollSize);
String endpoint = index + "/_search?scroll=" + scrollTime;
ScrollResult scrollResult = getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(param));
return scrollResult;
}

/**
* scroll to get result
* call _search/scroll
*
* @param scrollId the scroll id of the last request
* @param scrollTime such as:1m
*/
public ScrollResult searchWithScrollId(String scrollId, String scrollTime) {
Map<String, String> param = new HashMap<>();
param.put("scroll_id", scrollId);
param.put("scroll", scrollTime);
ScrollResult scrollResult = getDocsFromScrollRequest("_search/scroll", JsonUtils.toJsonString(param));
return scrollResult;
}

private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBody) {
Request request = new Request("POST", endpoint);
request.setJsonEntity(requestBody);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new ScrollRequestException("POST " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
ObjectNode responseJson = JsonUtils.parseObject(entity);

JsonNode shards = responseJson.get("_shards");
int totalShards = shards.get("total").intValue();
int successful = shards.get("successful").intValue();
Asserts.check(totalShards == successful, String.format("POST %s,total shards(%d)!= successful shards(%d)", endpoint, totalShards, successful));

ScrollResult scrollResult = getDocsFromScrollResponse(responseJson);
return scrollResult;
} else {
throw new ScrollRequestException(String.format("POST %s response status code=%d,request boy=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
}
} catch (IOException e) {
throw new ScrollRequestException(String.format("POST %s error,request boy=%s", endpoint, requestBody), e);

}
}

private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
ScrollResult scrollResult = new ScrollResult();
String scrollId = responseJson.get("_scroll_id").asText();
scrollResult.setScrollId(scrollId);

JsonNode hitsNode = responseJson.get("hits").get("hits");
List<Map<String, Object>> docs = new ArrayList<>(hitsNode.size());
scrollResult.setDocs(docs);

Iterator<JsonNode> iter = hitsNode.iterator();
while (iter.hasNext()) {
Map<String, Object> doc = new HashMap<>();
JsonNode hitNode = iter.next();
doc.put("_index", hitNode.get("_index").textValue());
doc.put("_id", hitNode.get("_id").textValue());
Map<String, Object> source = mapper.convertValue(hitNode.get("_source"), new TypeReference<Map<String, Object>>(){});
doc.putAll(source);
docs.add(doc);
}
return scrollResult;
}

public List<IndexDocsCount> getIndexDocsCount(String index) {
String endpoint = String.format("_cat/indices/%s?h=index,docsCount&format=json", index);
Request request = new Request("GET", endpoint);
try {
Response response = restClient.performRequest(request);
if (response == null) {
throw new GetIndexDocsCountException("POST " + endpoint + " response null");
}
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
String entity = EntityUtils.toString(response.getEntity());
List<IndexDocsCount> indexDocsCounts = JsonUtils.toList(entity, IndexDocsCount.class);
return indexDocsCounts;
} else {
throw new GetIndexDocsCountException(String.format("POST %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
}
} catch (IOException ex) {
throw new GetIndexDocsCountException(ex);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;

public class EsClusterConnectionConfig {

public static final String HOSTS = "hosts";

public static final String USERNAME = "username";

public static final String PASSWORD = "password";

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;

public class SourceConfig {

public static final String INDEX = "index";

public static final String SOURCE = "source";

public static final String SCROLL_TIME = "scroll_time";

public static final String SCROLL_SIZE = "scroll_size";

}
Loading

0 comments on commit b6688c2

Please sign in to comment.