Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/logisland 504 open faas support #512

Merged
merged 19 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
21e6cfe
A first implementation of CallRequest processor not tested yet.
MiniPlayer Sep 26, 2019
b34415a
Implemented a few simple tests for CallRequest processor.
MiniPlayer Sep 26, 2019
a4663b8
Fixed build. Created a MockRestClientServices that returns given coor…
MiniPlayer Sep 27, 2019
e5e299f
Cleanned pom files. Explained why I could not use the test jar and ho…
MiniPlayer Sep 27, 2019
39605e1
Added tests and found a missing dependency (mvel)
MiniPlayer Sep 27, 2019
0ca5650
Improved elasticsearch tests by initializing docker container once an…
MiniPlayer Sep 27, 2019
10192ea
Generalised to all elasticsearch versions.
MiniPlayer Sep 27, 2019
741c86b
Adding forgotten file...
MiniPlayer Sep 27, 2019
56da4c7
Added rest service and rest processor modules to documentation and as…
MiniPlayer Oct 2, 2019
bf9a16c
Added possibility to use input record as body to CallRequest processor.
MiniPlayer Oct 2, 2019
2279975
Implemented an async version of CallRequest (AsyncCallRequest process…
MiniPlayer Oct 3, 2019
cae44d6
Fixed a few bugs. RestLookupService was not Thread safe because of th…
MiniPlayer Oct 4, 2019
5c83210
Added AsyncCallRequest
MiniPlayer Oct 4, 2019
a087cb6
Factorized some code into AbstractCallRequest.
MiniPlayer Oct 4, 2019
817c14c
Created dome static variables
MiniPlayer Oct 4, 2019
7512b22
Factorized tests as well.
MiniPlayer Oct 4, 2019
4b6357d
Added possiblity to filter out some http code response as errors. Add…
MiniPlayer Oct 7, 2019
d2357eb
Renamed serializer to deserializer. Fixed a bug, response body in okh…
MiniPlayer Oct 8, 2019
82e8d67
Fixed a bug when calling isSet on property that supports expression l…
MiniPlayer Oct 8, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions logisland-assembly/src/assembly/full-assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<include>com.hurence.logisland:logisland-service-cassandra-client</include>
<include>com.hurence.logisland:logisland-service-influxdb-client</include>
<include>com.hurence.logisland:logisland-service-redis</include>
<include>com.hurence.logisland:logisland-service-rest</include>
<!-- PROCESSORS -->
<include>com.hurence.logisland:logisland-processor-common</include>
<include>com.hurence.logisland:logisland-processor-xml</include>
Expand All @@ -64,6 +65,7 @@
<include>com.hurence.logisland:logisland-processor-elasticsearch</include>
<include>com.hurence.logisland:logisland-processor-excel</include>
<include>com.hurence.logisland:logisland-processor-scripting</include>
<include>com.hurence.logisland:logisland-processor-rest</include>
<!-- CONNECTORS -->
<include>com.hurence.logisland:logisland-connector-opc</include>
</includes>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>logisland-processors</artifactId>
<groupId>com.hurence.logisland</groupId>
<version>1.2.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>

<packaging>jar</packaging>
<artifactId>logisland-processor-rest</artifactId>

<properties>
<vertx.version>3.8.1</vertx.version>
</properties>

<dependencies>
<!-- Provided by other jars-->
<dependency>
<groupId>com.hurence.logisland</groupId>
<artifactId>logisland-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hurence.logisland</groupId>
<artifactId>logisland-utils</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hurence.logisland</groupId>
<artifactId>logisland-plugin-support</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.hurence.logisland</groupId>
<artifactId>logisland-scripting-mvel</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<!-- Vertx for async-->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>${vertx.version}</version>
</dependency>



</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.hurence.logisland</groupId>
<artifactId>logisland-maven-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/**
* Copyright (C) 2016 Hurence (support@hurence.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hurence.logisland.rest.processor.lookup;


import com.hurence.logisland.component.AllowableValue;
import com.hurence.logisland.component.InitializationException;
import com.hurence.logisland.component.PropertyDescriptor;
import com.hurence.logisland.processor.ProcessContext;
import com.hurence.logisland.processor.ProcessError;
import com.hurence.logisland.record.Field;
import com.hurence.logisland.record.Record;
import com.hurence.logisland.serializer.ExtendedJsonSerializer;
import com.hurence.logisland.serializer.RecordSerializer;
import com.hurence.logisland.serializer.SerializerProvider;
import com.hurence.logisland.validator.StandardValidators;
import com.hurence.logisland.validator.ValidationContext;
import com.hurence.logisland.validator.ValidationResult;

import java.util.*;
import java.util.stream.Collectors;

public abstract class AbstractCallRequest extends AbstractHttpProcessor
{

public static final PropertyDescriptor FIELD_HTTP_RESPONSE = new PropertyDescriptor.Builder()
.name("field.http.response")
.description("The name of the field to put http response")
.required(false)
.expressionLanguageSupported(false)
.build();

public static final PropertyDescriptor REQUEST_METHOD = new PropertyDescriptor.Builder()
.name("request.method")
.description("The HTTP VERB Request to use.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor REQUEST_MIME_TYPE = new PropertyDescriptor.Builder()
.name("request.mime.type")
.description("The response mime type expected for the response to use in request.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor REQUEST_BODY = new PropertyDescriptor.Builder()
.name("request.body")
.description("The body to use for the request.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();

public static final PropertyDescriptor INPUT_AS_BODY = new PropertyDescriptor.Builder()
.name("input.as.body")
.description("If the input record should be serialized into json and used as body of request or not.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.defaultValue("false")
.build();

public static final String OVERWRITE_EXISTING_VALUE = "overwrite_existing";
public static final AllowableValue OVERWRITE_EXISTING =
new AllowableValue(OVERWRITE_EXISTING_VALUE, "overwrite existing field", "if field already exist");
public static final String KEEP_OLD_FIELD_VALUE = "keep_only_old_field";
public static final AllowableValue KEEP_OLD_FIELD =
new AllowableValue(KEEP_OLD_FIELD_VALUE, "keep only old field value", "keep only old field");
public static final String IGNORE_RESPONSE_VALUE = "ignore_response_field";
public static final AllowableValue IGNORE_RESPONSE =
new AllowableValue(IGNORE_RESPONSE_VALUE, "discard/ignore response", "discard/ignore response");

public static final PropertyDescriptor CONFLICT_RESOLUTION_POLICY = new PropertyDescriptor.Builder()
.name("conflict.resolution.policy")
.description("What to do when a field with the same name already exists ?")
.required(false)
.defaultValue(KEEP_OLD_FIELD.getValue())
.allowableValues(OVERWRITE_EXISTING, KEEP_OLD_FIELD, IGNORE_RESPONSE)
.build();

public static final PropertyDescriptor VALID_HTTP_CODES = new PropertyDescriptor.Builder()
.name("valid.http.response")
.description("A comma separated list of integer (http codes)." +
"If not specified return every response. If specified add error for responses with http code not listed.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.COMMA_SEPARATED_LIST_OF_INTEGER_VALIDATOR)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

public static final PropertyDescriptor KEEP_ONLY_BODY_RESPONSE = new PropertyDescriptor.Builder()
.name("keep.only.response.body")
.description("if set only keeps body returned value instead of entire http response with http code and http message.")
.required(false)
.defaultValue("false")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();

Set<Integer> validHttpCodes = new HashSet<>();
String responseFieldName;
String conflictPolicy;
boolean inputAsBody;
boolean onlyKeepResponseBody;
RecordSerializer serializer;

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(context));
if (context.getPropertyValue(INPUT_AS_BODY).asBoolean() && context.getPropertyValue(REQUEST_BODY).isSet()) {
validationResults.add(
new ValidationResult.Builder()
.input(String.format("properties '%s' and '%s' are mutually exclusive so they can not be set both at the same time.",
INPUT_AS_BODY.getName(), REQUEST_BODY.getName()))
.valid(false)
.build());
}
if (context.getPropertyValue(CONFLICT_RESOLUTION_POLICY).asString().equals(IGNORE_RESPONSE.getValue()) && context.getPropertyValue(FIELD_HTTP_RESPONSE).isSet()) {
validationResults.add(
new ValidationResult.Builder()
.input(String.format("property '%s' can not be set to '%s' when property '%s' is set.",
CONFLICT_RESOLUTION_POLICY.getName(), IGNORE_RESPONSE.getValue(), FIELD_HTTP_RESPONSE.getName()))
.valid(false)
.build());
}
return validationResults;
}

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(HTTP_CLIENT_SERVICE);
props.add(FIELD_HTTP_RESPONSE);
props.add(REQUEST_MIME_TYPE);
props.add(REQUEST_METHOD);
props.add(REQUEST_BODY);
props.add(INPUT_AS_BODY);
props.add(CONFLICT_RESOLUTION_POLICY);
props.add(VALID_HTTP_CODES);
props.add(KEEP_ONLY_BODY_RESPONSE);
return Collections.unmodifiableList(props);
}

@Override
public void init(ProcessContext context) throws InitializationException {
super.init(context);
try {
if (context.getPropertyValue(FIELD_HTTP_RESPONSE).isSet()) {
this.responseFieldName = context.getPropertyValue(FIELD_HTTP_RESPONSE).asString();
}
this.conflictPolicy = context.getPropertyValue(CONFLICT_RESOLUTION_POLICY).asString();
this.inputAsBody = context.getPropertyValue(INPUT_AS_BODY).asBoolean();
this.onlyKeepResponseBody = context.getPropertyValue(KEEP_ONLY_BODY_RESPONSE).asBoolean();
if (inputAsBody) {
serializer = SerializerProvider.getSerializer(ExtendedJsonSerializer.class.getName(), null);
}
if (context.getPropertyValue(VALID_HTTP_CODES).isSet()) {
context.getPropertyValue(VALID_HTTP_CODES).asStringOpt().ifPresent(s -> {
List<Integer> httpCodes = Arrays
.stream(s.split(","))
.map(Integer::parseInt)
.collect(Collectors.toList());
validHttpCodes.addAll(httpCodes);
});
} else {
validHttpCodes.clear();
}
} catch (Exception ex) {
throw new InitializationException(ex);
}
}

void modifyRecord(Record record, Record rsp) {
if (!validHttpCodes.isEmpty() && !validHttpCodes.contains(rsp.getField(restClientService.getResponseCodeKey()).asInteger())) {
record.addError(ProcessError.RUNTIME_ERROR.getName(),
String.format("http response code was not valid (%s)", validHttpCodes.toString()));
} else {
switch (conflictPolicy) {
case OVERWRITE_EXISTING_VALUE:
break;
case IGNORE_RESPONSE_VALUE:
return;
case KEEP_OLD_FIELD_VALUE:
if (record.hasField(responseFieldName)) return;
}
if (onlyKeepResponseBody) {
Field body = rsp.getField(restClientService.getResponseBodyKey());
record.setField(responseFieldName, body.getType(), body.getRawValue());
} else {
record.setRecordField(responseFieldName, rsp);
}
}
}

Optional<String> calculBody(Record record, ProcessContext context) {
if (context.getPropertyValue(REQUEST_BODY).isSet()) {
return Optional.ofNullable(context.getPropertyValue(REQUEST_BODY.getName()).evaluate(record).asString());
}
return Optional.empty();
}

Optional<String> calculMimTyp(Record record, ProcessContext context) {
if (context.getPropertyValue(REQUEST_MIME_TYPE).isSet()) {
return Optional.ofNullable(context.getPropertyValue(REQUEST_MIME_TYPE.getName()).evaluate(record).asString());
}
return Optional.empty();
}

Optional<String> calculVerb(Record record, ProcessContext context) {
if (context.getPropertyValue(REQUEST_METHOD).isSet()) {
return Optional.ofNullable(context.getPropertyValue(REQUEST_METHOD.getName()).evaluate(record).asString());
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright (C) 2016 Hurence (support@hurence.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.hurence.logisland.rest.processor.lookup;

import com.hurence.logisland.classloading.PluginProxy;
import com.hurence.logisland.component.InitializationException;
import com.hurence.logisland.component.PropertyDescriptor;
import com.hurence.logisland.processor.AbstractProcessor;
import com.hurence.logisland.processor.ProcessContext;
import com.hurence.logisland.service.rest.RestClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHttpProcessor extends AbstractProcessor {

private static final Logger logger = LoggerFactory.getLogger(AbstractHttpProcessor.class);

public static final PropertyDescriptor HTTP_CLIENT_SERVICE = new PropertyDescriptor.Builder()
.name("http.client.service")
.description("The instance of the Controller Service to use for HTTP requests.")
.required(true)
.identifiesControllerService(RestClientService.class)
.build();


protected RestClientService restClientService;

@Override
public boolean hasControllerService() {
return true;
}

@Override
public void init(final ProcessContext context) throws InitializationException {
super.init(context);
restClientService = PluginProxy.rewrap(context.getPropertyValue(HTTP_CLIENT_SERVICE).asControllerService());
if (restClientService == null) {
logger.error("Http Rest client service is not initialized!");
throw new InitializationException("Could not initialize Http Rest client service!");
}
}


}
Loading