From f209f549a7e72e42608cd995c976f9fffc4e108c Mon Sep 17 00:00:00 2001 From: Lukas Nickel Date: Thu, 26 Oct 2023 16:48:33 +0200 Subject: [PATCH] Allow setting n_jobs on tool invocation - It can be useful to set n_jobs indepently from the config file - For the sklearn reconstructors this requires setting n_jobs on every model, that is attached to the reconstructor - Fixes #2307 --- ctapipe/reco/reconstructor.py | 14 ++++++- ctapipe/reco/sklearn.py | 15 ++++++++ ctapipe/reco/tests/test_sklearn.py | 44 ++++++++++++++++++++++ ctapipe/tools/apply_models.py | 9 +++++ ctapipe/tools/train_disp_reconstructor.py | 9 +++++ ctapipe/tools/train_energy_regressor.py | 8 ++++ ctapipe/tools/train_particle_classifier.py | 8 ++++ 7 files changed, 106 insertions(+), 1 deletion(-) diff --git a/ctapipe/reco/reconstructor.py b/ctapipe/reco/reconstructor.py index 28402449ee4..9bacb63a802 100644 --- a/ctapipe/reco/reconstructor.py +++ b/ctapipe/reco/reconstructor.py @@ -70,7 +70,7 @@ class Reconstructor(TelescopeComponent): algorithms should inherit from """ - #: ctapipe_rco entry points may provide Reconstructor implementations + #: ctapipe_reco entry points may provide Reconstructor implementations plugin_entry_point = "ctapipe_reco" def __init__(self, subarray, **kwargs): @@ -141,6 +141,18 @@ def read(cls, path, parent=None, subarray=None, **kwargs): Provenance().add_input_file(path, role="reconstructor") return instance + def set_n_jobs(self, n_jobs): + """ + Set n_jobs if applicable for the reconsructor. + This is not just a traits option of the class, because + for example in the case of the SKLearnReconstructors you need + to set the property of all of the sklearn models for it to be + applied in the fit and predict steps. + """ + self.log.warning( + f"Trying to set n_jobs to {n_jobs}, but the reconstructor does not make any use of it." + ) + class HillasGeometryReconstructor(Reconstructor): """ diff --git a/ctapipe/reco/sklearn.py b/ctapipe/reco/sklearn.py index fec334d9390..7b4d2e953dd 100644 --- a/ctapipe/reco/sklearn.py +++ b/ctapipe/reco/sklearn.py @@ -222,6 +222,13 @@ def fit(self, key, table): y = self._table_to_y(table, mask=valid) self._models[key].fit(X, y) + def set_n_jobs(self, n_jobs): + """ + Update n_jobs of all associated models. + """ + for model in self._models.values(): + setattr(model, "n_jobs", n_jobs) + class SKLearnRegressionReconstructor(SKLearnReconstructor): """ @@ -803,6 +810,14 @@ def predict_table(self, key, table: Table) -> Dict[ReconstructionProperty, Table ReconstructionProperty.GEOMETRY: altaz_result, } + def set_n_jobs(self, n_jobs): + """ + Update n_jobs of all associated models. + """ + for (disp, sign) in self._models.values(): + setattr(disp, "n_jobs", n_jobs) + setattr(sign, "n_jobs", n_jobs) + class CrossValidator(Component): """Class to train sklearn based reconstructors in a cross validation""" diff --git a/ctapipe/reco/tests/test_sklearn.py b/ctapipe/reco/tests/test_sklearn.py index 50b4a016880..a2c0e83e6ca 100644 --- a/ctapipe/reco/tests/test_sklearn.py +++ b/ctapipe/reco/tests/test_sklearn.py @@ -9,6 +9,7 @@ from ctapipe.core import Component from ctapipe.reco import EnergyRegressor, ParticleClassifier from ctapipe.reco.reconstructor import ReconstructionProperty +from ctapipe.reco.sklearn import DispReconstructor KEY = "LST_LST_LSTCam" @@ -166,6 +167,49 @@ def test_regressor_single_event(model_cls, example_table, example_subarray): assert valid[0] == False +def test_set_n_jobs(example_subarray): + config = Config( + { + "EnergyRegressor": { + "model_cls": "RandomForestRegressor", + "model_config": {"n_estimators": 20, "max_depth": 15, "n_jobs": -1}, + } + } + ) + regressor = EnergyRegressor( + example_subarray, + config=config, + ) + + regressor._models["telescope"] = regressor._new_model() + assert regressor._models["telescope"].n_jobs == -1 + regressor.set_n_jobs(42) + assert regressor._models["telescope"].n_jobs == 42 + + # DISP has two models per telescope, check that aswell + config = Config( + { + "DispReconstructor": { + "norm_cls": "RandomForestRegressor", + "norm_config": {"n_estimators": 20, "max_depth": 15, "n_jobs": -1}, + "sign_cls": "RandomForestClassifier", + "sign_config": {"n_estimators": 20, "max_depth": 15, "n_jobs": -1}, + } + } + ) + disp = DispReconstructor( + example_subarray, + config=config, + ) + + disp._models["telescope"] = disp._new_models() + assert disp._models["telescope"][0].n_jobs == -1 + assert disp._models["telescope"][1].n_jobs == -1 + disp.set_n_jobs(42) + assert disp._models["telescope"][0].n_jobs == 42 + assert disp._models["telescope"][1].n_jobs == 42 + + @pytest.mark.parametrize( "model_cls", ["KNeighborsClassifier", "RandomForestClassifier"] ) diff --git a/ctapipe/tools/apply_models.py b/ctapipe/tools/apply_models.py index 2df691d8bf4..764cb0493b0 100644 --- a/ctapipe/tools/apply_models.py +++ b/ctapipe/tools/apply_models.py @@ -68,6 +68,12 @@ class ApplyModels(Tool): help="How many subarray events to load at once for making predictions.", ).tag(config=True) + n_jobs = Integer( + default_value=None, + allow_none=True, + help="Number of threads to use for the reconstruction. This overwrites the values in the config", + ).tag(config=True) + progress_bar = Bool( help="show progress bar during processing", default_value=True, @@ -150,6 +156,9 @@ def setup(self): Reconstructor.read(path, parent=self, subarray=self.loader.subarray) for path in self.reconstructor_paths ] + if self.n_jobs: + for r in self._reconstructors: + r.set_n_jobs(self.n_jobs) def start(self): """Apply models to input tables""" diff --git a/ctapipe/tools/train_disp_reconstructor.py b/ctapipe/tools/train_disp_reconstructor.py index a5a968d226c..30efd15c822 100644 --- a/ctapipe/tools/train_disp_reconstructor.py +++ b/ctapipe/tools/train_disp_reconstructor.py @@ -66,6 +66,12 @@ class TrainDispReconstructor(Tool): default_value=0, help="Random seed for sampling and cross validation" ).tag(config=True) + n_jobs = Int( + default_value=None, + allow_none=True, + help="Number of threads to use for the reconstruction. This overwrites the values in the config", + ).tag(config=True) + project_disp = Bool( default_value=False, help=( @@ -103,6 +109,9 @@ def setup(self): self.n_events.attach_subarray(self.loader.subarray) self.models = DispReconstructor(self.loader.subarray, parent=self) + if self.n_jobs: + self.models.set_n_jobs(self.n_jobs) + self.cross_validate = CrossValidator(parent=self, model_component=self.models) self.rng = np.random.default_rng(self.random_seed) self.check_output(self.output_path, self.cross_validate.output_path) diff --git a/ctapipe/tools/train_energy_regressor.py b/ctapipe/tools/train_energy_regressor.py index 0d210762f1e..f7f636a1e85 100644 --- a/ctapipe/tools/train_energy_regressor.py +++ b/ctapipe/tools/train_energy_regressor.py @@ -63,6 +63,12 @@ class TrainEnergyRegressor(Tool): default_value=0, help="Random seed for sampling and cross validation" ).tag(config=True) + n_jobs = Int( + default_value=None, + allow_none=True, + help="Number of threads to use for the reconstruction. This overwrites the values in the config", + ).tag(config=True) + aliases = { ("i", "input"): "TableLoader.input_url", ("o", "output"): "TrainEnergyRegressor.output_path", @@ -94,6 +100,8 @@ def setup(self): self.n_events.attach_subarray(self.loader.subarray) self.regressor = EnergyRegressor(self.loader.subarray, parent=self) + if self.n_jobs: + self.regressor.set_n_jobs(self.n_jobs) self.cross_validate = CrossValidator( parent=self, model_component=self.regressor ) diff --git a/ctapipe/tools/train_particle_classifier.py b/ctapipe/tools/train_particle_classifier.py index 0002c683d39..337027130d9 100644 --- a/ctapipe/tools/train_particle_classifier.py +++ b/ctapipe/tools/train_particle_classifier.py @@ -92,6 +92,12 @@ class TrainParticleClassifier(Tool): help="Random number seed for sampling and the cross validation splitting", ).tag(config=True) + n_jobs = Int( + default_value=None, + allow_none=True, + help="Number of threads to use for the reconstruction. This overwrites the values in the config", + ).tag(config=True) + aliases = { "signal": "TrainParticleClassifier.input_url_signal", "background": "TrainParticleClassifier.input_url_background", @@ -144,6 +150,8 @@ def setup(self): self.n_background.attach_subarray(self.subarray) self.classifier = ParticleClassifier(subarray=self.subarray, parent=self) + if self.n_jobs: + self.classifier.set_n_jobs(self.n_jobs) self.rng = np.random.default_rng(self.random_seed) self.cross_validate = CrossValidator( parent=self, model_component=self.classifier