From e705fcc7336483f98da472e810f0c7a3ac2d2f9d Mon Sep 17 00:00:00 2001 From: Suhas-G Date: Tue, 16 Aug 2022 11:45:21 +0200 Subject: [PATCH 1/3] changes to make fsm work --- backend/staticfiles/synthesis/lib/inputs.py | 44 +++++++++++++++++++- backend/staticfiles/synthesis/lib/outputs.py | 31 ++++++++++++++ backend/staticfiles/synthesis/main.py | 26 ++++++++---- 3 files changed, 93 insertions(+), 8 deletions(-) diff --git a/backend/staticfiles/synthesis/lib/inputs.py b/backend/staticfiles/synthesis/lib/inputs.py index b7950a94..3ee1d7ad 100644 --- a/backend/staticfiles/synthesis/lib/inputs.py +++ b/backend/staticfiles/synthesis/lib/inputs.py @@ -1,4 +1,5 @@ -from multiprocessing import shared_memory +from multiprocessing import shared_memory, Condition +from site import ENABLE_USER_SITE import numpy as np from lib.exceptions import InvalidInputNameException @@ -14,9 +15,16 @@ def create_readonly_wire(name): class Inputs: + + ENABLE_NAME = "Enable" + def __init__(self, input_data) -> None: self.inputs = input_data + def _init_enabled(self) -> None: + self._enable_data = self.inputs[Inputs.ENABLE_NAME] if Inputs.ENABLE_NAME in self.inputs else None + self._enable_condition = Condition(self.inputs[Inputs.ENABLE_NAME]["lock"]) if self.enable_wire else None + def read(self, name): if self.inputs.get(name) is None: raise InvalidInputNameException(f"{name} is not declared in inputs") @@ -139,3 +147,37 @@ def read_array(self, name): data = self._read_npy_matrix(name, np.float64) return data + + + @property + def enabled(self) -> bool: + # No enable wire used, so enabled by default + if self._enable_data is None: + return True + + _enabled = self.read_number(Inputs.ENABLE_NAME) + + # If enable wire is present but, not initialized, return False + if _enabled is None: + return False + + return np.isclose(_enabled, np.array([1.0])) + + + + @enabled.setter + def enabled(self, _enabled: bool): + if self._enable_data is None: + return + + self._enable_data["lock"].acquire() + + if self._enable_data.get("created", False): + self._enable_data["data"][0] = 1 if _enabled else 0 + else: + # Wire doesn't exist yet, ideally has to be created and set + # TODO: Is this case necessary? + pass + + self._enable_data["lock"].release() + diff --git a/backend/staticfiles/synthesis/lib/outputs.py b/backend/staticfiles/synthesis/lib/outputs.py index 80bd6884..4fe1d075 100644 --- a/backend/staticfiles/synthesis/lib/outputs.py +++ b/backend/staticfiles/synthesis/lib/outputs.py @@ -36,6 +36,9 @@ def share(self, name, data): if self.outputs.get(name) is None: raise InvalidOutputNameException(f"{name} is not declared in outputs") + # Lock before writing data + self.outputs[name]["lock"].acquire() + # Store the array data in the appropriate variables data = np.array(data) shape = np.array(data.shape) @@ -77,6 +80,9 @@ def share(self, name, data): # Mark output as created self.outputs[name]["created"] = True + # Release lock after writing data + self.outputs[name]["lock"].release() + def _share_npy_matrix(self, name, matrix, shape): dim = np.array([len(matrix.shape)], dtype=np.int64) if self.outputs[name].get("created", False): @@ -103,6 +109,9 @@ def share_image(self, name, image): if self.outputs.get(name) is None: raise InvalidOutputNameException(f"{name} is not declared in outputs") + # Lock before writing data + self.outputs[name]["lock"].acquire() + image = np.array(image, dtype=np.uint8) if len(image.shape) != 2 and len(image.shape) != 3: raise ValueError("Image must be 2D or 3D") @@ -115,10 +124,16 @@ def share_image(self, name, image): shape = np.array(shape, dtype=np.int64) self._share_npy_matrix(name, image, shape) + # Release lock after writing data + self.outputs[name]["lock"].release() + def share_number(self, name, number): if self.outputs.get(name) is None: raise InvalidOutputNameException(f"{name} is not declared in outputs") + # Lock before writing data + self.outputs[name]["lock"].acquire() + if self.outputs[name].get("created", False): self.outputs[name]["data"][:] = number else: @@ -132,10 +147,16 @@ def share_number(self, name, number): self.outputs[name]["data"][:] = number self.outputs[name]["created"] = True + # Release lock after writing data + self.outputs[name]["lock"].release() + def share_string(self, name, string): if self.outputs.get(name) is None: raise InvalidOutputNameException(f"{name} is not declared in outputs") + # Lock before writing data + self.outputs[name]["lock"].acquire() + if self.outputs[name].get("created", False): self.outputs[name]["data"][:] = string else: @@ -149,8 +170,18 @@ def share_string(self, name, string): self.outputs[name]["data"][:] = string self.outputs[name]["created"] = True + # Release lock after writing data + self.outputs[name]["lock"].release() + def share_array(self, name, array): if self.outputs.get(name) is None: raise InvalidOutputNameException(f"{name} is not declared in outputs") + + # Lock before writing data + self.outputs[name]["lock"].acquire() + array = np.array(array, dtype=np.float64) self._share_npy_matrix(name, array, np.array(array.shape, dtype=np.int64)) + + # Release lock after writing data + self.outputs[name]["lock"].release() diff --git a/backend/staticfiles/synthesis/main.py b/backend/staticfiles/synthesis/main.py index aa0ccc53..a53b8155 100644 --- a/backend/staticfiles/synthesis/main.py +++ b/backend/staticfiles/synthesis/main.py @@ -3,8 +3,9 @@ import multiprocessing import random import string -from multiprocessing import shared_memory +from multiprocessing import shared_memory, Lock from time import sleep +from winreg import LoadKey from lib.inputs import Inputs from lib.outputs import Outputs @@ -17,11 +18,12 @@ def clean_shared_memory(names): - all_names = names[:] + all_names = list(names.keys()) all_names.extend([name + "_dim" for name in names]) all_names.extend([name + "_shape" for name in names]) all_names.extend([name + "_type" for name in names]) + # Clean all shared memory for name in all_names: try: shm = shared_memory.SharedMemory(name, create=False) @@ -30,6 +32,13 @@ def clean_shared_memory(names): except FileNotFoundError: pass + # Release all locks + for name in names: + try: + names[name]["lock"].release() + except ValueError: + pass + def main(): """ @@ -44,7 +53,7 @@ def main(): synchronize_frequency = data["synchronize_frequency"] block_data = {} - wire_names = [] + all_wires = {} for wire in wires: source = wire["source"] @@ -61,9 +70,12 @@ def main(): wire_name = "".join( random.choices(string.ascii_uppercase + string.digits, k=10) ) - wire_names.append(wire_name) - output_data = {source["name"]: {"wire": wire_name}} - input_data = {target["name"]: {"wire": wire_name}} + + # If a new wire, add it to dictionary and also keep track of its lock + if wire_name not in all_wires: + all_wires[wire_name] = Lock() + output_data = {source["name"]: {"wire": wire_name, "lock": all_wires[wire_name]}} + input_data = {target["name"]: {"wire": wire_name, "lock": all_wires[wire_name]}} block_data[source["block"]] = block_data.get( source["block"], {"inputs": {}, "outputs": {}, "parameters": {}} ) @@ -109,7 +121,7 @@ def main(): for process in processes: process.terminate() process.join() - clean_shared_memory(wire_names) + clean_shared_memory(all_wires) if __name__ == "__main__": From b78f91d4c0d61169679db187bf05e5dcc346522f Mon Sep 17 00:00:00 2001 From: Suhas-G Date: Tue, 25 Oct 2022 13:38:15 +0200 Subject: [PATCH 2/3] fix: multi fan-in and fan-out of wires --- backend/staticfiles/synthesis/lib/inputs.py | 1 - backend/staticfiles/synthesis/lib/outputs.py | 5 ++++- backend/staticfiles/synthesis/main.py | 23 ++++++++++++-------- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/backend/staticfiles/synthesis/lib/inputs.py b/backend/staticfiles/synthesis/lib/inputs.py index 3ee1d7ad..d7da8849 100644 --- a/backend/staticfiles/synthesis/lib/inputs.py +++ b/backend/staticfiles/synthesis/lib/inputs.py @@ -1,5 +1,4 @@ from multiprocessing import shared_memory, Condition -from site import ENABLE_USER_SITE import numpy as np from lib.exceptions import InvalidInputNameException diff --git a/backend/staticfiles/synthesis/lib/outputs.py b/backend/staticfiles/synthesis/lib/outputs.py index 4fe1d075..7b8cd448 100644 --- a/backend/staticfiles/synthesis/lib/outputs.py +++ b/backend/staticfiles/synthesis/lib/outputs.py @@ -11,7 +11,10 @@ def __init__(self, output_data) -> None: self.shms = [] def _create_wire(self, name, size): - shm = shared_memory.SharedMemory(name=name, create=True, size=size) + try: + shm = shared_memory.SharedMemory(name=name) + except FileNotFoundError: + shm = shared_memory.SharedMemory(name=name, create=True, size=size) self.shms.append(shm) return shm diff --git a/backend/staticfiles/synthesis/main.py b/backend/staticfiles/synthesis/main.py index a53b8155..67f33def 100644 --- a/backend/staticfiles/synthesis/main.py +++ b/backend/staticfiles/synthesis/main.py @@ -5,7 +5,6 @@ import string from multiprocessing import shared_memory, Lock from time import sleep -from winreg import LoadKey from lib.inputs import Inputs from lib.outputs import Outputs @@ -67,22 +66,28 @@ def main(): parameter_data = {param["name"]: param["value"]} block_data[target["block"]]["parameters"].update(parameter_data) else: - wire_name = "".join( - random.choices(string.ascii_uppercase + string.digits, k=10) + block_data[source["block"]] = block_data.get( + source["block"], {"inputs": {}, "outputs": {}, "parameters": {}} + ) + block_data[target["block"]] = block_data.get( + target["block"], {"inputs": {}, "outputs": {}, "parameters": {}} ) + if source["name"] in block_data[source["block"]]["outputs"]: + wire_name = block_data[source["block"]]["outputs"][source["name"]]["wire"] + elif target["name"] in block_data[target["block"]]["inputs"]: + wire_name = block_data[target["block"]]["inputs"][target["name"]]["wire"] + else: + wire_name = "".join( + random.choices(string.ascii_uppercase + string.digits, k=10) + ) + # If a new wire, add it to dictionary and also keep track of its lock if wire_name not in all_wires: all_wires[wire_name] = Lock() output_data = {source["name"]: {"wire": wire_name, "lock": all_wires[wire_name]}} input_data = {target["name"]: {"wire": wire_name, "lock": all_wires[wire_name]}} - block_data[source["block"]] = block_data.get( - source["block"], {"inputs": {}, "outputs": {}, "parameters": {}} - ) block_data[source["block"]]["outputs"].update(output_data) - block_data[target["block"]] = block_data.get( - target["block"], {"inputs": {}, "outputs": {}, "parameters": {}} - ) block_data[target["block"]]["inputs"].update(input_data) From cba4f3234b09b8a1dec5124626278018dcab957a Mon Sep 17 00:00:00 2001 From: Suhas G Date: Thu, 27 Oct 2022 10:39:36 +0200 Subject: [PATCH 3/3] fix exception handler lock release Co-authored-by: Toshan Luktuke <31101072+toshan-luktuke@users.noreply.github.com> --- backend/staticfiles/synthesis/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/staticfiles/synthesis/main.py b/backend/staticfiles/synthesis/main.py index 67f33def..52587889 100644 --- a/backend/staticfiles/synthesis/main.py +++ b/backend/staticfiles/synthesis/main.py @@ -34,7 +34,7 @@ def clean_shared_memory(names): # Release all locks for name in names: try: - names[name]["lock"].release() + names[name].release() except ValueError: pass