From 8be265b7b580ea75b5a2992bf89d96efd83a8584 Mon Sep 17 00:00:00 2001 From: Brandon Date: Thu, 14 Mar 2024 12:48:08 -0700 Subject: [PATCH 1/3] Adding shared data Adding shared data interfaces Updating processing context interface with new global and model shared data --- .../core/CacheOperationStatus.java | 30 +++++++++++++++++++ .../digitaltwin/core/CacheResult.java | 24 +++++++++++++++ .../digitaltwin/core/ProcessingContext.java | 13 ++++++++ .../digitaltwin/core/SharedData.java | 27 +++++++++++++++++ 4 files changed, 94 insertions(+) create mode 100644 Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheOperationStatus.java create mode 100644 Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheResult.java create mode 100644 Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java diff --git a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheOperationStatus.java b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheOperationStatus.java new file mode 100644 index 0000000..8a0a8ab --- /dev/null +++ b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheOperationStatus.java @@ -0,0 +1,30 @@ +package com.scaleoutsoftware.digitaltwin.core; + +public enum CacheOperationStatus { + + /** + * The object was successfully retrieved. + */ + + ObjectRetrieved, + + /** + * The object was successfully added/updated. + */ + ObjectPut, + + /** + * The object could not be retrieved because it was not found. + */ + ObjectDoesNotExist, + + /** + * The object was removed successfully. + */ + ObjectRemoved, + + /** + * The cache was cleared successfully. + */ + CacheCleared +} diff --git a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheResult.java b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheResult.java new file mode 100644 index 0000000..204bb1d --- /dev/null +++ b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/CacheResult.java @@ -0,0 +1,24 @@ +package com.scaleoutsoftware.digitaltwin.core; + +/** + * Represents a response from a {@link SharedData} operation. + */ +public interface CacheResult { + /** + * Gets the key or null to the object associated with the result. + * @return the key or null. + */ + public String getKey(); + + /** + * Get the object returned from a Get operation. + * @return the object or null. + */ + public byte[] getValue(); + + /** + * Gets the status of the cache operation. + * @return the operation status. + */ + CacheOperationStatus getStatus(); +} diff --git a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/ProcessingContext.java b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/ProcessingContext.java index c785eff..676ad4a 100644 --- a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/ProcessingContext.java +++ b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/ProcessingContext.java @@ -222,4 +222,17 @@ public abstract class ProcessingContext implements Serializable { * @return the {@link SimulationController} or null if no simulation is running. */ public abstract SimulationController getSimulationController(); + + /** + * Retrieve a {@link SharedData} accessor for this model's shared data. + * @return a {@link SharedData} instance. + */ + public abstract SharedData getSharedModelData(); + + /** + * Retrieve a {@link SharedData} accessor for globally shared data. + * @return a {@link SharedData} instance. + */ + public abstract SharedData getSharedGlobalData(); + } diff --git a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java new file mode 100644 index 0000000..42ecac0 --- /dev/null +++ b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java @@ -0,0 +1,27 @@ +package com.scaleoutsoftware.digitaltwin.core; + +public interface SharedData { + /** + * Retrieves an existing object from the cache. + * @return A cache result. + */ + public CacheResult get(String key); + + /** + * Put a new key/value mapping into the cache. + * @return a cache result. + */ + public CacheResult put(String key, byte[] value); + + /** + * Remove a key/value mapping from the cache. + * @return a cache result. + */ + public CacheResult remove(); + + /** + * Clear the shared data cache. + * @return a cache result. + */ + public CacheResult clear(); +} From 95b447d5aed0d6708fcdc5f8dc8d39764e9816c1 Mon Sep 17 00:00:00 2001 From: Brandon Date: Thu, 14 Mar 2024 12:55:15 -0700 Subject: [PATCH 2/3] Fixing shared data remove interface Fixing shared data remove interface --- Core/build.gradle | 2 +- .../java/com/scaleoutsoftware/digitaltwin/core/SharedData.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Core/build.gradle b/Core/build.gradle index fdb95ca..075bdee 100644 --- a/Core/build.gradle +++ b/Core/build.gradle @@ -3,7 +3,7 @@ plugins { } group 'com.scaleoutsoftware.digitaltwin' -version '3.0.4' +version '3.0.6' sourceCompatibility = JavaVersion.VERSION_12 diff --git a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java index 42ecac0..7268adf 100644 --- a/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java +++ b/Core/src/main/java/com/scaleoutsoftware/digitaltwin/core/SharedData.java @@ -17,7 +17,7 @@ public interface SharedData { * Remove a key/value mapping from the cache. * @return a cache result. */ - public CacheResult remove(); + public CacheResult remove(String key); /** * Clear the shared data cache. From 6f5f28ca909fe6845e2fa2be51d98657c3fc04e1 Mon Sep 17 00:00:00 2001 From: Brandon Date: Thu, 14 Mar 2024 13:17:22 -0700 Subject: [PATCH 3/3] Updating development environment Update workbench with shared data implementation --- Development/build.gradle | 4 +- .../development/TwinExecutionEngine.java | 28 +++++- .../WorkbenchProcessingContext.java | 18 +++- .../development/WorkbenchSharedData.java | 97 +++++++++++++++++++ .../development/TestWorkbench.java | 40 ++++++++ 5 files changed, 181 insertions(+), 6 deletions(-) create mode 100644 Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchSharedData.java diff --git a/Development/build.gradle b/Development/build.gradle index f6d33ca..7aef498 100644 --- a/Development/build.gradle +++ b/Development/build.gradle @@ -4,7 +4,7 @@ plugins { } group 'com.scaleoutsoftware.digitaltwin' -version '3.0.5' +version '3.0.6' sourceCompatibility = JavaVersion.VERSION_12 @@ -20,7 +20,7 @@ dependencies { testImplementation group: 'junit', name: 'junit', version: '4.12' implementation group: 'com.google.code.gson', name: 'gson', version: '2.8.5' // public build configuration - implementation group: 'com.scaleoutsoftware.digitaltwin', name: 'core', version: '3.0.4' + implementation group: 'com.scaleoutsoftware.digitaltwin', name: 'core', version: '3.0.6' // local build configuration //implementation fileTree(dir: '..\\Core\\build\\libs\\', include: '*.jar') diff --git a/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/TwinExecutionEngine.java b/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/TwinExecutionEngine.java index 385049a..6c02378 100644 --- a/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/TwinExecutionEngine.java +++ b/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/TwinExecutionEngine.java @@ -34,6 +34,8 @@ class TwinExecutionEngine implements Closeable { private ConcurrentHashMap> _messageProcessorValueTypes; private ConcurrentHashMap> _modelInstances; private ConcurrentHashMap> _alertProviders; + private ConcurrentHashMap> _modelsSharedData; + private HashMap _globalSharedData; private Workbench _workbench; private ConcurrentHashMap _simulationSchedulers; private ConcurrentHashMap _realTimeTimers; @@ -52,7 +54,9 @@ void init( ) { _simulationProcessors = new ConcurrentHashMap<>(); _messageProcessorValueTypes = new ConcurrentHashMap<>(); _modelInstances = new ConcurrentHashMap<>(); - _alertProviders = new ConcurrentHashMap<>(); + _modelsSharedData = new ConcurrentHashMap<>(); + _globalSharedData = new HashMap<>(); + _alertProviders = new ConcurrentHashMap<>(); _simulationSchedulers = new ConcurrentHashMap<>(); _realTimeTimers = new ConcurrentHashMap<>(); _gson = new Gson(); @@ -242,6 +246,18 @@ SimulationStep runSimulationStep(SimulationStepArgs args) { return status; } + HashMap getModelData(String model) { + HashMap sharedData = _modelsSharedData.get(model); + if(sharedData == null) sharedData = new HashMap<>(); + _modelsSharedData.put(model, sharedData); + return sharedData; + } + + HashMap getGlobalSharedData() { + return _globalSharedData; + } + + public void logMessage(String model, LogMessage message) { ConcurrentLinkedQueue prev = _workbench.LOGGED_MESSAGES.get(model); if(prev == null) { @@ -315,7 +331,10 @@ ProcessingResult run(String model, String id, String source, String serializedLi instance = proxy.getInstance(); } MessageProcessor mp = _messageProcessors.get(model); - WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine); + HashMap sharedData = _modelsSharedData.get(model); + if(sharedData == null) sharedData = new HashMap<>(); + _modelsSharedData.put(model, sharedData); + WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine, sharedData, _globalSharedData); context.reset(model, id, source, instance); ProcessingResult res = mp.processMessages(context, instance, new WorkbenchMessageListFactory(serializedList, _messageProcessorValueTypes.get(model))); if(context.forceSave()) res = ProcessingResult.UpdateDigitalTwin; @@ -360,7 +379,10 @@ ProcessingResult run(String model, String id, String source, List messag instance = proxy.getInstance(); } MessageProcessor mp = _messageProcessors.get(model); - WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine); + HashMap sharedData = _modelsSharedData.get(model); + if(sharedData == null) sharedData = new HashMap<>(); + _modelsSharedData.put(model, sharedData); + WorkbenchProcessingContext context = new WorkbenchProcessingContext(_workbench._twinExecutionEngine, sharedData, _globalSharedData); context.reset(model, id, source, instance); ProcessingResult res = mp.processMessages(context, instance, new WorkbenchMessageListFactory(messages, _messageProcessorValueTypes.get(model))); if(context.forceSave()) res = ProcessingResult.UpdateDigitalTwin; diff --git a/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchProcessingContext.java b/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchProcessingContext.java index f08c7fc..8905ce9 100644 --- a/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchProcessingContext.java +++ b/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchProcessingContext.java @@ -31,11 +31,15 @@ class WorkbenchProcessingContext extends ProcessingContext { String _source; DigitalTwinBase _twinInstance; SimulationController _controller; + HashMap _modelData; + HashMap _globalData; boolean _forceSave; - WorkbenchProcessingContext(TwinExecutionEngine twinExecutionEngine) { + WorkbenchProcessingContext(TwinExecutionEngine twinExecutionEngine, HashMap modelSharedData, HashMap globalSharedData) { _twinExecutionEngine = twinExecutionEngine; _controller = null; + _modelData = modelSharedData; + _globalData = globalSharedData; } WorkbenchProcessingContext(TwinExecutionEngine twinExecutionEngine, SimulationController controller) { @@ -56,6 +60,8 @@ void reset(String model, String id, String source) { _id = id; _forceSave = false; _source = source; + _modelData = _twinExecutionEngine.getModelData(model); + _globalData = _twinExecutionEngine.getGlobalSharedData(); } void resetInstance(DigitalTwinBase instance) { @@ -210,4 +216,14 @@ public Date getCurrentTime() { public SimulationController getSimulationController() { return _controller; } + + @Override + public SharedData getSharedModelData() { + return new WorkbenchSharedData(_modelData); + } + + @Override + public SharedData getSharedGlobalData() { + return new WorkbenchSharedData(_globalData); + } } diff --git a/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchSharedData.java b/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchSharedData.java new file mode 100644 index 0000000..ffee258 --- /dev/null +++ b/Development/src/main/java/com/scaleoutsoftware/digitaltwin/development/WorkbenchSharedData.java @@ -0,0 +1,97 @@ +package com.scaleoutsoftware.digitaltwin.development; + +import com.scaleoutsoftware.digitaltwin.core.CacheOperationStatus; +import com.scaleoutsoftware.digitaltwin.core.CacheResult; +import com.scaleoutsoftware.digitaltwin.core.SharedData; + +import java.util.HashMap; + +public class WorkbenchSharedData implements SharedData { + private final HashMap data; + + public WorkbenchSharedData(HashMap shared) { + data = shared; + } + @Override + public CacheResult get(String s) { + return new CacheResult() { + @Override + public String getKey() { + return s; + } + + @Override + public byte[] getValue() { + return data.getOrDefault(s, null); + } + + @Override + public CacheOperationStatus getStatus() { + return data.containsKey(s) ? CacheOperationStatus.ObjectRetrieved : CacheOperationStatus.ObjectDoesNotExist; + } + }; + } + + @Override + public CacheResult put(String s, byte[] bytes) { + data.put(s, bytes); + return new CacheResult() { + @Override + public String getKey() { + return s; + } + + @Override + public byte[] getValue() { + return bytes; + } + + @Override + public CacheOperationStatus getStatus() { + return CacheOperationStatus.ObjectPut; + } + }; + } + + @Override + public CacheResult remove(String s) { + byte[] v = data.remove(s); + return new CacheResult() { + @Override + public String getKey() { + return s; + } + + @Override + public byte[] getValue() { + return v; + } + + @Override + public CacheOperationStatus getStatus() { + return v == null ? CacheOperationStatus.ObjectDoesNotExist : CacheOperationStatus.ObjectRemoved; + } + }; + } + + @Override + public CacheResult clear() { + data.clear(); + return new CacheResult() { + @Override + public String getKey() { + return null; + } + + @Override + public byte[] getValue() { + return null; + } + + @Override + public CacheOperationStatus getStatus() { + return CacheOperationStatus.CacheCleared; + } + }; + } +} diff --git a/Development/src/test/java/com/scaleoutsoftware/digitaltwin/development/TestWorkbench.java b/Development/src/test/java/com/scaleoutsoftware/digitaltwin/development/TestWorkbench.java index 1be1f3c..34ec72f 100644 --- a/Development/src/test/java/com/scaleoutsoftware/digitaltwin/development/TestWorkbench.java +++ b/Development/src/test/java/com/scaleoutsoftware/digitaltwin/development/TestWorkbench.java @@ -93,6 +93,34 @@ public ProcessingResult onTimedMessage(String s, DigitalTwinBase digitalTwinBase case "StopTimer": processingContext.stopTimer("timer"); break; + case "SharedData": + SharedData sharedData = processingContext.getSharedModelData(); + CacheResult result = sharedData.put("Hello", "Some string...".getBytes(StandardCharsets.UTF_8)); + if(result.getStatus() == CacheOperationStatus.ObjectPut) { + System.out.println("Successfully stored object in model storage."); + } + result = sharedData.get("Hello"); + if(result.getStatus() == CacheOperationStatus.ObjectRetrieved) { + System.out.println("Successfully retrieved " + new String(result.getValue(), StandardCharsets.UTF_8) + " from model storage."); + } + result = sharedData.remove("Hello"); + if(result.getStatus() == CacheOperationStatus.ObjectRemoved) { + System.out.println("Successfully removed " + new String(result.getValue(), StandardCharsets.UTF_8) + " from model storage."); + } + sharedData = processingContext.getSharedGlobalData(); + result = sharedData.put("Hello", "Some string...".getBytes(StandardCharsets.UTF_8)); + if(result.getStatus() == CacheOperationStatus.ObjectPut) { + System.out.println("Successfully stored object in global storage."); + } + result = sharedData.get("Hello"); + if(result.getStatus() == CacheOperationStatus.ObjectRetrieved) { + System.out.println("Successfully retrieved " + new String(result.getValue(), StandardCharsets.UTF_8) + " from global storage."); + } + result = sharedData.remove("Hello"); + if(result.getStatus() == CacheOperationStatus.ObjectRemoved) { + System.out.println("Successfully removed " + new String(result.getValue(), StandardCharsets.UTF_8) + " from global storage."); + } + break; default: break; } @@ -736,4 +764,16 @@ public void TestWorkbenchGenerateModelSchemaExceptionally() throws Exception { throw e; } } + + @Test + public void TestWorkbenchSharedData() throws Exception { + try (Workbench workbench = new Workbench()) { + workbench.addRealTimeModel("Simple", new SimpleMessageProcessor(), SimpleDigitalTwin.class, SimpleMessage.class); + LinkedList messages = new LinkedList<>(); + messages.add(new SimpleMessage("SharedData", 29)); + workbench.send("Simple", "23", messages); + } catch (Exception e) { + throw e; + } + } }