Skip to content

Commit

Permalink
Merge pull request #16 from scaleoutsoftware/br-dev
Browse files Browse the repository at this point in the history
Adding shared data interface and workbench implementation
  • Loading branch information
ripleyb authored Mar 14, 2024
2 parents c2b65a3 + ef492b2 commit 783fd0f
Show file tree
Hide file tree
Showing 10 changed files with 274 additions and 5 deletions.
2 changes: 1 addition & 1 deletion Core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

group 'com.scaleoutsoftware.digitaltwin'
version '3.0.4'
version '3.0.6'

sourceCompatibility = JavaVersion.VERSION_12

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.scaleoutsoftware.digitaltwin.core;

public enum CacheOperationStatus {

/**
* The object was successfully retrieved.
*/

ObjectRetrieved,

/**
* The object was successfully added/updated.
*/
ObjectPut,

/**
* The object could not be retrieved because it was not found.
*/
ObjectDoesNotExist,

/**
* The object was removed successfully.
*/
ObjectRemoved,

/**
* The cache was cleared successfully.
*/
CacheCleared
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.scaleoutsoftware.digitaltwin.core;

/**
* Represents a response from a {@link SharedData} operation.
*/
public interface CacheResult {
/**
* Gets the key or null to the object associated with the result.
* @return the key or null.
*/
public String getKey();

/**
* Get the object returned from a Get operation.
* @return the object or null.
*/
public byte[] getValue();

/**
* Gets the status of the cache operation.
* @return the operation status.
*/
CacheOperationStatus getStatus();
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,17 @@ public abstract class ProcessingContext implements Serializable {
* @return the {@link SimulationController} or null if no simulation is running.
*/
public abstract SimulationController getSimulationController();

/**
* Retrieve a {@link SharedData} accessor for this model's shared data.
* @return a {@link SharedData} instance.
*/
public abstract SharedData getSharedModelData();

/**
* Retrieve a {@link SharedData} accessor for globally shared data.
* @return a {@link SharedData} instance.
*/
public abstract SharedData getSharedGlobalData();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.scaleoutsoftware.digitaltwin.core;

public interface SharedData {
/**
* Retrieves an existing object from the cache.
* @return A cache result.
*/
public CacheResult get(String key);

/**
* Put a new key/value mapping into the cache.
* @return a cache result.
*/
public CacheResult put(String key, byte[] value);

/**
* Remove a key/value mapping from the cache.
* @return a cache result.
*/
public CacheResult remove(String key);

/**
* Clear the shared data cache.
* @return a cache result.
*/
public CacheResult clear();
}
2 changes: 1 addition & 1 deletion Development/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies {
testImplementation group: 'junit', name: 'junit', version: '4.12'
implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
// public build configuration
implementation group: 'com.scaleoutsoftware.digitaltwin', name: 'core', version: '3.0.4'
implementation group: 'com.scaleoutsoftware.digitaltwin', name: 'core', version: '3.0.6'

// local build configuration
//implementation fileTree(dir: '..\\Core\\build\\libs\\', include: '*.jar')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class TwinExecutionEngine implements Closeable {
private ConcurrentHashMap<String, Class<?>> _messageProcessorValueTypes;
private ConcurrentHashMap<String, ConcurrentHashMap<String, TwinProxy>> _modelInstances;
private ConcurrentHashMap<String, ConcurrentHashMap<String,AlertProviderConfiguration>> _alertProviders;
private ConcurrentHashMap<String, HashMap<String,byte[]>> _modelsSharedData;
private HashMap<String,byte[]> _globalSharedData;
private Workbench _workbench;
private ConcurrentHashMap<String, SimulationScheduler> _simulationSchedulers;
private ConcurrentHashMap<String, WorkbenchTimerTask> _realTimeTimers;
Expand All @@ -52,6 +54,8 @@ void init( ) {
_simulationProcessors = new ConcurrentHashMap<>();
_messageProcessorValueTypes = new ConcurrentHashMap<>();
_modelInstances = new ConcurrentHashMap<>();
_modelsSharedData = new ConcurrentHashMap<>();
_globalSharedData = new HashMap<>();
_alertProviders = new ConcurrentHashMap<>();
_simulationSchedulers = new ConcurrentHashMap<>();
_realTimeTimers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -241,6 +245,18 @@ SimulationStep runSimulationStep(SimulationStepArgs args) {
return status;
}

HashMap<String, byte[]> getModelData(String model) {
HashMap<String, byte[]> sharedData = _modelsSharedData.get(model);
if(sharedData == null) sharedData = new HashMap<>();
_modelsSharedData.put(model, sharedData);
return sharedData;
}

HashMap<String, byte[]> getGlobalSharedData() {
return _globalSharedData;
}


public void logMessage(String model, LogMessage message) {
ConcurrentLinkedQueue<LogMessage> prev = _workbench.LOGGED_MESSAGES.get(model);
if(prev == null) {
Expand Down Expand Up @@ -315,7 +331,10 @@ ProcessingResult run(String model, String id, String source, String serializedLi
instance = proxy.getInstance();
}
MessageProcessor mp = _messageProcessors.get(model);
WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine);
HashMap<String, byte[]> sharedData = _modelsSharedData.get(model);
if(sharedData == null) sharedData = new HashMap<>();
_modelsSharedData.put(model, sharedData);
WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine, sharedData, _globalSharedData);
context.reset(model, id, source, instance);
ProcessingResult res = mp.processMessages(context, instance, new WorkbenchMessageListFactory(serializedList, _messageProcessorValueTypes.get(model)));
if(context.forceSave()) res = ProcessingResult.UpdateDigitalTwin;
Expand Down Expand Up @@ -360,7 +379,10 @@ ProcessingResult run(String model, String id, String source, List<Object> messag
instance = proxy.getInstance();
}
MessageProcessor mp = _messageProcessors.get(model);
WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine);
HashMap<String, byte[]> sharedData = _modelsSharedData.get(model);
if(sharedData == null) sharedData = new HashMap<>();
_modelsSharedData.put(model, sharedData);
WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine, sharedData, _globalSharedData);
context.reset(model, id, source, instance);
ProcessingResult res = mp.processMessages(context, instance, new WorkbenchMessageListFactory(messages, _messageProcessorValueTypes.get(model)));
if(context.forceSave()) res = ProcessingResult.UpdateDigitalTwin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@ class WorkbenchProcessingContext extends ProcessingContext {
String _source;
DigitalTwinBase _twinInstance;
SimulationController _controller;
HashMap<String, byte[]> _modelData;
HashMap<String, byte[]> _globalData;
boolean _forceSave;

WorkbenchProcessingContext(TwinExecutionEngine twinExecutionEngine) {
WorkbenchProcessingContext(TwinExecutionEngine twinExecutionEngine, HashMap<String, byte[]> modelSharedData, HashMap<String, byte[]> globalSharedData) {
_twinExecutionEngine = twinExecutionEngine;
_controller = null;
_modelData = modelSharedData;
_globalData = globalSharedData;
}

WorkbenchProcessingContext(TwinExecutionEngine twinExecutionEngine, SimulationController controller) {
Expand All @@ -56,6 +60,8 @@ void reset(String model, String id, String source) {
_id = id;
_forceSave = false;
_source = source;
_modelData = _twinExecutionEngine.getModelData(model);
_globalData = _twinExecutionEngine.getGlobalSharedData();
}

void resetInstance(DigitalTwinBase instance) {
Expand Down Expand Up @@ -210,4 +216,14 @@ public Date getCurrentTime() {
public SimulationController getSimulationController() {
return _controller;
}

@Override
public SharedData getSharedModelData() {
return new WorkbenchSharedData(_modelData);
}

@Override
public SharedData getSharedGlobalData() {
return new WorkbenchSharedData(_globalData);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.scaleoutsoftware.digitaltwin.development;

import com.scaleoutsoftware.digitaltwin.core.CacheOperationStatus;
import com.scaleoutsoftware.digitaltwin.core.CacheResult;
import com.scaleoutsoftware.digitaltwin.core.SharedData;

import java.util.HashMap;

public class WorkbenchSharedData implements SharedData {
private final HashMap<String, byte[]> data;

public WorkbenchSharedData(HashMap<String, byte[]> shared) {
data = shared;
}
@Override
public CacheResult get(String s) {
return new CacheResult() {
@Override
public String getKey() {
return s;
}

@Override
public byte[] getValue() {
return data.getOrDefault(s, null);
}

@Override
public CacheOperationStatus getStatus() {
return data.containsKey(s) ? CacheOperationStatus.ObjectRetrieved : CacheOperationStatus.ObjectDoesNotExist;
}
};
}

@Override
public CacheResult put(String s, byte[] bytes) {
data.put(s, bytes);
return new CacheResult() {
@Override
public String getKey() {
return s;
}

@Override
public byte[] getValue() {
return bytes;
}

@Override
public CacheOperationStatus getStatus() {
return CacheOperationStatus.ObjectPut;
}
};
}

@Override
public CacheResult remove(String s) {
byte[] v = data.remove(s);
return new CacheResult() {
@Override
public String getKey() {
return s;
}

@Override
public byte[] getValue() {
return v;
}

@Override
public CacheOperationStatus getStatus() {
return v == null ? CacheOperationStatus.ObjectDoesNotExist : CacheOperationStatus.ObjectRemoved;
}
};
}

@Override
public CacheResult clear() {
data.clear();
return new CacheResult() {
@Override
public String getKey() {
return null;
}

@Override
public byte[] getValue() {
return null;
}

@Override
public CacheOperationStatus getStatus() {
return CacheOperationStatus.CacheCleared;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,34 @@ public ProcessingResult onTimedMessage(String s, DigitalTwinBase digitalTwinBase
case "StopTimer":
processingContext.stopTimer("timer");
break;
case "SharedData":
SharedData sharedData = processingContext.getSharedModelData();
CacheResult result = sharedData.put("Hello", "Some string...".getBytes(StandardCharsets.UTF_8));
if(result.getStatus() == CacheOperationStatus.ObjectPut) {
System.out.println("Successfully stored object in model storage.");
}
result = sharedData.get("Hello");
if(result.getStatus() == CacheOperationStatus.ObjectRetrieved) {
System.out.println("Successfully retrieved " + new String(result.getValue(), StandardCharsets.UTF_8) + " from model storage.");
}
result = sharedData.remove("Hello");
if(result.getStatus() == CacheOperationStatus.ObjectRemoved) {
System.out.println("Successfully removed " + new String(result.getValue(), StandardCharsets.UTF_8) + " from model storage.");
}
sharedData = processingContext.getSharedGlobalData();
result = sharedData.put("Hello", "Some string...".getBytes(StandardCharsets.UTF_8));
if(result.getStatus() == CacheOperationStatus.ObjectPut) {
System.out.println("Successfully stored object in global storage.");
}
result = sharedData.get("Hello");
if(result.getStatus() == CacheOperationStatus.ObjectRetrieved) {
System.out.println("Successfully retrieved " + new String(result.getValue(), StandardCharsets.UTF_8) + " from global storage.");
}
result = sharedData.remove("Hello");
if(result.getStatus() == CacheOperationStatus.ObjectRemoved) {
System.out.println("Successfully removed " + new String(result.getValue(), StandardCharsets.UTF_8) + " from global storage.");
}
break;
default:
break;
}
Expand Down Expand Up @@ -736,4 +764,16 @@ public void TestWorkbenchGenerateModelSchemaExceptionally() throws Exception {
throw e;
}
}

@Test
public void TestWorkbenchSharedData() throws Exception {
try (Workbench workbench = new Workbench()) {
workbench.addRealTimeModel("Simple", new SimpleMessageProcessor(), SimpleDigitalTwin.class, SimpleMessage.class);
LinkedList<Object> messages = new LinkedList<>();
messages.add(new SimpleMessage("SharedData", 29));
workbench.send("Simple", "23", messages);
} catch (Exception e) {
throw e;
}
}
}

0 comments on commit 783fd0f

Please sign in to comment.