Add logging calls and fix a couple of dask-related issues #450

Merged (7 commits) on Jun 27, 2024
44 changes: 41 additions & 3 deletions reproject/common.py
@@ -1,3 +1,4 @@
import logging
import os
import tempfile
import uuid
@@ -15,10 +16,29 @@
__all__ = ["_reproject_dispatcher"]


class _ArrayContainer:
# When we set up as_delayed_memmap_path, if we pass a dask array to it,
# dask will actually compute the array before we get to the code inside
# as_delayed_memmap_path, so as a workaround we wrap any array we
# pass in using _ArrayContainer to make sure dask doesn't try and be smart.
def __init__(self, array):
self._array = array


@delayed(pure=True)
def as_delayed_memmap_path(array, tmp_dir):

# Extract array from _ArrayContainer
if isinstance(array, _ArrayContainer):
array = array._array
else:
raise TypeError("Expected _ArrayContainer in as_delayed_memmap_path")

logger = logging.getLogger(__name__)
if isinstance(array, da.core.Array):
logger.info("Computing input dask array to Numpy memory-mapped array")
array_path, _ = _dask_to_numpy_memmap(array, tmp_dir)
logger.info(f"Numpy memory-mapped array is now at {array_path}")
else:
array_path = os.path.join(tmp_dir, f"{uuid.uuid4()}.npy")
array_memmapped = np.memmap(
@@ -95,6 +115,8 @@
Whether to return numpy or dask arrays - defaults to 'numpy'.
"""

logger = logging.getLogger(__name__)

if return_type is None:
return_type = "numpy"
elif return_type not in ("numpy", "dask"):
@@ -138,7 +160,11 @@
)

if isinstance(array_in, da.core.Array):
_, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir)
logger.info("Computing input dask array to Numpy memory-mapped array")
array_path, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir)
logger.info(f"Numpy memory-mapped array is now at {array_path}")

logger.info(f"Calling {reproject_func.__name__} in non-dask mode")

try:
return reproject_func(
@@ -180,7 +206,7 @@
tmp_dir = tempfile.mkdtemp()
else:
tmp_dir = local_tmp_dir
array_in_or_path = as_delayed_memmap_path(array_in, tmp_dir)
array_in_or_path = as_delayed_memmap_path(_ArrayContainer(array_in), tmp_dir)
else:
# Here we could set array_in_or_path to array_in_path if it has
# been set previously, but in synchronous and threaded mode it is
@@ -190,7 +216,13 @@
array_in_or_path = array_in

def reproject_single_block(a, array_or_path, block_info=None):
if a.ndim == 0 or block_info is None or block_info == []:

if (
a.ndim == 0
or block_info is None
or block_info == []
or (isinstance(block_info, np.ndarray) and block_info.tolist() == [])
):
return np.array([a, a])

# The WCS class from astropy is not thread-safe, see e.g.
@@ -262,6 +294,8 @@
array_out_dask = da.empty(shape_out)
array_out_dask = array_out_dask.rechunk(block_size_limit=64 * 1024**2, **rechunk_kwargs)

logger.info(f"Setting up output dask array with map_blocks")

result = da.map_blocks(
reproject_single_block,
array_out_dask,
@@ -297,6 +331,8 @@

zarr_path = os.path.join(local_tmp_dir, f"{uuid.uuid4()}.zarr")

logger.info(f"Computing output array directly to zarr array at {zarr_path}")

if parallel == "current-scheduler":
# Just use whatever is the current active scheduler, which can
# be used for e.g. dask.distributed
@@ -317,6 +353,8 @@

result = da.from_zarr(zarr_path)

logger.info(f"Copying output zarr array into output Numpy arrays")

if return_footprint:
da.store(
[result[0], result[1]],
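A note on the `_ArrayContainer` workaround above: `dask.delayed` unpacks the arguments of a delayed call and computes any dask collections it finds before the wrapped function runs, whereas an arbitrary wrapper object is passed through untouched. The sketch below only illustrates that general dask behaviour, using a hypothetical `_Opaque` class in place of `_ArrayContainer`; it is not code from this PR.

```python
import dask.array as da
from dask import delayed


class _Opaque:
    """Hypothetical stand-in for _ArrayContainer: hides a dask array from dask."""

    def __init__(self, array):
        self._array = array


@delayed(pure=True)
def describe(arg):
    # Unwrap if needed, then report what type the function received at compute time.
    inner = arg._array if isinstance(arg, _Opaque) else arg
    return type(inner).__name__


x = da.ones((4, 4), chunks=2)

print(describe(x).compute())           # 'ndarray': dask computed the array first
print(describe(_Opaque(x)).compute())  # 'Array': the dask array arrived intact
```

This is why `_reproject_dispatcher` now wraps the input as `_ArrayContainer(array_in)` before handing it to `as_delayed_memmap_path`.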
43 changes: 43 additions & 0 deletions reproject/mosaicking/coadd.py
@@ -3,6 +3,7 @@
import sys
import tempfile
import uuid
from logging import getLogger

import numpy as np
from astropy.wcs import WCS
@@ -143,6 +144,8 @@
# up memory usage. We could probably still have references to array
# objects, but we'd just make sure these were memory mapped

logger = getLogger(__name__)

# Validate inputs

if combine_function not in ("mean", "sum", "first", "last", "min", "max"):
@@ -176,12 +179,19 @@
f"the output shape {shape_out}"
)

logger.info(f"Output mosaic will have shape {shape_out}")

# Define 'on-the-fly' mode: in the case where we don't need to match
# the backgrounds and we are combining with 'mean' or 'sum', we don't
# have to keep track of the intermediate arrays and can just modify
# the output array on-the-fly
on_the_fly = not match_background and combine_function in ("mean", "sum")

on_the_fly_prefix = "Using" if on_the_fly else "Not using"
logger.info(
f"{on_the_fly_prefix} on-the-fly mode for adding individual reprojected images to output array"
)

# Start off by reprojecting individual images to the final projection

if not on_the_fly:
@@ -190,6 +200,9 @@
with tempfile.TemporaryDirectory(ignore_cleanup_errors=IS_WIN) as local_tmp_dir:

for idata in progress_bar(range(len(input_data))):

logger.info(f"Processing input data {idata + 1} of {len(input_data)}")

# We need to pre-parse the data here since we need to figure out how to
# optimize/minimize the size of each output tile (see below).
array_in, wcs_in = parse_input_data(input_data[idata], hdu_in=hdu_in)
@@ -238,6 +251,9 @@
break

if skip_data:
logger.info(
"Skipping reprojection as no predicted overlap with final mosaic header"
)
continue

slice_out = tuple([slice(imin, imax) for (imin, imax) in bounds])
@@ -265,6 +281,10 @@

array_path = os.path.join(local_tmp_dir, f"array_{uuid.uuid4()}.np")

logger.info(
f"Creating memory-mapped array with shape {shape_out_indiv} at {array_path}"
)

array = np.memmap(
array_path,
shape=shape_out_indiv,
Expand All @@ -274,6 +294,10 @@

footprint_path = os.path.join(local_tmp_dir, f"footprint_{uuid.uuid4()}.np")

logger.info(
f"Creating memory-mapped footprint with shape {shape_out_indiv} at {footprint_path}"
)

footprint = np.memmap(
footprint_path,
shape=shape_out_indiv,
@@ -285,6 +309,8 @@

array = footprint = None

logger.info(f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv}")

array, footprint = reproject_function(
(array_in, wcs_in),
output_projection=wcs_out_indiv,
@@ -301,6 +327,10 @@

weights_path = os.path.join(local_tmp_dir, f"weights_{uuid.uuid4()}.np")

logger.info(
f"Creating memory-mapped weights with shape {shape_out_indiv} at {weights_path}"
)

weights = np.memmap(
weights_path,
shape=shape_out_indiv,
Expand All @@ -312,6 +342,10 @@

weights = None

logger.info(
f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv} for weights"
)

weights = reproject_function(
(weights_in, wcs_in),
output_projection=wcs_out_indiv,
@@ -335,6 +369,7 @@

if intermediate_memmap:
# Remove the reference to the memmap before trying to remove the file itself
logger.info(f"Removing memory-mapped weight array")

Check warning on line 372 in reproject/mosaicking/coadd.py

View check run for this annotation

Codecov / codecov/patch

reproject/mosaicking/coadd.py#L372

Added line #L372 was not covered by tests
weights = None
try:
os.remove(weights_path)
@@ -347,6 +382,7 @@
# output image is empty (due e.g. to no overlap).

if on_the_fly:
logger.info(f"Adding reprojected array to final array")
# By default, values outside of the footprint are set to NaN
# but we set these to 0 here to avoid getting NaNs in the
# means/sums.
@@ -362,6 +398,7 @@
if intermediate_memmap:
# Remove the references to the memmaps themselves before
# trying to remove the files themselves.
logger.info(f"Removing memory-mapped array and footprint arrays")

Check warning on line 401 in reproject/mosaicking/coadd.py

View check run for this annotation

Codecov / codecov/patch

reproject/mosaicking/coadd.py#L401

Added line #L401 was not covered by tests
array = None
footprint = None
try:
@@ -371,10 +408,12 @@
pass

else:
logger.info(f"Adding reprojected array to list to combine later")
arrays.append(array)

# If requested, try and match the backgrounds.
if match_background and len(arrays) > 1:
logger.info(f"Match backgrounds")
offset_matrix = determine_offset_matrix(arrays)
corrections = solve_corrections_sgd(offset_matrix)
if background_reference:
@@ -384,6 +423,7 @@

if combine_function in ("mean", "sum"):
if match_background:
logger.info(f"Combining reprojected arrays with function {combine_function}")
# if we're not matching the background, this part has already been done
for array in arrays:
# By default, values outside of the footprint are set to NaN
Expand All @@ -394,10 +434,12 @@
output_footprint[array.view_in_original_array] += array.footprint

if combine_function == "mean":
logger.info(f"Handle normalization of output array")
with np.errstate(invalid="ignore"):
output_array /= output_footprint

elif combine_function in ("first", "last", "min", "max"):
logger.info(f"Combining reprojected arrays with function {combine_function}")
if combine_function == "min":
output_array[...] = np.inf
elif combine_function == "max":
@@ -433,6 +475,7 @@

# We need to avoid potentially large memory allocation from output == 0 so
# we operate in chunks.
logger.info(f"Resetting invalid pixels to {blank_pixel_value}")
for chunk in iterate_chunks(output_array.shape, max_chunk_size=256 * 1024**2):
output_array[chunk][output_footprint[chunk] == 0] = blank_pixel_value
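For context on the "on-the-fly" mode that several of the new log messages in coadd.py refer to: when backgrounds do not need to be matched and the combine function is "mean" or "sum", each reprojected tile can be accumulated straight into the output array and footprint, with a final division by the footprint for "mean", so intermediate reprojected arrays never need to be kept around. The sketch below shows that accumulation pattern in isolation, with hypothetical helper names and full-size tiles; the real code writes each tile into a sub-slice of the output instead.

```python
import numpy as np


def coadd_on_the_fly(tiles, footprints, shape_out, combine_function="mean"):
    """Hypothetical helper: accumulate reprojected tiles directly into the output."""
    output_array = np.zeros(shape_out)
    output_footprint = np.zeros(shape_out)
    for tile, footprint in zip(tiles, footprints):
        # Values outside each tile's footprint are NaN by default; zero them so
        # they do not contaminate the running sum.
        output_array += np.where(footprint > 0, tile, 0.0)
        output_footprint += footprint
    if combine_function == "mean":
        # Pixels covered by no input give 0/0 -> NaN; suppress the warning as in the diff.
        with np.errstate(invalid="ignore"):
            output_array /= output_footprint
    return output_array, output_footprint
```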

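The new messages are emitted through loggers obtained with `logging.getLogger(__name__)`, i.e. `reproject.common` and `reproject.mosaicking.coadd`, so they propagate to the parent `reproject` logger. A minimal sketch of how a user could surface them with the standard library `logging` module:

```python
import logging

# Attach a console handler and let INFO-level records from the "reproject"
# logger hierarchy (reproject.common, reproject.mosaicking.coadd) through.
logging.basicConfig(format="%(name)s - %(levelname)s - %(message)s")
logging.getLogger("reproject").setLevel(logging.INFO)
```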