Skip to content

Commit

Permalink
Merge pull request #76 from niruhan/master
Browse files Browse the repository at this point in the history
Improve code quality
  • Loading branch information
ramindu90 authored Dec 4, 2019
2 parents 37e1a1d + ccb828b commit 362208b
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.siddhi.core.event.stream.holder.StreamEventClonerHolder;
import io.siddhi.core.event.stream.populater.ComplexEventPopulater;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.exception.SiddhiAppRuntimeException;
import io.siddhi.core.executor.ConstantExpressionExecutor;
import io.siddhi.core.executor.ExpressionExecutor;
import io.siddhi.core.executor.VariableExpressionExecutor;
Expand Down Expand Up @@ -89,7 +90,6 @@
type = {DataType.DOUBLE, DataType.FLOAT, DataType.INT, DataType.LONG},
dynamic = true
)

},
parameterOverloads = {
@ParameterOverload(parameterNames = {"no.of.clusters", "model.feature", "..."}),
Expand Down Expand Up @@ -204,15 +204,12 @@ protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent,
//validating all the features
featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(inputDefinition,
attributeExpressionExecutors, coordinateStartIndex, dimensionality);

KMeansModel kMeansModel = new KMeansModel();

attributes = new ArrayList<>(1 + dimensionality);
attributes.add(new Attribute("euclideanDistanceToClosestCentroid", Attribute.Type.DOUBLE));
for (int i = 1; i <= dimensionality; i++) {
attributes.add(new Attribute("closestCentroidCoordinate" + i, Attribute.Type.DOUBLE));
}

return () -> new ExtensionState(kMeansModel, dataPoints);
}

Expand All @@ -225,14 +222,13 @@ protected void process(ComplexEventChunk<StreamEvent> complexEventChunk,
synchronized (this) {
while (complexEventChunk.hasNext()) {
StreamEvent streamEvent = complexEventChunk.next();

//validating and getting coordinate values
for (int i = 0; i < dimensionality; i++) {
try {
Number content = (Number) featureVariableExpressionExecutors.get(i).execute(streamEvent);
coordinateValuesOfCurrentDataPoint[i] = content.doubleValue();
} catch (ClassCastException e) {
throw new SiddhiAppCreationException("coordinate values should be int/float/double/long " +
throw new SiddhiAppRuntimeException("coordinate values should be int/float/double/long " +
"but found " +
attributeExpressionExecutors[i].execute(streamEvent).getClass());
}
Expand Down Expand Up @@ -277,7 +273,6 @@ public void stop() {
}

static class ExtensionState extends State {

private static final String KEY_UNTRAINED_DATA = "untrainedData";
private static final String KEY_K_MEANS_MODEL = "kMeansModel";
private KMeansModel kMeansModel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ public class KMeansMiniBatchSPExtension extends StreamProcessor<KMeansMiniBatchS
private ExecutorService executorService;
private List<VariableExpressionExecutor> featureVariableExpressionExecutors = new LinkedList<>();
private static final Logger logger = Logger.getLogger(KMeansMiniBatchSPExtension.class.getName());

private List<Attribute> attributes;

@Override
Expand Down Expand Up @@ -224,7 +223,6 @@ protected void process(ComplexEventChunk<StreamEvent> complexEventChunk,
dimensionality);
extensionState.dataPoints.clear();
}

}
}
nextProcessor.process(complexEventChunk);
Expand Down Expand Up @@ -319,18 +317,13 @@ protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent,
attributeExpressionExecutors[3].getReturnType());
}
}

dimensionality = attributeExpressionLength - coordinateStartIndex;
coordinateValuesOfCurrentDataPoint = new double[dimensionality];

//validating all the features
featureVariableExpressionExecutors = CoreUtils.extractAndValidateFeatures(inputDefinition,
attributeExpressionExecutors, coordinateStartIndex, dimensionality);

KMeansModel kMeansModel = new KMeansModel();

executorService = siddhiQueryContext.getSiddhiAppContext().getExecutorService();

attributes = new ArrayList<>(1 + dimensionality);
attributes.add(new Attribute("euclideanDistanceToClosestCentroid", Attribute.Type.DOUBLE));
for (int i = 1; i <= dimensionality; i++) {
Expand All @@ -342,7 +335,6 @@ protected StateFactory<ExtensionState> init(MetaStreamEvent metaStreamEvent,
/**
 * Returns the list held in the {@code attributes} field.
 * The list is handed back directly; no copy is made here.
 *
 * @return the {@code attributes} list of this processor
 */
@Override
public List<Attribute> getReturnAttributes() {
return attributes;

}

@Override
Expand All @@ -351,7 +343,6 @@ public ProcessingMode getProcessingMode() {
}

static class ExtensionState extends State {

private static final String KEY_UNTRAINED_DATA = "untrainedData";
private static final String KEY_K_MEANS_MODEL = "kMeansModel";
private static final String KEY_NUMBER_OF_EVENTS_RECEIVED = "numberOfEventsReceived";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ private static void cluster(List<DataPoint> dataPointsArray, KMeansModel model,
}

/**
 * Builds one string holding the centroid coordinates of every cluster in the given list.
 * Each centroid's coordinates are rendered with {@code Arrays.toString} and the results are
 * appended back to back, in list order, with no separator between clusters.
 *
 * @param clusterList the clusters whose centroid coordinates are rendered
 * @return the concatenated coordinate strings; empty when the list has no elements
 */
private static String printClusterList(List<Cluster> clusterList) {
// NOTE(review): each statement below appears twice, once with the short names (s, c) and once
// with the descriptive names (stringBuilder, clusterIterator). This looks like a rename shown
// as removed/added diff lines rather than two separate loops - confirm against the real file.
StringBuilder s = new StringBuilder();
for (Cluster c: clusterList) {
s.append(Arrays.toString(c.getCentroid().getCoordinates()));
StringBuilder stringBuilder = new StringBuilder();
for (Cluster clusterIterator: clusterList) {
stringBuilder.append(Arrays.toString(clusterIterator.getCentroid().getCoordinates()));
}
return s.toString();
return stringBuilder.toString();
}

private static void buildModel(List<DataPoint> dataPointsArray, KMeansModel model, int numberOfClusters) {
Expand Down Expand Up @@ -144,7 +144,7 @@ static void updateCluster(List<DataPoint> dataPointsArray, double decayRate, KMe
logger.debug("model at the start of this update : ");
logger.debug(model.getModelInfo());
}
StringBuilder s;
StringBuilder stringBuilder;
List<Cluster> intermediateClusterList = new LinkedList<>();

int iter = 0;
Expand All @@ -154,40 +154,40 @@ static void updateCluster(List<DataPoint> dataPointsArray, double decayRate, KMe
buildModel(dataPointsArray, model, numberOfClusters);
}
if (model.size() == numberOfClusters) {
ArrayList<Cluster> oldClusterList = new ArrayList<>(numberOfClusters);
List<Cluster> oldClusterList = new ArrayList<>(numberOfClusters);
for (int i = 0; i < numberOfClusters; i++) {
DataPoint d = new DataPoint();
DataPoint d1 = new DataPoint();
d.setCoordinates(model.getCoordinatesOfCentroidOfCluster(i));
d1.setCoordinates(model.getCoordinatesOfCentroidOfCluster(i));
Cluster c = new Cluster(d);;
Cluster c1 = new Cluster(d1);
oldClusterList.add(c);
intermediateClusterList.add(c1);
DataPoint newDataPoint1 = new DataPoint();
DataPoint newDataPoint2 = new DataPoint();
newDataPoint1.setCoordinates(model.getCoordinatesOfCentroidOfCluster(i));
newDataPoint2.setCoordinates(model.getCoordinatesOfCentroidOfCluster(i));
Cluster newCluster1 = new Cluster(newDataPoint1);;
Cluster newCluster2 = new Cluster(newDataPoint2);
oldClusterList.add(newCluster1);
intermediateClusterList.add(newCluster2);
}
boolean centroidShifted = false;
while (iter < maximumIterations) {

assignToCluster(dataPointsArray, model);
List<Cluster> newClusterList = calculateNewClusters(model, dimensionality);
centroidShifted = !intermediateClusterList.equals(newClusterList);
if (logger.isDebugEnabled()) {
s = new StringBuilder();
stringBuilder = new StringBuilder();
for (DataPoint c : dataPointsArray) {
s.append(Arrays.toString(c.getCoordinates()));
stringBuilder.append(Arrays.toString(c.getCoordinates()));
}
logger.debug("current iteration : " + iter + "\ndata points array\n" + s.toString() +
"\nCluster list : \n" + printClusterList(intermediateClusterList) +
"\nnew cluster list \n" + printClusterList(newClusterList) + "\nCentroid shifted? = "
+ centroidShifted + "\n");
logger.debug("current iteration : " + iter + "\ndata points array\n"
+ stringBuilder.toString() + "\nCluster list : \n"
+ printClusterList(intermediateClusterList) + "\nnew cluster list \n"
+ printClusterList(newClusterList) + "\nCentroid shifted? = " + centroidShifted + "\n");
}
if (!centroidShifted) {
break;
}
model.setClusterList(newClusterList);
for (int i = 0; i < numberOfClusters; i++) {
Cluster b = newClusterList.get(i);
intermediateClusterList.get(i).getCentroid().setCoordinates(b.getCentroid().getCoordinates());
Cluster currentCluster = newClusterList.get(i);
intermediateClusterList.get(i).getCentroid().setCoordinates(currentCluster.getCentroid()
.getCoordinates());
}
iter++;
}
Expand Down Expand Up @@ -263,11 +263,9 @@ public static Object[] getAssociatedCentroidInfo(DataPoint currentDatapoint, KMe
associatedCluster.getCentroid().getCoordinates());
List<Double> associatedCentroidInfoList = new ArrayList<Double>();
associatedCentroidInfoList.add(minDistance);

for (double x : associatedCluster.getCentroid().getCoordinates()) {
associatedCentroidInfoList.add(x);
for (double coordinate : associatedCluster.getCentroid().getCoordinates()) {
associatedCentroidInfoList.add(coordinate);
}

Object[] associatedCentroidInfo = new Object[associatedCentroidInfoList.size()];
associatedCentroidInfoList.toArray(associatedCentroidInfo);
return associatedCentroidInfo;
Expand All @@ -282,24 +280,23 @@ public static Object[] getAssociatedCentroidInfo(DataPoint currentDatapoint, KMe
/**
 * Computes a fresh list of clusters whose centroids are the per-dimension means of the data
 * points currently held by each cluster of the model.
 * For every cluster returned by {@code model.getClusterList()}, the coordinates of its data
 * points are summed per dimension, divided by the number of data points in that cluster and
 * rounded to four decimal places. The result becomes the coordinates of a new centroid, which
 * is wrapped in a new {@code Cluster}. The model's own cluster list is only read here.
 *
 * @param model          the model whose clusters and assigned data points are read
 * @param dimensionality the number of coordinates summed and averaged per data point
 * @return a new list with one cluster per model cluster, in the model's iteration order
 */
private static List<Cluster> calculateNewClusters(KMeansModel model, int dimensionality) {
List<Cluster> newClusterList = new LinkedList<>();

// NOTE(review): several statements in this method appear twice, once with short names
// (c, d, d1, c1) and once with descriptive names (clusterIterator, dataPointIterator,
// newDataPoint, newCluster). This looks like a rename shown as removed/added diff lines -
// confirm against the real file.
for (Cluster c: model.getClusterList()) {
for (Cluster clusterIterator: model.getClusterList()) {
// per-dimension running sum of the coordinates of this cluster's members
double[] total;
total = new double[dimensionality];
for (DataPoint d: c.getDataPointsInCluster()) {
double[] coordinatesOfd = d.getCoordinates();
for (DataPoint dataPointIterator: clusterIterator.getDataPointsInCluster()) {
double[] coordinatesOfd = dataPointIterator.getCoordinates();
for (int i = 0; i < dimensionality; i++) {
total[i] += coordinatesOfd[i];
}
}
int numberOfMembers = c.getDataPointsInCluster().size();
int numberOfMembers = clusterIterator.getDataPointsInCluster().size();
// turn each sum into a mean, rounded to 4 decimal places (scale by 10000, round, scale back)
// NOTE(review): with no members this is 0.0 / 0 = NaN, and Math.round(NaN) yields 0, so the
// centroid would end up at all zeros - confirm whether empty clusters can reach this point.
for (int i = 0; i < dimensionality; i++) {
total[i] = Math.round((total[i] / numberOfMembers) * 10000.0) / 10000.0;
}

// the averaged coordinates become the centroid of a new cluster
DataPoint d1 = new DataPoint();
d1.setCoordinates(total);
Cluster c1 = new Cluster(d1);
newClusterList.add(c1);
DataPoint newDataPoint = new DataPoint();
newDataPoint.setCoordinates(total);
Cluster newCluster = new Cluster(newDataPoint);
newClusterList.add(newCluster);
}
return newClusterList;
}
Expand Down

0 comments on commit 362208b

Please sign in to comment.