Skip to content

Commit

Permalink
Allow setting n_jobs on tool invocation
Browse files Browse the repository at this point in the history
- It can be useful to set n_jobs independently from the config file
- For the sklearn reconstructors this requires setting n_jobs on every
  model that is attached to the reconstructor
- Fixes cta-observatory#2307
  • Loading branch information
LukasNickel authored and Tobychev committed Apr 10, 2024
1 parent 9b411c2 commit 3cb6aba
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 1 deletion.
14 changes: 13 additions & 1 deletion ctapipe/reco/reconstructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class Reconstructor(TelescopeComponent):
algorithms should inherit from
"""

#: ctapipe_rco entry points may provide Reconstructor implementations
#: ctapipe_reco entry points may provide Reconstructor implementations
plugin_entry_point = "ctapipe_reco"

def __init__(self, subarray, **kwargs):
Expand Down Expand Up @@ -146,6 +146,18 @@ def read(cls, path, parent=None, subarray=None, **kwargs):
Provenance().add_input_file(path, role="reconstructor")
return instance

def set_n_jobs(self, n_jobs):
    """
    Set n_jobs if applicable for the reconstructor.

    This is a method rather than just a traits option of the class,
    because for example in the case of the SKLearnReconstructors the
    property has to be set on all of the sklearn models for it to be
    applied in the fit and predict steps.

    This implementation does not apply ``n_jobs`` to anything; it only
    logs a warning stating that the value is not used.

    Parameters
    ----------
    n_jobs : int
        Requested number of parallel jobs.
    """
    # Pass the value as a logging argument so the message is only
    # formatted if the warning is actually emitted.
    self.log.warning(
        "Trying to set n_jobs to %s, but the reconstructor does not make any use of it.",
        n_jobs,
    )


class HillasGeometryReconstructor(Reconstructor):
"""
Expand Down
15 changes: 15 additions & 0 deletions ctapipe/reco/sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,13 @@ def fit(self, key, table):
y = self._table_to_y(table, mask=valid)
self._models[key].fit(X, y)

def set_n_jobs(self, n_jobs):
    """
    Update n_jobs of all associated models.

    Parameters
    ----------
    n_jobs : int
        Value assigned to the ``n_jobs`` attribute of every model
        stored in ``self._models``.
    """
    for model in self._models.values():
        # Plain attribute assignment; setattr is only needed for
        # attribute names that are computed at runtime.
        model.n_jobs = n_jobs


class SKLearnRegressionReconstructor(SKLearnReconstructor):
"""
Expand Down Expand Up @@ -803,6 +810,14 @@ def predict_table(self, key, table: Table) -> Dict[ReconstructionProperty, Table
ReconstructionProperty.GEOMETRY: altaz_result,
}

def set_n_jobs(self, n_jobs):
    """
    Update n_jobs of all associated models.

    Each entry of ``self._models`` is unpacked into a pair of models,
    and ``n_jobs`` is set on both of them.

    Parameters
    ----------
    n_jobs : int
        Value assigned to the ``n_jobs`` attribute of both models of
        every pair stored in ``self._models``.
    """
    for disp_model, sign_model in self._models.values():
        disp_model.n_jobs = n_jobs
        sign_model.n_jobs = n_jobs


class CrossValidator(Component):
"""Class to train sklearn based reconstructors in a cross validation"""
Expand Down
44 changes: 44 additions & 0 deletions ctapipe/reco/tests/test_sklearn.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from ctapipe.core import Component
from ctapipe.reco import EnergyRegressor, ParticleClassifier
from ctapipe.reco.reconstructor import ReconstructionProperty
from ctapipe.reco.sklearn import DispReconstructor

KEY = "LST_LST_LSTCam"

Expand Down Expand Up @@ -166,6 +167,49 @@ def test_regressor_single_event(model_cls, example_table, example_subarray):
assert valid[0] == False


def test_set_n_jobs(example_subarray):
    """Check that ``set_n_jobs`` overrides ``n_jobs`` on every attached model."""
    key = "telescope"

    # Reconstructor with a single model per telescope type.
    energy_config = Config(
        {
            "EnergyRegressor": {
                "model_cls": "RandomForestRegressor",
                "model_config": {"n_estimators": 20, "max_depth": 15, "n_jobs": -1},
            }
        }
    )
    energy = EnergyRegressor(example_subarray, config=energy_config)
    energy._models[key] = energy._new_model()

    assert energy._models[key].n_jobs == -1
    energy.set_n_jobs(42)
    assert energy._models[key].n_jobs == 42

    # DISP has two models per telescope, check that as well.
    disp_config = Config(
        {
            "DispReconstructor": {
                "norm_cls": "RandomForestRegressor",
                "norm_config": {"n_estimators": 20, "max_depth": 15, "n_jobs": -1},
                "sign_cls": "RandomForestClassifier",
                "sign_config": {"n_estimators": 20, "max_depth": 15, "n_jobs": -1},
            }
        }
    )
    disp_reco = DispReconstructor(example_subarray, config=disp_config)
    disp_reco._models[key] = disp_reco._new_models()

    assert disp_reco._models[key][0].n_jobs == -1
    assert disp_reco._models[key][1].n_jobs == -1
    disp_reco.set_n_jobs(42)
    assert disp_reco._models[key][0].n_jobs == 42
    assert disp_reco._models[key][1].n_jobs == 42


@pytest.mark.parametrize(
"model_cls", ["KNeighborsClassifier", "RandomForestClassifier"]
)
Expand Down
9 changes: 9 additions & 0 deletions ctapipe/tools/apply_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ class ApplyModels(Tool):
help="How many subarray events to load at once for making predictions.",
).tag(config=True)

n_jobs = Integer(
default_value=None,
allow_none=True,
help="Number of threads to use for the reconstruction. This overwrites the values in the config",
).tag(config=True)

progress_bar = Bool(
help="show progress bar during processing",
default_value=True,
Expand Down Expand Up @@ -144,6 +150,9 @@ def setup(self):
Reconstructor.read(path, parent=self, subarray=self.loader.subarray)
for path in self.reconstructor_paths
]
if self.n_jobs:
for r in self._reconstructors:
r.set_n_jobs(self.n_jobs)

def start(self):
"""Apply models to input tables"""
Expand Down
9 changes: 9 additions & 0 deletions ctapipe/tools/train_disp_reconstructor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ class TrainDispReconstructor(Tool):
default_value=0, help="Random seed for sampling and cross validation"
).tag(config=True)

n_jobs = Int(
default_value=None,
allow_none=True,
help="Number of threads to use for the reconstruction. This overwrites the values in the config",
).tag(config=True)

project_disp = Bool(
default_value=False,
help=(
Expand Down Expand Up @@ -97,6 +103,9 @@ def setup(self):
self.n_events.attach_subarray(self.loader.subarray)

self.models = DispReconstructor(self.loader.subarray, parent=self)
if self.n_jobs:
self.models.set_n_jobs(self.n_jobs)

self.cross_validate = CrossValidator(parent=self, model_component=self.models)
self.rng = np.random.default_rng(self.random_seed)
self.check_output(self.output_path, self.cross_validate.output_path)
Expand Down
8 changes: 8 additions & 0 deletions ctapipe/tools/train_energy_regressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ class TrainEnergyRegressor(Tool):
default_value=0, help="Random seed for sampling and cross validation"
).tag(config=True)

n_jobs = Int(
default_value=None,
allow_none=True,
help="Number of threads to use for the reconstruction. This overwrites the values in the config",
).tag(config=True)

aliases = {
("i", "input"): "TableLoader.input_url",
("o", "output"): "TrainEnergyRegressor.output_path",
Expand All @@ -89,6 +95,8 @@ def setup(self):
self.n_events.attach_subarray(self.loader.subarray)

self.regressor = EnergyRegressor(self.loader.subarray, parent=self)
if self.n_jobs:
self.regressor.set_n_jobs(self.n_jobs)
self.cross_validate = CrossValidator(
parent=self, model_component=self.regressor
)
Expand Down
8 changes: 8 additions & 0 deletions ctapipe/tools/train_particle_classifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ class TrainParticleClassifier(Tool):
help="Random number seed for sampling and the cross validation splitting",
).tag(config=True)

n_jobs = Int(
default_value=None,
allow_none=True,
help="Number of threads to use for the reconstruction. This overwrites the values in the config",
).tag(config=True)

aliases = {
"signal": "TrainParticleClassifier.input_url_signal",
"background": "TrainParticleClassifier.input_url_background",
Expand Down Expand Up @@ -134,6 +140,8 @@ def setup(self):
self.n_background.attach_subarray(self.subarray)

self.classifier = ParticleClassifier(subarray=self.subarray, parent=self)
if self.n_jobs:
self.classifier.set_n_jobs(self.n_jobs)
self.rng = np.random.default_rng(self.random_seed)
self.cross_validate = CrossValidator(
parent=self, model_component=self.classifier
Expand Down

0 comments on commit 3cb6aba

Please sign in to comment.