Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce http-service, http-service-response, http-call, http-call-response #143

Merged
merged 8 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ Siddhi IO HTTP
======================================

[![Jenkins Build Status](https://wso2.org/jenkins/job/siddhi/job/siddhi-io-http/badge/icon)](https://wso2.org/jenkins/job/siddhi/job/siddhi-io-http/)
[![GitHub (pre-)Release](https://img.shields.io/github/release/siddhi-io/siddhi-io-http/all.svg)](https://github.com/siddhi-io/siddhi-io-http/releases)
[![GitHub (Pre-)Release Date](https://img.shields.io/github/release-date-pre/siddhi-io/siddhi-io-http.svg)](https://github.com/siddhi-io/siddhi-io-http/releases)
[![GitHub Release](https://img.shields.io/github/release/siddhi-io/siddhi-io-http.svg)](https://github.com/siddhi-io/siddhi-io-http/releases)
[![GitHub Release Date](https://img.shields.io/github/release-date/siddhi-io/siddhi-io-http.svg)](https://github.com/siddhi-io/siddhi-io-http/releases)
[![GitHub Open Issues](https://img.shields.io/github/issues-raw/siddhi-io/siddhi-io-http.svg)](https://github.com/siddhi-io/siddhi-io-http/issues)
[![GitHub Last Commit](https://img.shields.io/github/last-commit/siddhi-io/siddhi-io-http.svg)](https://github.com/siddhi-io/siddhi-io-http/commits/master)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
Expand All @@ -15,7 +15,7 @@ For information on <a target="_blank" href="https://siddhi.io/">Siddhi</a> and i
## Download

* Versions 2.x and above with group id `io.siddhi.extension.*` from <a target="_blank" href="https://mvnrepository.com/artifact/io.siddhi.extension.io.http/siddhi-io-http/">here</a>.
* Versions 1.x and lower with group id `org.wso2.extension.siddhi.*` from <a target="_blank" href="https://mvnrepository.com/artifact/org.wso2.extension.siddhi.execution.string/siddhi-io-http">here</a>.
* Versions 1.x and lower with group id `org.wso2.extension.siddhi.*` from <a target="_blank" href="https://mvnrepository.com/artifact/org.wso2.extension.siddhi.io.http/siddhi-io-http">here</a>.

## Latest API Docs

Expand Down
2 changes: 1 addition & 1 deletion component/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<parent>
<artifactId>siddhi-io-http-parent</artifactId>
<groupId>io.siddhi.extension.io.http</groupId>
<version>2.0.9-SNAPSHOT</version>
<version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,199 +22,84 @@
import io.siddhi.annotation.Extension;
import io.siddhi.annotation.Parameter;
import io.siddhi.annotation.util.DataType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.exception.ConnectionUnavailableException;
import io.siddhi.core.stream.ServiceDeploymentInfo;
import io.siddhi.core.stream.output.sink.Sink;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.core.util.transport.DynamicOptions;
import io.siddhi.core.util.transport.Option;
import io.siddhi.core.util.transport.OptionHolder;
import io.siddhi.extension.io.http.sink.util.HttpSinkUtil;
import io.siddhi.extension.io.http.util.HTTPSourceRegistry;
import io.siddhi.extension.io.http.util.HttpConstants;
import io.siddhi.query.api.definition.StreamDefinition;
import org.apache.log4j.Logger;
import org.wso2.carbon.messaging.Header;

import java.util.List;

/**
* {@code HttpResponseSink} Handle the HTTP publishing tasks.
*/
@Extension(name = "http-response", namespace = "sink",
description = "HTTP response sink is correlated with the " +
"The HTTP request source, through a unique `source.id`, and it send a response to the HTTP request " +
"source having the same `source.id`. The response message can be formatted in `text`, `XML` or `JSON` "
+ "and can be sent with appropriate headers.",
deprecated = true,
description = "" +
"_(Use http-service-response sink instead)._\n" +
"The http-response sink send responses of the requests consumed by its corresponding " +
"http-request source, by mapping the response messages to formats such as `text`, `XML` and `JSON`.",
parameters = {
@Parameter(
name = "source.id",
description = "Identifier of the source.",
description = "Identifier to correlate the http-response sink to its corresponding " +
"http-request source which consumed the request.",
type = {DataType.STRING}),
@Parameter(
name = "message.id",
description = "Identifier of the message.",
description = "Identifier to correlate the response with the request received " +
"by http-request source.",
dynamic = true,
type = {DataType.STRING}),
@Parameter(
name = "headers",
description = "The headers that should be included as HTTP response headers. There can be any" +
" number of headers concatenated on following format. \"'header1:value1'," +
"'header2:value2'\" User can include content-type header if he/she need to have any " +
"specific type for payload. If not system get the mapping type as the content-Type " +
"header (ie.`@map(xml)`:`application/xml`, `@map(json)`:`application/json`, " +
"`@map(text)`:`plain/text`) and if user does not include any mapping type then system "
+ "gets the `plain/text` as default Content-Type header. If user does not include " +
"Content-Length header then system calculate the bytes size of payload and include it" +
" as content-length header.",
description = "HTTP request headers in format `\"'<key>:<value>','<key>:<value>'\"`.\n" +
"When the `Content-Type` header is not provided the system decides the " +
"Content-Type based on the provided sink mapper as following: \n" +
" - `@map(type='xml')`: `application/xml`\n" +
" - `@map(type='json')`: `application/json`\n" +
" - `@map(type='text')`: `plain/text`\n" +
" - `@map(type='keyvalue')`: `application/x-www-form-urlencoded`\n" +
" - For all other cases system defaults to `plain/text`\n" +
"Also the `Content-Length` header need not to be provided, as the system " +
"automatically defines it by calculating the size of the payload.",
type = {DataType.STRING},
optional = true,
defaultValue = " "),
defaultValue = "Content-Type and Content-Length headers"),
},
examples = {
@Example(syntax =
"@sink(type='http-response', source.id='sampleSourceId', message.id='{{messageId}}', "
+ "headers=\"'content-type:json','content-length:94'\""
+ "@map(type='json', @payload('{{payloadBody}}')))\n"
+ "define stream FooStream (payloadBody String, messageId string, headers string);\n",
description =
"If it is json mapping expected input should be in following format for FooStream:\n"
+ "{\n"
+ "{\"events\":\n"
+ " {\"event\":\n"
+ " \"symbol\":WSO2,\n"
+ " \"price\":55.6,\n"
+ " \"volume\":100,\n"
+ " }\n"
+ "},\n"
+ "0cf708b1-7eae-440b-a93e-e72f801b486a,\n"
+ "Content-Length:24#Content-Location:USA\n"
+ "}\n\n"
+ "Above event will generate response for the matching source message " +
"as below.\n\n"
+ "~Output http event payload\n"
+ "{\"events\":\n"
+ " {\"event\":\n"
+ " \"symbol\":WSO2,\n"
+ " \"price\":55.6,\n"
+ " \"volume\":100,\n"
+ " }\n"
+ "}\n\n"
+ "~Output http event headers\n"
+ "Content-Length:24,\n"
+ "Content-Location:'USA',\n"
+ "Content-Type:'application/json'\n"
)}
)
public class HttpResponseSink extends Sink {

private static final Logger log = Logger.getLogger(HttpResponseSink.class);
private Option messageIdOption;
private String sourceId;
private Option httpHeaderOption;
private String mapType;

/**
* Returns the list of classes which this sink can consume.
* Based on the type of the sink, it may be limited to being able to publish specific type of classes.
* For example, a sink of type file can only write objects of type String .
*
* @return array of supported classes , if extension can support of any types of classes
* then return empty array .
*/
@Override
public Class[] getSupportedInputEventClasses() {
return new Class[]{String.class};
}

/**
* Give information to the deployment about the service exposed by the sink.
*
* @return ServiceDeploymentInfo Service related information to the deployment
*/
@Override
protected ServiceDeploymentInfo exposeServiceDeploymentInfo() {
return null;
}

/**
* Returns a list of supported dynamic options (that means for each event value of the option can change) by
* the transport
*
* @return the list of supported dynamic option keys
*/
@Override
public String[] getSupportedDynamicOptions() {
return new String[]{
HttpConstants.HEADERS,
HttpConstants.MESSAGE_ID
};
}

/**
* The initialization method for {@link Sink}, which will be called before other methods and validate
* the all configuration and getting the intial values.
*
* @param outputStreamDefinition containing stream definition bind to the {@link Sink}
* @param optionHolder Option holder containing static and dynamic configuration related
* to the {@link Sink}
* @param configReader to read the sink related system configuration.
* @param siddhiAppContext the context of the {@link io.siddhi.query.api.SiddhiApp} used to
* get siddhi related utilty functions.
*/
@Override
protected StateFactory init(StreamDefinition outputStreamDefinition, OptionHolder optionHolder,
ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
//read configurations
this.messageIdOption = optionHolder.validateAndGetOption(HttpConstants.MESSAGE_ID);
this.sourceId = optionHolder.validateAndGetStaticValue(HttpConstants.SOURCE_ID);
this.httpHeaderOption = optionHolder.getOrCreateOption(HttpConstants.HEADERS, HttpConstants.DEFAULT_HEADER);
this.mapType = outputStreamDefinition.getAnnotations().get(0).getAnnotations().get(0).getElements().get(0)
.getValue();
return null;
}
@Example(syntax = "" +
"@source(type='http-request', receiver.url='http://localhost:5005/add',\n" +
" source.id='adder',\n" +
" @map(type='json, @attributes(messageId='trp:messageId',\n" +
" value1='$.event.value1',\n" +
" value2='$.event.value2')))\n" +
"define stream AddStream (messageId string, value1 long, value2 long);\n" +
"\n" +
"@sink(type='http-response', source.id='adder',\n" +
" message.id='{{messageId}}', @map(type = 'json'))\n" +
"define stream ResultStream (messageId string, results long);\n" +
"\n" +
"@info(name = 'query1')\n" +
"from AddStream \n" +
"select messageId, value1 + value2 as results \n" +
"insert into ResultStream;",
description = "The http-request source on stream `AddStream` listens on " +
"url `http://localhost:5005/stocks` for JSON messages with format:\n" +
"```{\n" +
" \"event\": {\n" +
" \"value1\": 3,\n" +
" \"value2\": 4\n" +
" }\n" +
"}```\n" +
"and when events arrive it maps to `AddStream` events and pass " +
"them to query `query1` for processing. The query results produced on `ResultStream` " +
"are sent as a response via http-response sink with format:" +
"```{\n" +
" \"event\": {\n" +
" \"results\": 7\n" +
" }\n" +
"}```" +
"Here the request and response are correlated by passing the `messageId` " +
"produced by the http-request to the respective http-response sink."
),

@Override
public void publish(Object payload, DynamicOptions dynamicOptions, State state)
throws ConnectionUnavailableException {
String headers = httpHeaderOption.getValue(dynamicOptions);
List<Header> headersList = HttpSinkUtil.getHeaders(headers);
String messageId = messageIdOption.getValue(dynamicOptions);
String contentType = HttpSinkUtil.getContentType(mapType, headersList);
HTTPSourceRegistry.
getRequestSource(sourceId).handleCallback(messageId, (String) payload, headersList, contentType);
}

/**
* This method will be called before the processing method.
* Intention to establish connection to publish event.
*
* @throws ConnectionUnavailableException if end point is unavailable the ConnectionUnavailableException thrown
* such that the system will take care retrying for connection
*/
@Override
public void connect() throws ConnectionUnavailableException {

}

/**
* Called after all publishing is done, or when {@link ConnectionUnavailableException} is thrown
* Implementation of this method should contain the steps needed to disconnect from the sink.
*/
@Override
public void disconnect() {

}

/**
* The method can be called when removing an event receiver.
* The cleanups that has to be done when removing the receiver has to be done here.
*/
@Override
public void destroy() {
}
)
@Deprecated
public class HttpResponseSink extends HttpServiceResponseSink {

}
}
Loading