Skip to content

Commit

Permalink
[Feature][Connector-V2] new connector of Elasticsearch source(apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
iture123 committed Aug 28, 2022
1 parent ead3d68 commit d270890
Show file tree
Hide file tree
Showing 18 changed files with 892 additions and 38 deletions.
58 changes: 58 additions & 0 deletions docs/en/connector-v2/source/Elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Elasticsearch

## Description

Read data from `Elasticsearch`.

:::tip

Engine Supported

* supported `Elasticsearch` version is >= 2.x and < 8.x

:::

## Options

| name | type | required | default value |
|-------------|--------| -------- |---------------|
| hosts | array | yes | - |
| username | string | no | |
| password | string | no | |
| index | string | yes | - |
| source | array | yes | - |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |



### hosts [array]
`Elasticsearch` cluster http address, the format is `host:port` , allowing multiple hosts to be specified. Such as `["host1:9200", "host2:9200"]`.

### username [string]
x-pack username

### password [string]
x-pack password

### index [string]
`Elasticsearch` index name, support * fuzzy matching

### source [array]
The fields of index.
You can get the document id by specifying the field `_id`. If you sink `_id` to another index, you need to specify an alias for `_id` due to the `Elasticsearch` limit.

### scroll_time [string]
Amount of time `Elasticsearch` will keep the search context alive for scroll requests.

### scroll_size [int]
Maximum number of hits to be returned with each `Elasticsearch` scroll request.

## Examples
```hocon
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-*"
source = ["_id","name","age"]
}
```
3 changes: 2 additions & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ seatunnel.source.OssFile = connector-file-oss
seatunnel.source.Pulsar = connector-pulsar
seatunnel.source.Hudi = connector-hudi
seatunnel.sink.DingTalk = connector-dingtalk
seatunnel.sink.elasticsearch = connector-elasticsearch
seatunnel.source.Elasticsearch = connector-elasticsearch
seatunnel.sink.Elasticsearch = connector-elasticsearch
seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
seatunnel.sink.Neo4j = connector-neo4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,61 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.Asserts;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.BulkElasticsearchException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetElasticsearchVersionException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.GetIndexDocsCountException;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ScrollRequestException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

import java.io.IOException;
import java.util.List;
import java.util.*;

public class EsRestClient {

private static EsRestClient esRestClient;
private static RestClient restClient;
private final RestClient restClient;

private EsRestClient() {
// Private on purpose: instances are obtained via the createInstance(...) factory methods.
private EsRestClient(RestClient restClient) {
    this.restClient = restClient;
}

/**
 * Builds an {@link EsRestClient} from the connector plugin config.
 * Reads the required "hosts" list and the optional x-pack "username"/"password".
 *
 * NOTE(review): the password is only read when a username is present — a config that
 * sets password without username is silently ignored. Confirm this is intended.
 */
public static EsRestClient createInstance(Config pluginConfig){
    List<String> hosts = pluginConfig.getStringList(EsClusterConnectionConfig.HOSTS);
    String username = null;
    String password = null;
    if (pluginConfig.hasPath(EsClusterConnectionConfig.USERNAME)) {
        username = pluginConfig.getString(EsClusterConnectionConfig.USERNAME);
        if (pluginConfig.hasPath(EsClusterConnectionConfig.PASSWORD)) {
            password = pluginConfig.getString(EsClusterConnectionConfig.PASSWORD);
        }
    }
    return createInstance(hosts, username, password);
}

/**
 * Builds an {@link EsRestClient} for the given hosts, with optional basic-auth credentials.
 */
public static EsRestClient createInstance(List<String> hosts, String username, String password) {
    return new EsRestClient(getRestClientBuilder(hosts, username, password).build());
}


private static RestClientBuilder getRestClientBuilder(List<String> hosts, String username, String password) {
HttpHost[] httpHosts = new HttpHost[hosts.size()];
for (int i = 0; i < hosts.size(); i++) {
Expand All @@ -67,14 +94,6 @@ private static RestClientBuilder getRestClientBuilder(List<String> hosts, String
return builder;
}

public static EsRestClient getInstance(List<String> hosts, String username, String password) {
if (restClient == null) {
RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, username, password);
restClient = restClientBuilder.build();
esRestClient = new EsRestClient();
}
return esRestClient;
}

public BulkResponse bulk(String requestBody) {
Request request = new Request("POST", "_bulk");
Expand Down Expand Up @@ -102,7 +121,7 @@ public BulkResponse bulk(String requestBody) {
/**
* @return version.number, example:2.0.0
*/
public static String getClusterVersion() {
public String getClusterVersion() {
Request request = new Request("GET", "/");
try {
Response response = restClient.performRequest(request);
Expand All @@ -120,4 +139,113 @@ public void close() throws IOException {
restClient.close();
}

/**
 * Opens a scroll search over {@code index}.
 * Calls /${index}/_search?scroll=${scrollTime} with a match_all query.
 *
 * @param index      index name
 * @param source     fields to fetch for each document
 * @param scrollTime how long Elasticsearch keeps the search context alive, e.g. "1m"
 * @param scrollSize number of documents fetched per request
 */
public ScrollResult searchByScroll(String index, List<String> source, String scrollTime, int scrollSize) {
    Map<String, Object> requestBody = new HashMap<>();
    requestBody.put("query", Collections.singletonMap("match_all", new HashMap<String, String>()));
    requestBody.put("_source", source);
    requestBody.put("sort", new String[]{"_doc"});
    requestBody.put("size", scrollSize);
    String endpoint = index + "/_search?scroll=" + scrollTime;
    return getDocsFromScrollRequest(endpoint, JsonUtils.toJsonString(requestBody));
}


/**
 * Fetches the next page of an open scroll via _search/scroll.
 *
 * @param scrollId   the scroll id returned by the previous request
 * @param scrollTime how long Elasticsearch keeps the search context alive, e.g. "1m"
 */
public ScrollResult searchWithScrollId(String scrollId, String scrollTime) {
    Map<String, String> requestBody = new HashMap<>();
    requestBody.put("scroll", scrollTime);
    requestBody.put("scroll_id", scrollId);
    return getDocsFromScrollRequest("_search/scroll", JsonUtils.toJsonString(requestBody));
}


/**
 * Executes a scroll-related POST request and converts the JSON response into a {@link ScrollResult}.
 *
 * @param endpoint    path to POST to ("{index}/_search?scroll=..." or "_search/scroll")
 * @param requestBody the JSON request body
 * @return the parsed scroll result
 * @throws ScrollRequestException if the response is null, the status is not 200,
 *                                a shard failed, or the underlying HTTP call fails
 */
private ScrollResult getDocsFromScrollRequest(String endpoint, String requestBody) {
    Request request = new Request("POST", endpoint);
    request.setJsonEntity(requestBody);
    try {
        Response response = restClient.performRequest(request);
        if (response == null) {
            throw new ScrollRequestException("POST " + endpoint + " response null");
        }
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            String entity = EntityUtils.toString(response.getEntity());
            ObjectNode responseJson = JsonUtils.parseObject(entity);

            // A partial shard failure would silently drop documents, so treat it as an error.
            JsonNode shards = responseJson.get("_shards");
            int totalShards = shards.get("total").intValue();
            int successful = shards.get("successful").intValue();
            Asserts.check(totalShards == successful, String.format("POST %s,total shards(%d)!= successful shards(%d)", endpoint, totalShards, successful));

            return getDocsFromScrollResponse(responseJson);
        } else {
            // fixed typo in the message: "request boy" -> "request body"
            throw new ScrollRequestException(String.format("POST %s response status code=%d,request body=%s", endpoint, response.getStatusLine().getStatusCode(), requestBody));
        }
    } catch (IOException e) {
        // fixed typo in the message: "request boy" -> "request body"
        throw new ScrollRequestException(String.format("POST %s error,request body=%s", endpoint, requestBody), e);
    }
}

/**
 * Converts a scroll-search JSON response into a {@link ScrollResult}: the scroll id plus one
 * flattened map per hit ("_source" fields merged with the "_index" and "_id" metadata).
 */
private ScrollResult getDocsFromScrollResponse(ObjectNode responseJson) {
    ScrollResult scrollResult = new ScrollResult();
    scrollResult.setScrollId(responseJson.get("_scroll_id").asText());

    JsonNode hitsNode = responseJson.get("hits").get("hits");
    List<Map<String, Object>> docs = new ArrayList<>(hitsNode.size());
    scrollResult.setDocs(docs);

    // for-each instead of an explicit Iterator loop
    for (JsonNode hitNode : hitsNode) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("_index", hitNode.get("_index").textValue());
        doc.put("_id", hitNode.get("_id").textValue());
        // toString() suffices for re-parsing; pretty-printing the JSON first was wasted work
        Map<String, Object> source = JsonUtils.parseObject(hitNode.get("_source").toString(), Map.class);
        doc.putAll(source);
        docs.add(doc);
    }
    return scrollResult;
}

/**
 * Queries the _cat/indices API for the document count of every index matching {@code index}
 * (wildcards such as "seatunnel-*" are supported).
 *
 * @throws GetIndexDocsCountException if the request fails, returns null, or has a non-200 status
 */
public List<IndexDocsCount> getIndexDocsCount(String index) {
    String endpoint = String.format("_cat/indices/%s?h=index,docsCount&format=json", index);
    Request request = new Request("GET", endpoint);
    try {
        Response response = restClient.performRequest(request);
        if (response == null) {
            // bug fix: this null check previously had an empty body, so a null response
            // fell through to a NullPointerException on the next line
            throw new GetIndexDocsCountException("GET " + endpoint + " response null");
        }
        if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
            String entity = EntityUtils.toString(response.getEntity());
            return JsonUtils.toList(entity, IndexDocsCount.class);
        } else {
            // bug fix: the message previously said "POST" although this request is a GET
            throw new GetIndexDocsCountException(String.format("GET %s response status code=%d", endpoint, response.getStatusLine().getStatusCode()));
        }
    } catch (IOException ex) {
        throw new GetIndexDocsCountException(ex);
    }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;

/**
 * Connection option keys shared by the Elasticsearch source and sink connectors.
 */
public class EsClusterConnectionConfig {

    /** Cluster http addresses in "host:port" form, e.g. ["host1:9200", "host2:9200"]. */
    public static final String HOSTS = "hosts";

    /** Optional x-pack username. */
    public static final String USERNAME = "username";

    /** Optional x-pack password. */
    public static final String PASSWORD = "password";

    private EsClusterConnectionConfig() {
        // constants holder — never instantiated
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@ public class SinkConfig {

public static final String INDEX_TYPE = "index_type";

public static final String USERNAME = "username";

public static final String PASSWORD = "password";

public static final String HOSTS = "hosts";

public static final String MAX_BATCH_SIZE = "max_batch_size";

public static final String MAX_RETRY_SIZE = "max_retry_size";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;

/**
 * Option keys specific to the Elasticsearch source connector.
 */
public class SourceConfig {

    /** Index name to read from; "*" fuzzy matching is supported. */
    public static final String INDEX = "index";

    /** Fields of the index to read ("_id" may be included for the document id). */
    public static final String SOURCE = "source";

    /** How long the scroll search context is kept alive, e.g. "1m". */
    public static final String SCROLL_TIME = "scroll_time";

    /** Maximum number of hits returned per scroll request. */
    public static final String SCROLL_SIZE = "scroll_size";

    private SourceConfig() {
        // constants holder — never instantiated
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.source;

/**
 * Default values for the Elasticsearch source connector options.
 *
 * NOTE(review): the class name ("Deault") and the constant names ("SCROLLL_") contain
 * typos; they are kept as-is because renaming would break existing callers and the
 * filename. Consider a deprecating rename in a follow-up change.
 */
public class SourceConfigDeaultConstant {

    /** Default scroll keep-alive ("scroll_time" option). */
    public static final String SCROLLL_TIME = "1m";

    /** Default page size ("scroll_size" option). */
    public static final int SCROLLL_SIZE = 100;

    private SourceConfigDeaultConstant() {
        // constants holder — never instantiated
    }
}
Loading

0 comments on commit d270890

Please sign in to comment.