Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session Orchestrator Updates [Post-Testing] #344

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bcipy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
TRIGGER_FILENAME = 'triggers.txt'
SESSION_DATA_FILENAME = 'session.json'
SESSION_SUMMARY_FILENAME = 'session.xlsx'
LOG_FILENAME = 'bcipy_system_log.txt'
LOG_FILENAME = 'session_log.txt'
STIMULI_POSITIONS_FILENAME = 'stimuli_positions.json'

# misc configuration
Expand Down
6 changes: 4 additions & 2 deletions bcipy/helpers/system_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def configure_logger(
save_folder: str,
log_name=LOG_FILENAME,
log_level=logging.INFO,
version=None) -> None:
version=None) -> logging.Logger:
"""Configure Logger.

Does what it says.
Expand All @@ -220,7 +220,9 @@ def configure_logger(
print(f'Printing all BciPy logs to: {path_to_logs}')

if version:
logging.info(f'Start of Session for BciPy Version: ({version})')
root_logger.info(f'Start of Session for BciPy Version: ({version})')

return root_logger


def import_submodules(package, recursive=True):
Expand Down
32 changes: 7 additions & 25 deletions bcipy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@

from bcipy.config import (DEFAULT_EXPERIMENT_ID, DEFAULT_PARAMETERS_PATH,
STATIC_AUDIO_PATH)
from bcipy.helpers.language_model import init_language_model
from bcipy.helpers.load import (load_experiments, load_json_parameters,
load_signal_models)
from bcipy.helpers.load import (load_experiments, load_json_parameters)
from bcipy.helpers.save import init_save_data_structure
from bcipy.helpers.stimuli import play_sound
from bcipy.helpers.system_utils import configure_logger, get_system_info
Expand Down Expand Up @@ -82,12 +80,13 @@ def bci_main(
experiment_id=experiment)

# configure bcipy session logging
configure_logger(save_folder,
version=sys_info['bcipy_version'])
logger = configure_logger(
save_folder,
version=sys_info['bcipy_version'])

log.info(sys_info)

if execute_task(task, parameters, save_folder, alert, fake):
if execute_task(task, parameters, save_folder, logger, alert, fake):
if visualize:

# Visualize session data and fail silently if it errors
Expand All @@ -104,6 +103,7 @@ def execute_task(
task: Type[Task],
parameters: dict,
save_folder: str,
logger: logging.Logger,
alert: bool,
fake: bool) -> bool:
"""Execute Task.
Expand All @@ -122,31 +122,13 @@ def execute_task(
Returns:
(bool): True if the task was successfully executed, False otherwise
"""
signal_models = []
language_model = None

# Init EEG Model, if needed. Calibration Tasks Don't require probabilistic
# modules to be loaded.
if task not in task_registry.calibration_tasks():
# Try loading in our signal_model and starting a langmodel(if enabled)
if not fake:
try:
model_dir = parameters['signal_model_path']
signal_models = load_signal_models(directory=model_dir)
assert signal_models, f"No signal models found in {model_dir}"
except Exception as error:
log.exception(f'Cannot load signal model. Exiting. {error}')
raise error

language_model = init_language_model(parameters)

# Start Task
try:
start_task(task,
parameters,
save_folder,
language_model=language_model,
signal_models=signal_models,
logger,
fake=fake)

# If exception, close all display and acquisition objects
Expand Down
3 changes: 3 additions & 0 deletions bcipy/orchestrator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from bcipy.orchestrator.orchestrator import SessionOrchestrator

__all__ = ['SessionOrchestrator']
43 changes: 18 additions & 25 deletions bcipy/orchestrator/demo/demo_orchestrator.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,26 @@
from bcipy.config import DEFAULT_EXPERIMENT_ID, DEFAULT_PARAMETER_FILENAME
from bcipy.helpers.load import load_experimental_data
from bcipy.orchestrator.orchestrator import SessionOrchestrator
from bcipy.task.actions import (ExperimentFieldCollectionAction,
OfflineAnalysisAction)
from bcipy.config import DEFAULT_PARAMETERS_PATH
from bcipy.orchestrator import SessionOrchestrator
from bcipy.task.actions import (OfflineAnalysisAction)
from bcipy.task.paradigm.rsvp import RSVPCalibrationTask, RSVPCopyPhraseTask, RSVPTimingVerificationCalibration
from bcipy.task.paradigm.matrix import MatrixCalibrationTask


def demo_orchestrator(data_path: str, parameters_path: str) -> None:
def demo_orchestrator(parameters_path: str) -> None:
"""Demo the SessionOrchestrator.

This function demonstrates how to use the SessionOrchestrator to execute actions.

The action in this case is an OfflineAnalysisAction, which will analyze the data in a given directory.
"""
field_collection = ExperimentFieldCollectionAction(DEFAULT_EXPERIMENT_ID, data_path)
offline_analysis = OfflineAnalysisAction(data_path, parameters_path)
orchestrator = SessionOrchestrator()
orchestrator.add_task(field_collection)
orchestrator.add_task(offline_analysis)
# field_collection = ExperimentFieldCollectionAction(DEFAULT_EXPERIMENT_ID, data_path)
# offline_analysis = OfflineAnalysisAction(data_path, parameters_path)
tasks = [RSVPCalibrationTask, OfflineAnalysisAction]
orchestrator = SessionOrchestrator(parameters_path=parameters_path, fake=True)
orchestrator.add_task(RSVPTimingVerificationCalibration)
orchestrator.add_task(RSVPCalibrationTask)
orchestrator.add_task(MatrixCalibrationTask)
orchestrator.add_task(RSVPCopyPhraseTask)
orchestrator.add_task(OfflineAnalysisAction)
orchestrator.execute()


Expand All @@ -25,24 +29,13 @@ def demo_orchestrator(data_path: str, parameters_path: str) -> None:
import argparse

parser = argparse.ArgumentParser(description="Demo the SessionOrchestrator")
parser.add_argument(
'-d',
'--data_path',
help='Path to the data directory. If none provided, a GUI will open and prompt a choice.',
default=None)
parser.add_argument(
'-p',
'--parameters_path',
help='Path to the parameters file to use for training. If none provided, data path will be used.',
default=None)
default=DEFAULT_PARAMETERS_PATH)
args = parser.parse_args()
data_path = args.data_path
parameters_path = args.parameters_path

if not data_path:
data_path = load_experimental_data()

if not parameters_path:
parameters_path = f'{data_path}/{DEFAULT_PARAMETER_FILENAME}'
parameters_path = f'{args.parameters_path}'

demo_orchestrator(data_path, parameters_path)
demo_orchestrator(parameters_path)
102 changes: 62 additions & 40 deletions bcipy/orchestrator/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
from datetime import datetime
import logging
from logging import Logger
from typing import List, Optional, Union
from typing import List, Type

from bcipy.helpers.parameters import Parameters
from bcipy.helpers.validate import validate_experiment
from bcipy.helpers.system_utils import get_system_info
from bcipy.task import Task
from bcipy.helpers.system_utils import get_system_info, configure_logger
from bcipy.task import Task, TaskData
from bcipy.config import DEFAULT_EXPERIMENT_ID, DEFAULT_PARAMETERS_PATH, DEFAULT_USER_ID
from bcipy.signal.model import SignalModel
from bcipy.language.main import LanguageModel
from bcipy.helpers.load import load_json_parameters

"""
Expand All @@ -23,74 +21,97 @@
experiment ID, user ID, and parameters file. Tasks are added to the orchestrator, which are then executed in order.
"""

# Session Orchestrator Needs:
# - A way to initialize the session (user, experiment, tasks, parameters, models, system info, log, save folder)
# - save folder is not used in execute method and could be from a provided argument or from the parameters?
# - A way to save the session data


class SessionOrchestrator:
tasks: List[Task]
models: List[Union[SignalModel, LanguageModel]]
tasks: List[Type[Task]]
task_names: List[str]
parameters: Parameters
sys_info: dict
log: Logger
save_folder: Optional[str] = None
# This will need to be refactored to a more complex data structure to store data from each task
session_data: List[str]
# Session Orchestrator will contain global objects here (DAQ, models etc) to be shared between executed tasks.
save_folder: str
session_data: List[TaskData]
ready_to_execute: bool = False
last_task_dir: str

def __init__(
self,
experiment_id: str = DEFAULT_EXPERIMENT_ID,
user: str = DEFAULT_USER_ID,
parameters_path: str = DEFAULT_PARAMETERS_PATH,
fake: bool = False
) -> None:
validate_experiment(experiment_id)
self.parameters_path = (
parameters_path
)
self.parameters = load_json_parameters(parameters_path, True)
self.parameters_path = parameters_path
self.parameters = load_json_parameters(parameters_path, value_cast=True)
self.user = user
self.fake = fake
self.experiment_id = experiment_id
self.log = logging.getLogger(__name__)
self.sys_info = get_system_info()
self.tasks = []
self.task_names = []
self.session_data = []
self.init_orchestrator_save_folder(self.parameters["data_save_loc"])

self.ready_to_execute = False
self.logger = configure_logger(
self.save_folder,
'protocol_log.txt', # TODO: move to config
logging.DEBUG,
self.sys_info['bcipy_version'])

self.ready_to_execute = True

def add_task(self, task: Task) -> None:
# Loading task specific parameters could happen here
# TODO validate it is a Valid Task
def add_task(self, task: Type[Task]) -> None:
self.tasks.append(task)
self.task_names.append(task.name)

def execute(self) -> None:
"""Executes queued tasks in order"""

# TODO add error handling for exceptions (like
# TaskConfigurationException), allowing the orchestrator to continue and
# log the errors.
if not self.ready_to_execute:
msg = "Orchestrator not ready to execute tasks"
self.log.error(msg)
raise Exception(msg)

for task in self.tasks:
data_save_location = self.init_task_save_folder(task)
self.session_data.append(data_save_location)
task.setup(self.parameters, data_save_location)
task.execute()
task.cleanup()
try:
# initialize the task save folder and logger
data_save_location = self.init_task_save_folder(task)
session_logger = configure_logger(
data_save_location,
log_level=logging.DEBUG,
version=self.sys_info['bcipy_version'])

# initialize the task and execute it
initialized_task: Task = task(
self.parameters,
data_save_location,
session_logger,
fake=self.fake,
experiment_id=self.experiment_id,
parameters_path=self.parameters_path,
last_task_dir=self.last_task_dir)
task_data = initialized_task.execute()
self.session_data.append(task_data)
self.logger.info(f"Task {task.name} completed successfully")
# some tasks may need access to the previous task's data
self.last_task_dir = data_save_location
except Exception as e:
self.logger.exception(e)
self.save()

def init_orchestrator_save_folder(self, save_path: str) -> None:
timestamp = datetime.now().strftime("%Y-%m-%d %H-%M")
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# * No '/' after `save_folder` since it is included in
# * `data_save_location` in parameters
path = f'{save_path}{self.experiment_id}/{self.user}/orchestrator-run-{timestamp}/'
path = f'{save_path}{self.user}/{self.experiment_id}/{timestamp}/'
os.makedirs(path)
os.makedirs(os.path.join(path, 'logs'), exist_ok=True)
self.save_folder = path

def init_task_save_folder(self, task: Task) -> str:
def init_task_save_folder(self, task: Type[Task]) -> str:
assert self.save_folder is not None, "Orchestrator save folder not initialized"
save_directory = self.save_folder + f'{self.user}_{task.name}/'
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_directory = self.save_folder + f'{task.name}_{timestamp}/'
try:
# make a directory to save task data to
os.makedirs(save_directory)
Expand All @@ -102,9 +123,10 @@ def init_task_save_folder(self, task: Task) -> str:
return save_directory

def save(self) -> None:
# Save the session data
system_info = get_system_info()
with open(f'{self.save_folder}/session_data.json', 'w') as f:
# Save the protocol data
with open(f'{self.save_folder}/protocol.json', 'w') as f:
f.write(json.dumps({
'tasks': self.task_names,
'parameters': self.parameters_path,
'system_info': self.sys_info,
}))
File renamed without changes.
2 changes: 1 addition & 1 deletion bcipy/orchestrator/tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import unittest
from bcipy.orchestrator.config import parse_protocol, serialize_protocol, validate_protocol_string
from bcipy.orchestrator.protocol import parse_protocol, serialize_protocol, validate_protocol_string
from bcipy.task.actions import OfflineAnalysisAction
from bcipy.task.paradigm.rsvp.calibration.calibration import RSVPCalibrationTask
from bcipy.task.paradigm.rsvp.copy_phrase import RSVPCopyPhraseTask
Expand Down
1 change: 1 addition & 0 deletions bcipy/orchestrator/tests/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def test_orchestrator_invalid_experiment(self) -> None:

def test_orchestrator_queues_task(self) -> None:
task = Mock(spec=Task)
task.name = "test task"
orchestrator = SessionOrchestrator()
assert len(orchestrator.tasks) == 0
orchestrator.add_task(task)
Expand Down
3 changes: 0 additions & 3 deletions bcipy/signal/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
from bcipy.signal.model.base_model import SignalModel, ModelEvaluationReport
from bcipy.signal.model.pca_rda_kde.pca_rda_kde import PcaRdaKdeModel
from bcipy.signal.model.rda_kde.rda_kde import RdaKdeModel
from bcipy.signal.model.gaussian_mixture.gaussian_mixture import GazeModelIndividual, GazeModelCombined

__all__ = [
"SignalModel",
"PcaRdaKdeModel",
"RdaKdeModel",
"GazeModelIndividual",
"GazeModelCombined",
"ModelEvaluationReport",
]
6 changes: 0 additions & 6 deletions bcipy/signal/model/gaussian_mixture/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +0,0 @@
from .gaussian_mixture import GazeModelCombined, GazeModelIndividual

__all__ = [
"GazeModelCombined",
"GazeModelIndividual",
]
1 change: 0 additions & 1 deletion bcipy/signal/model/gaussian_mixture/gaussian_mixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from sklearn.mixture import GaussianMixture
from bcipy.helpers.stimuli import GazeReshaper
from sklearn.model_selection import cross_val_score # noqa
from sklearn.utils.estimator_checks import check_estimator # noqa
import scipy.stats as stats

from typing import Optional
Expand Down
4 changes: 2 additions & 2 deletions bcipy/signal/model/offline_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
visualize_results_all_symbols)
from bcipy.preferences import preferences
from bcipy.signal.model.base_model import SignalModel, SignalModelMetadata
from bcipy.signal.model.gaussian_mixture import (GazeModelCombined,
GazeModelIndividual)
from bcipy.signal.model.gaussian_mixture.gaussian_mixture import (GazeModelCombined,
GazeModelIndividual)
from bcipy.signal.model.pca_rda_kde import PcaRdaKdeModel
from bcipy.signal.process import (ERPTransformParams, extract_eye_info,
filter_inquiries, get_default_transform)
Expand Down
Loading
Loading