Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Locks and Multi fan-in and fan-out #195

Merged
merged 4 commits into from
Nov 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion backend/staticfiles/synthesis/lib/inputs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from multiprocessing import shared_memory
from multiprocessing import shared_memory, Condition

import numpy as np
from lib.exceptions import InvalidInputNameException
Expand All @@ -14,9 +14,16 @@ def create_readonly_wire(name):


class Inputs:

ENABLE_NAME = "Enable"

def __init__(self, input_data) -> None:
self.inputs = input_data

def _init_enabled(self) -> None:
self._enable_data = self.inputs[Inputs.ENABLE_NAME] if Inputs.ENABLE_NAME in self.inputs else None
self._enable_condition = Condition(self.inputs[Inputs.ENABLE_NAME]["lock"]) if self.enable_wire else None

def read(self, name):
if self.inputs.get(name) is None:
raise InvalidInputNameException(f"{name} is not declared in inputs")
Expand Down Expand Up @@ -139,3 +146,37 @@ def read_array(self, name):

data = self._read_npy_matrix(name, np.float64)
return data


@property
def enabled(self) -> bool:
# No enable wire used, so enabled by default
if self._enable_data is None:
return True

_enabled = self.read_number(Inputs.ENABLE_NAME)

# If enable wire is present but, not initialized, return False
if _enabled is None:
return False

return np.isclose(_enabled, np.array([1.0]))



@enabled.setter
def enabled(self, _enabled: bool):
if self._enable_data is None:
return

self._enable_data["lock"].acquire()

if self._enable_data.get("created", False):
self._enable_data["data"][0] = 1 if _enabled else 0
else:
# Wire doesn't exist yet, ideally has to be created and set
# TODO: Is this case necessary?
pass

self._enable_data["lock"].release()

36 changes: 35 additions & 1 deletion backend/staticfiles/synthesis/lib/outputs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ def __init__(self, output_data) -> None:
self.shms = []

def _create_wire(self, name, size):
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
try:
shm = shared_memory.SharedMemory(name=name)
except FileNotFoundError:
shm = shared_memory.SharedMemory(name=name, create=True, size=size)
self.shms.append(shm)
return shm

Expand All @@ -36,6 +39,9 @@ def share(self, name, data):
if self.outputs.get(name) is None:
raise InvalidOutputNameException(f"{name} is not declared in outputs")

# Lock before writing data
self.outputs[name]["lock"].acquire()

# Store the array data in the appropriate variables
data = np.array(data)
shape = np.array(data.shape)
Expand Down Expand Up @@ -77,6 +83,9 @@ def share(self, name, data):
# Mark output as created
self.outputs[name]["created"] = True

# Release lock after writing data
self.outputs[name]["lock"].release()

def _share_npy_matrix(self, name, matrix, shape):
dim = np.array([len(matrix.shape)], dtype=np.int64)
if self.outputs[name].get("created", False):
Expand All @@ -103,6 +112,9 @@ def share_image(self, name, image):
if self.outputs.get(name) is None:
raise InvalidOutputNameException(f"{name} is not declared in outputs")

# Lock before writing data
self.outputs[name]["lock"].acquire()

image = np.array(image, dtype=np.uint8)
if len(image.shape) != 2 and len(image.shape) != 3:
raise ValueError("Image must be 2D or 3D")
Expand All @@ -115,10 +127,16 @@ def share_image(self, name, image):
shape = np.array(shape, dtype=np.int64)
self._share_npy_matrix(name, image, shape)

# Release lock after writing data
self.outputs[name]["lock"].release()

def share_number(self, name, number):
if self.outputs.get(name) is None:
raise InvalidOutputNameException(f"{name} is not declared in outputs")

# Lock before writing data
self.outputs[name]["lock"].acquire()

if self.outputs[name].get("created", False):
self.outputs[name]["data"][:] = number
else:
Expand All @@ -132,10 +150,16 @@ def share_number(self, name, number):
self.outputs[name]["data"][:] = number
self.outputs[name]["created"] = True

# Release lock after writing data
self.outputs[name]["lock"].release()

def share_string(self, name, string):
if self.outputs.get(name) is None:
raise InvalidOutputNameException(f"{name} is not declared in outputs")

# Lock before writing data
self.outputs[name]["lock"].acquire()

if self.outputs[name].get("created", False):
self.outputs[name]["data"][:] = string
else:
Expand All @@ -149,8 +173,18 @@ def share_string(self, name, string):
self.outputs[name]["data"][:] = string
self.outputs[name]["created"] = True

# Release lock after writing data
self.outputs[name]["lock"].release()

def share_array(self, name, array):
if self.outputs.get(name) is None:
raise InvalidOutputNameException(f"{name} is not declared in outputs")

# Lock before writing data
self.outputs[name]["lock"].acquire()

array = np.array(array, dtype=np.float64)
self._share_npy_matrix(name, array, np.array(array.shape, dtype=np.int64))

# Release lock after writing data
self.outputs[name]["lock"].release()
39 changes: 28 additions & 11 deletions backend/staticfiles/synthesis/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import multiprocessing
import random
import string
from multiprocessing import shared_memory
from multiprocessing import shared_memory, Lock
from time import sleep

from lib.inputs import Inputs
Expand All @@ -17,11 +17,12 @@


def clean_shared_memory(names):
all_names = names[:]
all_names = list(names.keys())
all_names.extend([name + "_dim" for name in names])
all_names.extend([name + "_shape" for name in names])
all_names.extend([name + "_type" for name in names])

# Clean all shared memory
for name in all_names:
try:
shm = shared_memory.SharedMemory(name, create=False)
Expand All @@ -30,6 +31,13 @@ def clean_shared_memory(names):
except FileNotFoundError:
pass

# Release all locks
for name in names:
try:
names[name].release()
except ValueError:
pass


def main():
"""
Expand All @@ -44,7 +52,7 @@ def main():
synchronize_frequency = data["synchronize_frequency"]

block_data = {}
wire_names = []
all_wires = {}

for wire in wires:
source = wire["source"]
Expand All @@ -58,19 +66,28 @@ def main():
parameter_data = {param["name"]: param["value"]}
block_data[target["block"]]["parameters"].update(parameter_data)
else:
wire_name = "".join(
random.choices(string.ascii_uppercase + string.digits, k=10)
)
wire_names.append(wire_name)
output_data = {source["name"]: {"wire": wire_name}}
input_data = {target["name"]: {"wire": wire_name}}
block_data[source["block"]] = block_data.get(
source["block"], {"inputs": {}, "outputs": {}, "parameters": {}}
)
block_data[source["block"]]["outputs"].update(output_data)
block_data[target["block"]] = block_data.get(
target["block"], {"inputs": {}, "outputs": {}, "parameters": {}}
)

if source["name"] in block_data[source["block"]]["outputs"]:
wire_name = block_data[source["block"]]["outputs"][source["name"]]["wire"]
elif target["name"] in block_data[target["block"]]["inputs"]:
wire_name = block_data[target["block"]]["inputs"][target["name"]]["wire"]
else:
wire_name = "".join(
random.choices(string.ascii_uppercase + string.digits, k=10)
)

# If a new wire, add it to dictionary and also keep track of its lock
if wire_name not in all_wires:
all_wires[wire_name] = Lock()
output_data = {source["name"]: {"wire": wire_name, "lock": all_wires[wire_name]}}
input_data = {target["name"]: {"wire": wire_name, "lock": all_wires[wire_name]}}
block_data[source["block"]]["outputs"].update(output_data)
block_data[target["block"]]["inputs"].update(input_data)


Expand Down Expand Up @@ -109,7 +126,7 @@ def main():
for process in processes:
process.terminate()
process.join()
clean_shared_memory(wire_names)
clean_shared_memory(all_wires)


if __name__ == "__main__":
Expand Down