diff --git a/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py b/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py index 4b807443ad..2c5dc1bc4d 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp/utils/dfp_arg_parser.py @@ -46,6 +46,8 @@ def __init__(self, source: str, tracking_uri: str, silence_monitors: bool, + mlflow_experiment_name_formatter: str, + mlflow_model_name_formatter: str, train_users: str = None): self._skip_users = list(skip_user) @@ -65,8 +67,8 @@ def __init__(self, self._time_fields: TimeFields = None self._silence_monitors = silence_monitors - self._model_name_formatter = f"DFP-{source}-" + "{user_id}" - self._experiment_name_formatter = f"dfp/{source}/training/" + "{reg_model_name}" + self._model_name_formatter = mlflow_model_name_formatter + self._experiment_name_formatter = mlflow_experiment_name_formatter @staticmethod def verify_init(func): diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py index 41cccd83e9..fd83d4d327 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_azure_pipeline.py @@ -140,6 +140,14 @@ type=str, default="http://mlflow:5000", help=("The MLflow tracking URI to connect to the tracking backend.")) +@click.option('--mlflow_experiment_name_template', + type=str, + default="dfp/azure/training/{reg_model_name}", + help="The MLflow experiment name template to use when logging experiments. ") +@click.option('--mlflow_model_name_template', + type=str, + default="DFP-azure-{user_id}", + help="The MLflow model name template to use when logging models. ") def run_pipeline(train_users, skip_user: typing.Tuple[str], only_user: typing.Tuple[str], @@ -149,6 +157,8 @@ def run_pipeline(train_users, log_level, sample_rate_s, filter_threshold, + mlflow_experiment_name_template, + mlflow_model_name_template, **kwargs): """Runs the DFP pipeline.""" # To include the generic, we must be training all or generic @@ -311,8 +321,8 @@ def run_pipeline(train_users, # Output is UserMessageMeta -- Cached frame set pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema)) - model_name_formatter = "DFP-azure-{user_id}" - experiment_name_formatter = "dfp/azure/training/{reg_model_name}" + model_name_formatter = mlflow_model_name_template + experiment_name_formatter = mlflow_experiment_name_template if (is_training): # Finally, perform training which will output a model diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py index dd47a8a923..a9d588fae1 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_duo_pipeline.py @@ -141,6 +141,14 @@ type=str, default="http://mlflow:5000", help=("The MLflow tracking URI to connect to the tracking backend.")) +@click.option('--mlflow_experiment_name_template', + type=str, + default="dfp/duo/training/{reg_model_name}", + help="The MLflow experiment name template to use when logging experiments. ") +@click.option('--mlflow_model_name_template', + type=str, + default="DFP-duo-{user_id}", + help="The MLflow model name template to use when logging models. ") def run_pipeline(train_users, skip_user: typing.Tuple[str], only_user: typing.Tuple[str], @@ -150,6 +158,8 @@ def run_pipeline(train_users, log_level, sample_rate_s, filter_threshold, + mlflow_experiment_name_template, + mlflow_model_name_template, **kwargs): """Runs the DFP pipeline.""" # To include the generic, we must be training all or generic @@ -306,8 +316,8 @@ def run_pipeline(train_users, # Output is UserMessageMeta -- Cached frame set pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema)) - model_name_formatter = "DFP-duo-{user_id}" - experiment_name_formatter = "dfp/duo/training/{reg_model_name}" + model_name_formatter = mlflow_model_name_template + experiment_name_formatter = mlflow_experiment_name_template if (is_training): diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_batch_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_batch_pipeline.py index 8384a0ebaf..c18da19ee4 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_batch_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_batch_pipeline.py @@ -101,6 +101,14 @@ type=str, default="http://mlflow:5000", help=("The MLflow tracking URI to connect to the tracking backend.")) +@click.option('--mlflow_experiment_name_template', + type=str, + default="dfp/{source}/training/{reg_model_name}", + help="The MLflow experiment name template to use when logging experiments. ") +@click.option('--mlflow_model_name_template', + type=str, + default="DFP-{source}-{user_id}", + help="The MLflow model name template to use when logging models. ") @click.option("--disable_pre_filtering", is_flag=True, help=("Enabling this option will skip pre-filtering of json messages. " @@ -126,6 +134,8 @@ def run_pipeline(source: str, tracking_uri, silence_monitors, use_cpp, + mlflow_experiment_name_template, + mlflow_model_name_template, **kwargs): if (skip_user and only_user): logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting") @@ -140,6 +150,8 @@ def run_pipeline(source: str, source, tracking_uri, silence_monitors, + mlflow_experiment_name_template, + mlflow_model_name_template, train_users) dfp_arg_parser.init() diff --git a/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py b/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py index 6374a61f5d..e60792d6d3 100644 --- a/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py +++ b/examples/digital_fingerprinting/production/morpheus/dfp_integrated_training_streaming_pipeline.py @@ -101,6 +101,14 @@ type=str, default="http://mlflow:5000", help=("The MLflow tracking URI to connect to the tracking backend.")) +@click.option('--mlflow_experiment_name_template', + type=str, + default="dfp/{source}/training/{reg_model_name}", + help="The MLflow experiment name template to use when logging experiments. ") +@click.option('--mlflow_model_name_template', + type=str, + default="DFP-{source}-{user_id}", + help="The MLflow model name template to use when logging models. ") @click.option('--bootstrap_servers', type=str, default="localhost:9092", @@ -138,6 +146,8 @@ def run_pipeline(source: str, tracking_uri, silence_monitors, use_cpp, + mlflow_experiment_name_template, + mlflow_model_name_template, **kwargs): if (skip_user and only_user): logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting") @@ -152,6 +162,8 @@ def run_pipeline(source: str, source, tracking_uri, silence_monitors, + mlflow_experiment_name_template, + mlflow_model_name_template, train_users) dfp_arg_parser.init() diff --git a/morpheus/controllers/mlflow_model_writer_controller.py b/morpheus/controllers/mlflow_model_writer_controller.py index d373a01dd1..bf935e8751 100644 --- a/morpheus/controllers/mlflow_model_writer_controller.py +++ b/morpheus/controllers/mlflow_model_writer_controller.py @@ -86,6 +86,27 @@ def experiment_name_formatter(self): def databricks_permissions(self): return self._databricks_permissions + def _create_safe_user_id(self, user_id: str): + """ + Creates a safe user ID for use in MLflow model names and experiment names. + + Parameters + ---------- + user_id : str + The user ID. + + Returns + ------- + str + The generated safe user ID. + """ + + safe_user_id = user_id.replace('.', '_dot_') + safe_user_id = safe_user_id.replace('/', '_slash_') + safe_user_id = safe_user_id.replace(':', '_colon_') + + return safe_user_id + def user_id_to_model(self, user_id: str): """ Converts a user ID to an model name @@ -102,7 +123,7 @@ def user_id_to_model(self, user_id: str): """ kwargs = { - "user_id": user_id, + "user_id": self._create_safe_user_id(user_id), "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), } @@ -123,9 +144,11 @@ def user_id_to_experiment(self, user_id: str) -> str: The generated experiment name. """ + safe_user_id = self._create_safe_user_id(user_id) + kwargs = { - "user_id": user_id, - "user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(), + "user_id": safe_user_id, + "user_md5": hashlib.md5(safe_user_id.encode('utf-8')).hexdigest(), "reg_model_name": self.user_id_to_model(user_id=user_id) }