Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-43077: Convert all tasks to use CalibrateImageTask outputs #979

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
100 changes: 93 additions & 7 deletions python/lsst/pipe/tasks/finalizeCharacterization.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
'FinalizeCharacterizationConfig',
'FinalizeCharacterizationTask']

import logging

import numpy as np
import esutil
import pandas as pd
Expand All @@ -43,25 +45,28 @@
from .reserveIsolatedStars import ReserveIsolatedStarsTask


_LOG = logging.getLogger(__name__)


class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
dimensions=('instrument', 'visit',),
defaultTemplates={}):
src_schema = pipeBase.connectionTypes.InitInput(
doc='Input schema used for src catalogs.',
name='src_schema',
name='initial_stars_schema',
storageClass='SourceCatalog',
)
srcs = pipeBase.connectionTypes.Input(
doc='Source catalogs for the visit',
name='src',
name='initial_stars_footprints_detector',
storageClass='SourceCatalog',
dimensions=('instrument', 'visit', 'detector'),
deferLoad=True,
multiple=True,
)
calexps = pipeBase.connectionTypes.Input(
doc='Calexps for the visit',
name='calexp',
name='initial_pvi',
storageClass='ExposureF',
dimensions=('instrument', 'visit', 'detector'),
deferLoad=True,
Expand All @@ -85,6 +90,17 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
deferLoad=True,
multiple=True,
)
initial_photo_calibs = pipeBase.connectionTypes.Input(
doc=("Initial photometric calibration that was already applied to "
"calexps, to be removed prior to measurement in order to recover "
"instrumental fluxes."),
name="initial_photoCalib_detector",
storageClass="PhotoCalib",
dimensions=("instrument", "visit", "detector"),
multiple=True,
deferLoad=True,
minimum=0,
)
finalized_psf_ap_corr_cat = pipeBase.connectionTypes.Output(
doc=('Per-visit finalized psf models and aperture corrections. This '
'catalog uses detector id for the id and are sorted for fast '
Expand All @@ -100,6 +116,28 @@ class FinalizeCharacterizationConnections(pipeBase.PipelineTaskConnections,
dimensions=('instrument', 'visit'),
)

def adjustQuantum(self, inputs, outputs, label, data_id):
if self.config.remove_initial_photo_calib and not inputs["initial_photo_calibs"]:
_LOG.warning(
"Dropping %s quantum %s because initial photo calibs are needed and none were present "
"this may be an upstream partial-outputs error covering an entire visit (which is why this "
"is not an error), but it may mean that 'config.remove_initial_photo_calib' should be "
"False.",
label,
data_id,
)
raise pipeBase.NoWorkFound("No initial photo calibs.")
elif not self.config.remove_initial_photo_calib and inputs["initial_photo_calibs"]:
_LOG.warning(
"For %s quantum %s, input collections have initial photo calib datasets but "
"'config.remove_initial_photo_calib=False'. This is either a very unusual collection "
"search path or (more likely) a bad configuration. Not that this config option should "
"be true when using images produced by CalibrateImageTask.",
label,
data_id,
)
return super().adjustQuantum(inputs, outputs, label, data_id)


class FinalizeCharacterizationConfig(pipeBase.PipelineTaskConfig,
pipelineConnections=FinalizeCharacterizationConnections):
Expand All @@ -113,6 +151,12 @@ class FinalizeCharacterizationConfig(pipeBase.PipelineTaskConfig,
dtype=str,
default='sourceId',
)
remove_initial_photo_calib = pexConfig.Field(
doc=("Expect an initial photo calib input to be present, and use it ",
"to restore the image to instrumental units."),
dtype=bool,
default=True,
)
reserve_selection = pexConfig.ConfigurableField(
target=ReserveIsolatedStarsTask,
doc='Task to select reserved stars',
Expand Down Expand Up @@ -269,6 +313,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
for handle in input_handle_dict['srcs']}
calexp_dict_temp = {handle.dataId['detector']: handle
for handle in input_handle_dict['calexps']}
initial_photo_calib_dict_temp = {handle.dataId['detector']: handle
for handle in input_handle_dict['initial_photo_calibs']}
isolated_star_cat_dict_temp = {handle.dataId['tract']: handle
for handle in input_handle_dict['isolated_star_cats']}
isolated_star_source_dict_temp = {handle.dataId['tract']: handle
Expand All @@ -279,6 +325,8 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
detector in sorted(src_dict_temp.keys())}
calexp_dict = {detector: calexp_dict_temp[detector] for
detector in sorted(calexp_dict_temp.keys())}
initial_photo_calib_dict = {detector: initial_photo_calib_dict_temp[detector]
for detector in sorted(initial_photo_calib_dict_temp.keys())}
isolated_star_cat_dict = {tract: isolated_star_cat_dict_temp[tract] for
tract in sorted(isolated_star_cat_dict_temp.keys())}
isolated_star_source_dict = {tract: isolated_star_source_dict_temp[tract] for
Expand All @@ -289,14 +337,24 @@ def runQuantum(self, butlerQC, inputRefs, outputRefs):
isolated_star_cat_dict,
isolated_star_source_dict,
src_dict,
calexp_dict)
calexp_dict,
initial_photo_calib_dict)

butlerQC.put(struct.psf_ap_corr_cat,
outputRefs.finalized_psf_ap_corr_cat)
butlerQC.put(pd.DataFrame(struct.output_table),
outputRefs.finalized_src_table)

def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, src_dict, calexp_dict):
def run(
self,
visit,
band,
isolated_star_cat_dict,
isolated_star_source_dict,
src_dict,
calexp_dict,
initial_photo_calib_dict,
):
"""
Run the FinalizeCharacterizationTask.

Expand All @@ -314,6 +372,8 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
Per-detector dict of src catalog handles.
calexp_dict : `dict`
Per-detector dict of calibrated exposure handles.
initial_photo_calib_dict : `dict`
Per-detector dict of initial photometric calibration handles

Returns
-------
Expand Down Expand Up @@ -349,13 +409,18 @@ def run(self, visit, band, isolated_star_cat_dict, isolated_star_source_dict, sr
for detector in src_dict:
src = src_dict[detector].get()
exposure = calexp_dict[detector].get()
if detector in initial_photo_calib_dict:
initial_photo_calib = initial_photo_calib_dict[detector].get()
else:
initial_photo_calib = None

psf, ap_corr_map, measured_src = self.compute_psf_and_ap_corr_map(
visit,
detector,
exposure,
src,
isolated_source_table
isolated_source_table,
initial_photo_calib
)

# And now we package it together...
Expand Down Expand Up @@ -591,7 +656,15 @@ def concat_isolated_star_cats(self, band, isolated_star_cat_dict, isolated_star_

return isolated_table, isolated_source_table

def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_source_table):
def compute_psf_and_ap_corr_map(
self,
visit,
detector,
exposure,
src,
isolated_source_table,
initial_photo_calib,
):
"""Compute psf model and aperture correction map for a single exposure.

Parameters
Expand All @@ -603,6 +676,8 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
exposure : `lsst.afw.image.ExposureF`
src : `lsst.afw.table.SourceCatalog`
isolated_source_table : `np.ndarray`
initial_photo_calib : `lsst.afw.image.PhotoCalib` or `None`
Initial photometric calibration to remove from the image.

Returns
-------
Expand All @@ -613,6 +688,17 @@ def compute_psf_and_ap_corr_map(self, visit, detector, exposure, src, isolated_s
measured_src : `lsst.afw.table.SourceCatalog`
Updated source catalog with measurements, flags and aperture corrections.
"""
if self.config.remove_initial_photo_calib:
if initial_photo_calib is None:
self.log.warning("No initial photo calib found for visit %d, detector %d", visit, detector)
return None, None, None
if not initial_photo_calib._isConstant:
# TODO DM-46720: remove this limitation and usage of private (why?!) property.
raise NotImplementedError(
"removeInitialPhotoCalib=True can only work when the initialPhotoCalib is constant."
)
exposure.maskedImage /= initial_photo_calib.getCalibrationMean()

# Extract footprints from the input src catalog for noise replacement.
footprints = SingleFrameMeasurementTask.getFootprintsFromCatalog(src)

Expand Down
Loading
Loading