Skip to content

Commit

Permalink
add start and stop of stream
Browse files Browse the repository at this point in the history
  • Loading branch information
timonmerk committed Sep 23, 2024
1 parent b137462 commit 6c49866
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 24 deletions.
6 changes: 3 additions & 3 deletions gui_dev/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@
"@vitejs/plugin-react": "^4.3.1",
"@welldone-software/why-did-you-render": "^8.0.3",
"babel-plugin-react-compiler": "latest",
"eslint": "^9.10.0",
"eslint": "^9.11.0",
"eslint-config-prettier": "^9.1.0",
"eslint-plugin-jsdoc": "^50.2.3",
"eslint-plugin-jsdoc": "^50.2.4",
"eslint-plugin-react": "^7.36.1",
"eslint-plugin-react-compiler": "latest",
"eslint-plugin-react-hooks": "^4.6.2",
"eslint-plugin-react-refresh": "^0.4.12",
"prettier": "^3.3.3",
"vite": "^5.4.6"
"vite": "^5.4.7"
}
}
23 changes: 17 additions & 6 deletions gui_dev/src/pages/Dashboard.jsx
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import { Graph } from "@/components";
import { Box } from "@mui/material";
import { Box, Button } from "@mui/material";
import { useSessionStore } from "@/stores";

export const Dashboard = () => (
<Box>
<Graph />
</Box>
);
export const Dashboard = () => {

const startStream = useSessionStore((state) => state.startStream);
const stopStream = useSessionStore((state) => state.stopStream);

return(
<>
<Button variant="contained" onClick={startStream}> Run stream</Button>
<Button variant="contained" onClick={stopStream}> Stop stream</Button>
<Box>
<Graph />
</Box>
</>
)
}
4 changes: 2 additions & 2 deletions gui_dev/src/pages/Settings/Settings.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ export const Settings = () => {
variant="contained"
component={Link}
color="primary"
to="/decoding"
to="/dashboard"
sx={{ mt: 2 }}
>
Run Stream
Start Stream
</Button>
</Stack>
);
Expand Down
48 changes: 48 additions & 0 deletions gui_dev/src/stores/sessionStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,54 @@ export const useSessionStore = createPersistStore("session", (set, get) => ({
);
},

startStream: async () => {
try {
console.log("Start Stream");

const response = await fetch("/api/stream-control", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
// This needs to be adapted depending on the backend changes
body: JSON.stringify({ "action" : "start"}),
});

if (!response.ok) {
throw new Error(`Failed start stream: ${await response.text()}`);
}

const result = await response.json();
console.log("Stream started:", result);
} catch (error) {
console.error("Failed to start stream:", error);
}
},

stopStream: async () => {
try {
console.log("Stop Stream");

const response = await fetch("/api/stream-control-stop", {
method: "POST",
headers: {
"Content-Type": "application/json",
},
// This needs to be adapted depending on the backend changes
body: JSON.stringify({ "action" : "stop"}),
});

if (!response.ok) {
throw new Error(`Failed stopping stream: ${await response.text()}`);
}

const result = await response.json();
console.log("Stream Stopping:", result);
} catch (error) {
console.error("Failed to stop stream:", error);
}
},

resetSession: () =>
get().setStateAndSync({
sourceType: null,
Expand Down
24 changes: 18 additions & 6 deletions py_neuromodulation/gui/backend/app_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,37 @@ async def update_settings(data: dict):
##### PYNM CONTROL #####
########################

@self.post("/api/stream-control-stop")
async def handle_stream_control_stop(data: dict):
action = data["action"]
self.logger.info("Stopping stream")
if action == "stop":
self.pynm_state.stream_handling_queue.put("stop")
self.pynm_state.stream.stream_handling_queue.put("stop")
return {"message": f"Stream action '{action}' executed"}

@self.post("/api/stream-control")
async def handle_stream_control(data: dict):
action = data["action"]
if action == "start":
# TODO: create out_dir and experiment_name text filds in frontend
self.logger.info("websocket:")
self.logger.info(self.websocket_manager)
# TODO: I cannot interact with stream_state_queue,
# since the async function is really waiting until the stream finished
await self.pynm_state.start_run_function(
out_dir=data["out_dir"],
experiment_name=data["experiment_name"],
websocket_manager=self.websocket_manager,
#out_dir=data["out_dir"],
#experiment_name=data["experiment_name"],
websocket_manager_features=self.websocket_manager,
)

if action == "stop":
if self.pynm_state.stream.is_running is False:
# TODO: if the message starts with ERROR we could show the message in a popup
return {"message": "ERROR: Stream is not running"}
#if self.pynm_state.stream.is_running is False:
# # TODO: if the message starts with ERROR we could show the message in a popup
# return {"message": "ERROR: Stream is not running"}

# initiate stream stop and feature save
self.pynm_state.stream_handling_queue.put("stop")
self.pynm_state.stream.stream_handling_queue.put("stop")

return {"message": f"Stream action '{action}' executed"}
Expand Down
4 changes: 2 additions & 2 deletions py_neuromodulation/gui/backend/app_pynm.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ async def start_run_function(
# The stream will then put the results in the queue
# there should be another websocket in which the results are sent to the frontend

stream_handling_queue = Queue()
self.stream_handling_queue = Queue()

self.logger.info("setup stream Process")

Expand All @@ -54,7 +54,7 @@ async def start_run_function(
await self.stream.run(
out_dir=out_dir,
experiment_name=experiment_name,
stream_handling_queue=stream_handling_queue,
stream_handling_queue=self.stream_handling_queue,
is_stream_lsl=self.lsl_stream_name is not None,
stream_lsl_name=self.lsl_stream_name
if self.lsl_stream_name is not None
Expand Down
2 changes: 1 addition & 1 deletion py_neuromodulation/gui/backend/app_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def send_message(self, message: str | dict):
if self.active_connections:
for connection in self.active_connections:
if type(message) is dict:
await connection.send_json(json.dump(message))
await connection.send_json(message)
elif type(message) is str:
await connection.send_text(message)
self.logger.info(f"Message sent")
Expand Down
9 changes: 5 additions & 4 deletions py_neuromodulation/stream/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,11 +258,11 @@ async def run(
nm.logger.log_to_file(out_dir)

# Initialize mp.Pool for multiprocessing
self.pool = mp.Pool(processes=self.settings.n_jobs)
#self.pool = mp.Pool(processes=self.settings.n_jobs)
# Set up shared memory for multiprocessing
self.shared_memory = mp.Array(ctypes.c_double, self.settings.n_jobs * self.settings.n_jobs)
#self.shared_memory = mp.Array(ctypes.c_double, self.settings.n_jobs * self.settings.n_jobs)
# Set up multiprocessing semaphores
self.semaphore = mp.Semaphore(self.settings.n_jobs)
#self.semaphore = mp.Semaphore(self.settings.n_jobs)

# Initialize generator
self.generator: Iterator
Expand Down Expand Up @@ -339,7 +339,8 @@ async def run(
if websocket_featues is not None:
nm.logger.info("Sending message to Websocket")
#nm.logger.info(feature_dict)
await websocket_featues.send_message(feature_dict)
#await websocket_featues.send_cbor(feature_dict)
#await websocket_featues.send_message(feature_dict)
self.batch_count += 1
if self.batch_count % self.save_interval == 0:
self.db.commit()
Expand Down

0 comments on commit 6c49866

Please sign in to comment.