Skip to content

Commit

Permalink
Partition query support and upgrade siddhi version (#49)
Browse files Browse the repository at this point in the history
* partition query support and siddhi version update to latest

* fix update

* Update .travis.yml

* Update version to 0.2.1-SNAPSHOT

* Update pom.xml

* Update pom.xml
  • Loading branch information
pranjal0811 authored and haoch committed Sep 4, 2019
1 parent ea88b3c commit 8032082
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 76 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ language: java
script: mvn clean test
jdk:
- oraclejdk8
dist: trusty
4 changes: 2 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
<parent>
<artifactId>flink-siddhi-parent_2.11</artifactId>
<groupId>com.github.haoch</groupId>
<version>0.2.0-SNAPSHOT</version>
<version>0.2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-siddhi_2.11</artifactId>
<packaging>jar</packaging>
</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;

import java.util.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.StreamDefinition;

/**
* <h1>Siddhi Runtime Operator</h1>
Expand All @@ -66,13 +66,13 @@
*
* <ul>
* <li>
* Create Siddhi {@link org.wso2.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
* Create Siddhi {@link io.siddhi.core.SiddhiAppRuntime} according predefined execution plan and integrate with Flink Stream Operator lifecycle.
* </li>
* <li>
* Connect Flink DataStreams with predefined Siddhi Stream according to unique streamId
* </li>
* <li>
* Convert native {@link StreamRecord} to Siddhi {@link org.wso2.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
* Convert native {@link StreamRecord} to Siddhi {@link io.siddhi.core.event.Event} according to {@link StreamSchema}, and send to Siddhi Runtime.
* </li>
* <li>
* Listen output callback event and convert as expected output type according to output {@link org.apache.flink.api.common.typeinfo.TypeInformation}, then output as typed DataStream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.siddhi.utils.SiddhiExecutionPlanner;
import org.apache.flink.util.Preconditions;
import org.wso2.siddhi.core.SiddhiManager;
import io.siddhi.core.SiddhiManager;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.query.api.definition.AbstractDefinition;

/**
* Siddhi Stream output callback handler that converts siddhi {@link Event} to the required output type,
Expand Down Expand Up @@ -86,7 +86,7 @@ public synchronized void stopProcessing() {
this.collectedRecords.clear();
}

private Map<String, Object> toMap(Event event) {
public Map<String, Object> toMap(Event event) {
Map<String, Object> map = new LinkedHashMap<>();
for (int i = 0; i < definition.getAttributeNameArray().length; i++) {
map.put(definition.getAttributeNameArray()[i], event.getData(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.query.api.definition.AbstractDefinition;

/**
* Siddhi Stream output callback handler that converts siddhi {@link Event} to the required output type,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.siddhi.utils.SiddhiTypeFactory;
import org.apache.flink.util.Preconditions;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,27 @@

package org.apache.flink.streaming.siddhi.utils;

import io.siddhi.query.api.SiddhiApp;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.execution.ExecutionElement;
import io.siddhi.query.api.execution.partition.Partition;
import io.siddhi.query.api.execution.query.Query;
import io.siddhi.query.api.execution.query.input.handler.StreamHandler;
import io.siddhi.query.api.execution.query.input.handler.Window;
import io.siddhi.query.api.execution.query.input.stream.InputStream;
import io.siddhi.query.api.execution.query.input.stream.JoinInputStream;
import io.siddhi.query.api.execution.query.input.stream.SingleInputStream;
import io.siddhi.query.api.execution.query.input.stream.StateInputStream;
import io.siddhi.query.api.execution.query.output.stream.OutputStream;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Variable;
import io.siddhi.query.compiler.SiddhiCompiler;
import org.apache.commons.collections.ListUtils;
import org.apache.flink.streaming.siddhi.schema.SiddhiStreamSchema;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wso2.siddhi.query.api.SiddhiApp;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.execution.ExecutionElement;
import org.wso2.siddhi.query.api.execution.query.Query;
import org.wso2.siddhi.query.api.execution.query.input.handler.StreamHandler;
import org.wso2.siddhi.query.api.execution.query.input.handler.Window;
import org.wso2.siddhi.query.api.execution.query.input.stream.InputStream;
import org.wso2.siddhi.query.api.execution.query.input.stream.JoinInputStream;
import org.wso2.siddhi.query.api.execution.query.input.stream.SingleInputStream;
import org.wso2.siddhi.query.api.execution.query.input.stream.StateInputStream;
import org.wso2.siddhi.query.api.execution.query.output.stream.OutputStream;
import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
import org.wso2.siddhi.query.api.execution.query.selection.Selector;
import org.wso2.siddhi.query.api.expression.Variable;
import org.wso2.siddhi.query.compiler.SiddhiCompiler;

import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -74,13 +75,18 @@ public String getEnrichedExecutionPlan() {

private void parse() throws Exception {
SiddhiApp siddhiApp = SiddhiCompiler.parse(enrichedExecutionPlan);
Query query;
for (ExecutionElement executionElement : siddhiApp.getExecutionElementList()) {
if (!(executionElement instanceof Query)) {
throw new Exception("Unhandled execution element: " + executionElement.toString());
}

InputStream inputStream = ((Query) executionElement).getInputStream();
Selector selector = ((Query) executionElement).getSelector();
//--FIX START Support for Partition Execution element
if (executionElement instanceof Query) {
query = (Query) executionElement;
}else{
query = ((Partition) executionElement).getQueryList().get(0);
}
//--FIX END
InputStream inputStream = query.getInputStream();
Selector selector = query.getSelector();
Map<String, SingleInputStream> queryLevelAliasToStreamMapping = new HashMap<>();

// Inputs stream definitions
Expand Down Expand Up @@ -143,7 +149,7 @@ private void parse() throws Exception {
}

// Output streams
OutputStream outputStream = ((Query) executionElement).getOutputStream();
OutputStream outputStream = query.getOutputStream();
outputStreams.put(outputStream.getId(), selector.getSelectionList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.siddhi.router.StreamRoute;
import org.apache.flink.streaming.siddhi.operator.SiddhiOperatorContext;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.query.api.definition.AbstractDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;

import java.util.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@

package org.apache.flink.streaming.siddhi.extension;

import java.util.HashMap;
import java.util.Map;

import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.executor.ExpressionExecutor;
import org.wso2.siddhi.core.executor.function.FunctionExecutor;
import org.wso2.siddhi.core.util.config.ConfigReader;
import org.wso2.siddhi.query.api.definition.Attribute;
import io.siddhi.core.config.SiddhiQueryContext;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.function.FunctionExecutor;
import io.siddhi.core.util.config.ConfigReader;
import io.siddhi.core.util.snapshot.state.State;
import io.siddhi.core.util.snapshot.state.StateFactory;
import io.siddhi.query.api.definition.Attribute;

public class CustomPlusFunctionExtension extends FunctionExecutor {
private Attribute.Type returnType;
Expand All @@ -34,7 +33,7 @@ public class CustomPlusFunctionExtension extends FunctionExecutor {
* The initialization method for FunctionExecutor, this method will be called before the other methods
*/
@Override
protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiAppContext siddhiAppContext) {
protected StateFactory init(ExpressionExecutor[] expressionExecutors, ConfigReader configReader, SiddhiQueryContext siddhiQueryContext) {
for (ExpressionExecutor expressionExecutor : attributeExpressionExecutors) {
Attribute.Type attributeType = expressionExecutor.getReturnType();
if (attributeType == Attribute.Type.DOUBLE) {
Expand All @@ -46,6 +45,7 @@ protected void init(ExpressionExecutor[] expressionExecutors, ConfigReader confi
returnType = Attribute.Type.LONG;
}
}
return null;
}

/**
Expand Down Expand Up @@ -91,17 +91,18 @@ protected Object execute(Object data) {
}

@Override
public Attribute.Type getReturnType() {
return returnType;
// Multi-argument execution path required by the Siddhi 5 FunctionExecutor API.
// NOTE(review): this returns the raw argument array unchanged. In a class named
// CustomPlusFunctionExtension one would expect the arguments to be summed here
// (as the legacy single-arg execute presumably did) — confirm this stub is
// intentional, e.g. only needed to satisfy the new abstract method.
protected Object execute(Object[] objects, State state) {
return objects;
}

@Override
public Map<String, Object> currentState() {
return new HashMap<>();
// Single-argument execution path required by the Siddhi 5 FunctionExecutor API.
// NOTE(review): identity pass-through — the input is returned untouched and the
// supplied State is ignored. Verify this is the intended behavior for the
// single-operand case of the custom "plus" function.
protected Object execute(Object o, State state) {
return o;
}

@Override
public void restoreState(Map<String, Object> map) {

// Reports the attribute type resolved during init() (DOUBLE or LONG, per the
// visible init logic) so Siddhi can type-check queries using this function.
public Attribute.Type getReturnType() {
return returnType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.wso2.siddhi.core.SiddhiAppRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.SiddhiAppRuntime;
import io.siddhi.core.SiddhiManager;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.input.InputHandler;
import io.siddhi.core.stream.output.StreamCallback;

import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.siddhi.source.Event;
import org.junit.Test;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.definition.Attribute;
import io.siddhi.query.api.definition.StreamDefinition;

import static org.junit.Assert.*;

Expand Down
4 changes: 2 additions & 2 deletions experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>flink-siddhi-parent_2.11</artifactId>
<groupId>com.github.haoch</groupId>
<version>0.2.0-SNAPSHOT</version>
<version>0.2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -127,4 +127,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
18 changes: 14 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ under the License.
<groupId>com.github.haoch</groupId>
<artifactId>flink-siddhi-parent_2.11</artifactId>

<version>0.2.0-SNAPSHOT</version>
<version>0.2.1-SNAPSHOT</version>
<modules>
<module>core</module>
<module>experimental</module>
Expand All @@ -34,15 +34,15 @@ under the License.
<packaging>pom</packaging>

<properties>
<siddhi.version>4.2.40</siddhi.version>
<siddhi.version>5.1.2</siddhi.version>
<flink.version>1.7.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>

<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.wso2.siddhi</groupId>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-core</artifactId>
<version>${siddhi.version}</version>
<exclusions>
Expand All @@ -53,7 +53,7 @@ under the License.
</exclusions>
</dependency>
<dependency>
<groupId>org.wso2.siddhi</groupId>
<groupId>io.siddhi</groupId>
<artifactId>siddhi-query-api</artifactId>
<version>${siddhi.version}</version>
<exclusions>
Expand Down Expand Up @@ -141,6 +141,16 @@ under the License.
<name>WSO2 Maven2 Repository</name>
<url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
</repository>
<repository>
<id>wso2.releases</id>
<name>WSO2 internal Repository</name>
<url>http://maven.wso2.org/nexus/content/repositories/releases/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>daily</updatePolicy>
<checksumPolicy>ignore</checksumPolicy>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
Expand Down

0 comments on commit 8032082

Please sign in to comment.