Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/logisland 532 #553

Merged
merged 43 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
1bc36df
Fix url for jcenter.bintray.com repository: from http to https
mathieu-rossignol Feb 4, 2020
919c337
Added databricks option to support databricks run: load config from D…
mathieu-rossignol Feb 5, 2020
587814e
Introducing SparkConfigReader class to be packaged and loaded only in…
mathieu-rossignol Feb 5, 2020
25fddb0
Introducing logisland-engine-spark_2_4plus_kafka_2_4plus
mathieu-rossignol Feb 10, 2020
4a36bdb
Attempt to add -checkpointLocation option
mathieu-rossignol Feb 10, 2020
f44321a
Remove building of spark2.4_kafka2.4 engine for the moment
mathieu-rossignol Feb 11, 2020
bfdf3ec
Added spark 1.4 support in 2.x engine. Also added azure eventhubs spa…
mathieu-rossignol Feb 11, 2020
bee69e0
Saving intermediate step towards new service to support azure event hubs
mathieu-rossignol Feb 13, 2020
da0ce2e
Eventhbubs service: all config properties read
mathieu-rossignol Feb 13, 2020
1075c46
First attempt to have azure event hubs service running, will have to …
mathieu-rossignol Feb 13, 2020
ab6a254
Fixed spark 2.4 module pom mqtt dependency
mathieu-rossignol Feb 13, 2020
3491194
Using checkpoint location in azure structured stream service. Also fi…
mathieu-rossignol Feb 14, 2020
65d2f46
Support spark 2.4 engine in logisland.sh script
mathieu-rossignol Feb 14, 2020
dc360dd
Update version of spark-sql-streaming-mqtt_2.11 dependency
mathieu-rossignol Feb 17, 2020
086aeef
Removed useless parenthesis
mathieu-rossignol Feb 17, 2020
7ffb978
Added spark 2 common jar to logisland doc dependency
mathieu-rossignol Feb 17, 2020
280e34d
Use right scala version
mathieu-rossignol Feb 18, 2020
8d0ea78
Explicitly adding bahir-common dependency in spark2.4 engine, as a wo…
mathieu-rossignol Feb 18, 2020
f59dabc
Set checkpointlocation in structured stream
mathieu-rossignol Feb 19, 2020
bbf270e
Added spark deploy mode property to support sparl standlone cluster d…
mathieu-rossignol Feb 21, 2020
2a719c5
Introudcing support of spark standalone cluster in logisland.sh
mathieu-rossignol Feb 21, 2020
6bd742b
Fix for ClassNotFoundException in databricks environment on executor:…
mathieu-rossignol Feb 26, 2020
337f895
Need spark 2.4.0 in spark 2.4 engine to align with latest supported v…
mathieu-rossignol Feb 26, 2020
7f5c0f7
Use offset as record key when reading from azure event hub. Also remo…
mathieu-rossignol Feb 27, 2020
2e1de7d
Remove not finished and useless engine for spark 2.4+ and kafka 2.4+
mathieu-rossignol Feb 27, 2020
94732be
Update user doc after full build
mathieu-rossignol Feb 27, 2020
b4d1f77
Added experimental logisland.sh feature that allows minimal launch on…
mathieu-rossignol Feb 28, 2020
af2e8b7
Added OpenDistro Elasticsearch integration test. For the moment, only…
mathieu-rossignol Feb 28, 2020
bd0636a
Centralized opendistro user name and password in integration tests
mathieu-rossignol Feb 28, 2020
85b1c6e
update test containers version in ES7 integration test
mathieu-rossignol Feb 28, 2020
e5cec1c
No more need of mock service in ES 6 and 7 integration test: use real…
mathieu-rossignol Feb 28, 2020
2607fac
Support for HTTPS opendistro elasticsearch
mathieu-rossignol Feb 28, 2020
e66b83a
Added comment in ES7 service regarding SSL
mathieu-rossignol Mar 10, 2020
0b36c05
Added support for avro serializer/deserializer in structured streams …
mathieu-rossignol Mar 11, 2020
557cbec
From avro 1.8.2 to avro 1.9.2 to have avro schema serializable and th…
mathieu-rossignol Mar 11, 2020
84cb63a
Forgotten close when loading maxmind db from URI. Alos upgraded versi…
mathieu-rossignol Mar 12, 2020
dd876d0
Changed log messages when loading maxmind DB from URI
mathieu-rossignol Mar 12, 2020
4343104
Rollback park-sql-streaming-mqtt_2.11 version to not have conflict wi…
mathieu-rossignol Mar 13, 2020
0e5a5fa
Fixed Record JSON deserialization problem after jackson version bump.…
mathieu-rossignol Mar 16, 2020
c531acd
Changed http://jcenter.bintray.com into https://jcenter.bintray.com i…
mathieu-rossignol Mar 16, 2020
2159598
Added https://repo1.maven.org/maven2 repo to ivy.xml
mathieu-rossignol Mar 16, 2020
15ffd64
Fix logisland.sh script using wrong spark engine: when spark 2.3.3 it…
mathieu-rossignol Mar 17, 2020
aaae6be
Try to fix 20 minutes timeout when running travis integration tests
mathieu-rossignol Mar 18, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ script:
# Build
# use travis_wait so it does not time_out after 10 minutes without output (unfortunately that seems to not work)
# use -q so there's not too much output for travis (4Mb max)
- travis_wait mvn clean install -Pintegration-tests -q
- travis_wait 30 mvn clean install -Pintegration-tests -q
# build assembly (there is currently missing jars in assembly when using mvn clean install...)
- mvn clean package -DskipTests -q
# Integrations tests
Expand Down
2 changes: 2 additions & 0 deletions logisland-assembly/src/assembly/shared-dependencies.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
<!-- Now, select which projects to include in this module-set. -->
<includes>
<!-- ENGINES -->
<!-- <include>com.hurence.logisland:logisland-engine-spark_2_4_kafka_2_4</include>-->
<include>com.hurence.logisland:logisland-engine-spark_2_1</include>
<include>com.hurence.logisland:logisland-engine-spark_2_3</include>
<include>com.hurence.logisland:logisland-engine-spark_2_4</include>
<include>com.hurence.logisland:logisland-engine-spark_1_6</include>
<include>com.hurence.logisland:logisland-engine-vanilla</include>
</includes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.3</version>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.maxmind.geoip2</groupId>
<artifactId>geoip2</artifactId>
<version>2.11.0</version>
<version>2.13.1</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.hurence.logisland.service.elasticsearch;


import com.hurence.logisland.annotation.documentation.CapabilityDescription;
import com.hurence.logisland.annotation.documentation.Tags;
import com.hurence.logisland.component.AllowableValue;
Expand All @@ -31,7 +30,6 @@
import java.util.Map;
import java.util.Optional;


@Tags({"elasticsearch", "client"})
@CapabilityDescription("A controller service for accessing an elasticsearch client.")
public interface ElasticsearchClientService extends DatastoreClientService {
Expand Down Expand Up @@ -154,6 +152,16 @@ public ValidationResult validate(final String subject, final String input) {
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.build();

PropertyDescriptor ENABLE_SSL = new PropertyDescriptor.Builder()
.name("enable.ssl")
.description("Whether to enable (true) TLS/SSL connections or not (false). This can for instance be used" +
" with opendistro. Defaults to false. Note that the current implementation does try to validate" +
" the server certificate.")
.required(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();

PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
.name("username")
.description("Username to access the Elasticsearch cluster")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ESRule implements TestRule {
* The internal-transport client that talks to the local node.
*/
private RestHighLevelClient client;
private ElasticsearchContainer container;

/**
* Return a closure which starts an embedded ES docker container, executes the unit-test, then shuts down the
Expand All @@ -46,7 +47,7 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.6.2");
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.6.2");
container.start();
client = new RestHighLevelClient(RestClient.builder(HttpHost.create(container.getHttpHostAddress())));

Expand All @@ -60,6 +61,10 @@ public void evaluate() throws Throwable {
};
}

public String getHostPortString() {
return container.getHttpHostAddress();
}

/**
* Return the object through which operations can be performed on the ES cluster.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@

import java.io.IOException;
import java.util.*;
import java.util.function.BiConsumer;

import static com.hurence.logisland.service.elasticsearch.ElasticsearchClientService.HOSTS;

public class Elasticsearch_6_6_2_ClientServiceIT {

Expand All @@ -74,84 +75,13 @@ public void clean() throws IOException {
}
}

private class MockElasticsearchClientService extends Elasticsearch_6_6_2_ClientService {

@Override
protected void createElasticsearchClient(ControllerServiceInitializationContext context) throws ProcessException {
if (esClient != null) {
return;
}
esClient = esRule.getClient();
}

@Override
protected void createBulkProcessor(ControllerServiceInitializationContext context) {

if (bulkProcessor != null) {
return;
}

// create the bulk processor

BulkProcessor.Listener listener =
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long l, BulkRequest bulkRequest) {
getLogger().debug("Going to execute bulk [id:{}] composed of {} actions", new Object[]{l, bulkRequest.numberOfActions()});
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {
getLogger().debug("Executed bulk [id:{}] composed of {} actions", new Object[]{l, bulkRequest.numberOfActions()});
if (bulkResponse.hasFailures()) {
getLogger().warn("There was failures while executing bulk [id:{}]," +
" done bulk request in {} ms with failure = {}",
new Object[]{l, bulkResponse.getTook().getMillis(), bulkResponse.buildFailureMessage()});
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed()) {
errors.put(item.getId(), item.getFailureMessage());
}
}
}
}

@Override
public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
getLogger().error("something went wrong while bulk loading events to es : {}", new Object[]{throwable.getMessage()});
}

};

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> esClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(1000)
.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB))
.setFlushInterval(TimeValue.timeValueSeconds(1))
.setConcurrentRequests(2)
//.setBackoffPolicy(getBackOffPolicy(context))
.build();

}

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {

List<PropertyDescriptor> props = new ArrayList<>();

return Collections.unmodifiableList(props);
}

}

private ElasticsearchClientService configureElasticsearchClientService(final TestRunner runner) throws InitializationException {
final MockElasticsearchClientService elasticsearchClientService = new MockElasticsearchClientService();
final Elasticsearch_6_6_2_ClientService elasticsearchClientService = new Elasticsearch_6_6_2_ClientService();

runner.addControllerService("elasticsearchClient", elasticsearchClientService);

runner.enableControllerService(elasticsearchClientService);
runner.setProperty(TestProcessor.ELASTICSEARCH_CLIENT_SERVICE, "elasticsearchClient");
runner.assertValid(elasticsearchClientService);
runner.setProperty(elasticsearchClientService, HOSTS, esRule.getHostPortString());
runner.enableControllerService(elasticsearchClientService);

// TODO : is this necessary ?
final ElasticsearchClientService service = PluginProxy.unwrap(runner.getProcessContext().getPropertyValue(TestProcessor.ELASTICSEARCH_CLIENT_SERVICE).asControllerService());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<properties>
<!-- Versions -->
<elasticsearch.version>7.1.1</elasticsearch.version>
<testcontainers.version>1.12.5</testcontainers.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -70,14 +71,14 @@
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.10.7</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.10.7</version>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Copyright (C) 2020 Hurence (support@hurence.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hurence.logisland.service.elasticsearch;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;

/**
* A JUnit rule which starts an embedded opendsitro elasticsearch docker container to test security features
*/
public class ESOpenDistroRule implements TestRule {

/**
* The internal-transport client that talks to the local node.
*/
private RestHighLevelClient client;
private ElasticsearchOpenDistroContainer container;
private String opendistroUsername;
private String opendistroPassword;

private static Logger logger = LoggerFactory.getLogger(ESOpenDistroRule.class);

public ESOpenDistroRule(String opendistroUsername, String opendistroPassword) {
this.opendistroUsername = opendistroUsername;
this.opendistroPassword = opendistroPassword;
}

/**
* Return a closure which starts an embedded ES OpenDistro docker container, executes the unit-test, then shuts down the
* ES instance.
*/
@Override
public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
container = new ElasticsearchOpenDistroContainer("amazon/opendistro-for-elasticsearch:1.4.0",
opendistroUsername, opendistroPassword);
container.start();

// TODO: if testcontainers support no SSL server validation one can use the wait strategy
// in ElasticsearchOpenDistroContainer instead. See inside ElasticsearchOpenDistroContainer.
long wait = 10000L;
logger.info("Waiting for ES open distro container to start for " + wait/1000 + " seconds");
Thread.sleep(wait);

/**
* Inspired from https://github.com/opendistro-for-elasticsearch/community/issues/64
*/

RestClientBuilder builder = RestClient.builder(
new HttpHost(container.getHostAddress(), container.getPort(), "https"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {

// Set user/password basic auth credentials
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(opendistroUsername, opendistroPassword));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

// Set SSL trust manager and context
// Create and use a trust manager accepting all server certificates
TrustManager[] acceptAllTrustManager = new TrustManager[] { new X509TrustManager() {
public java.security.cert.X509Certificate[] getAcceptedIssuers() {
return null;
}
public void checkClientTrusted(X509Certificate[] certs, String authType) {
}

public void checkServerTrusted(X509Certificate[] certs, String authType) {
}
} };

SSLContext sslContext = null;
try {
sslContext = SSLContext.getInstance("SSL");
sslContext.init(null, acceptAllTrustManager, new java.security.SecureRandom());
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
httpClientBuilder.setSSLContext(sslContext);

return httpClientBuilder;
}
});
client = new RestHighLevelClient(builder);

try {
base.evaluate(); // execute the unit test
} finally {
client.close();
container.stop();
}
}
};
}

public String getHostPortString() {
return container.getHostPortString();
}

public String getHostAddress() {
return container.getHostAddress();
}

public int getPort() {
return container.getPort();
}

/**
* Return the object through which operations can be performed on the ES cluster.
*/
public RestHighLevelClient getClient() {
return client;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class ESRule implements TestRule {
* The internal-transport client that talks to the local node.
*/
private RestHighLevelClient client;
private ElasticsearchContainer container;

/**
* Return a closure which starts an embedded ES docker container, executes the unit-test, then shuts down the
Expand All @@ -46,7 +47,7 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.1");
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.1.1");
container.start();
client = new RestHighLevelClient(RestClient.builder(HttpHost.create(container.getHttpHostAddress())));

Expand All @@ -60,6 +61,10 @@ public void evaluate() throws Throwable {
};
}

public String getHostPortString() {
return container.getHttpHostAddress();
}

/**
* Return the object through which operations can be performed on the ES cluster.
*/
Expand Down
Loading