From 2387058802c9b00fbfae0558c6700cae52a1675c Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Sun, 16 Jun 2024 20:25:58 +0100 Subject: [PATCH 1/7] Added logging calls --- reproject/mosaicking/coadd.py | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/reproject/mosaicking/coadd.py b/reproject/mosaicking/coadd.py index 6058a2d9b..23dba8e00 100644 --- a/reproject/mosaicking/coadd.py +++ b/reproject/mosaicking/coadd.py @@ -3,6 +3,7 @@ import sys import tempfile import uuid +from logging import getLogger import numpy as np from astropy.wcs import WCS @@ -143,6 +144,10 @@ def reproject_and_coadd( # up memory usage. We could probably still have references to array # objects, but we'd just make sure these were memory mapped + print(__name__) + + logger = getLogger(__name__) + # Validate inputs if combine_function not in ("mean", "sum", "first", "last", "min", "max"): @@ -176,12 +181,17 @@ def reproject_and_coadd( f"the output shape {shape_out}" ) + logger.info(f'Output mosaic will have shape {shape_out}') + # Define 'on-the-fly' mode: in the case where we don't need to match # the backgrounds and we are combining with 'mean' or 'sum', we don't # have to keep track of the intermediate arrays and can just modify # the output array on-the-fly on_the_fly = not match_background and combine_function in ("mean", "sum") + on_the_fly_prefix = 'Using' if on_the_fly else 'Not using' + logger.info(f'{on_the_fly_prefix} on-the-fly mode for adding individual reprojected images to output array') + # Start off by reprojecting individual images to the final projection if not on_the_fly: @@ -190,6 +200,9 @@ def reproject_and_coadd( with tempfile.TemporaryDirectory(ignore_cleanup_errors=IS_WIN) as local_tmp_dir: for idata in progress_bar(range(len(input_data))): + + logger.info(f'Processing input data {idata + 1} of {len(input_data)}') + # We need to pre-parse the data here since we need to figure out how to # optimize/minimize the size of each output tile (see below). array_in, wcs_in = parse_input_data(input_data[idata], hdu_in=hdu_in) @@ -238,6 +251,7 @@ def reproject_and_coadd( break if skip_data: + logger.info('Skipping reprojection as no predicted overlap with final mosaic header') continue slice_out = tuple([slice(imin, imax) for (imin, imax) in bounds]) @@ -265,6 +279,8 @@ def reproject_and_coadd( array_path = os.path.join(local_tmp_dir, f"array_{uuid.uuid4()}.np") + logger.info(f'Creating memory-mapped array with shape {shape_out_indiv} at {array_path}') + array = np.memmap( array_path, shape=shape_out_indiv, @@ -274,6 +290,8 @@ def reproject_and_coadd( footprint_path = os.path.join(local_tmp_dir, f"footprint_{uuid.uuid4()}.np") + logger.info(f'Creating memory-mapped footprint with shape {shape_out_indiv} at {footprint_path}') + footprint = np.memmap( footprint_path, shape=shape_out_indiv, @@ -285,6 +303,8 @@ def reproject_and_coadd( array = footprint = None + logger.info(f'Calling {reproject_function} with shape_out={shape_out_indiv}') + array, footprint = reproject_function( (array_in, wcs_in), output_projection=wcs_out_indiv, @@ -301,6 +321,8 @@ def reproject_and_coadd( weights_path = os.path.join(local_tmp_dir, f"weights_{uuid.uuid4()}.np") + logger.info(f'Creating memory-mapped weights with shape {shape_out_indiv} at {weights_path}') + weights = np.memmap( weights_path, shape=shape_out_indiv, @@ -312,6 +334,8 @@ def reproject_and_coadd( weights = None + logger.info(f'Calling {reproject_function} with shape_out={shape_out_indiv} for weights') + weights = reproject_function( (weights_in, wcs_in), output_projection=wcs_out_indiv, @@ -335,6 +359,7 @@ def reproject_and_coadd( if intermediate_memmap: # Remove the reference to the memmap before trying to remove the file itself + logger.info(f'Removing memory-mapped weight array') weights = None try: os.remove(weights_path) @@ -347,6 +372,7 @@ def reproject_and_coadd( # output image is empty (due e.g. to no overlap). if on_the_fly: + logger.info(f'Adding reprojected array to final array') # By default, values outside of the footprint are set to NaN # but we set these to 0 here to avoid getting NaNs in the # means/sums. @@ -362,6 +388,7 @@ def reproject_and_coadd( if intermediate_memmap: # Remove the references to the memmaps themesleves before # trying to remove the files thermselves. + logger.info(f'Removing memory-mapped array and footprint arrays') array = None footprint = None try: @@ -371,10 +398,12 @@ def reproject_and_coadd( pass else: + logger.info(f'Adding reprojected array to list to combine later') arrays.append(array) # If requested, try and match the backgrounds. if match_background and len(arrays) > 1: + logger.info(f'Match backgrounds') offset_matrix = determine_offset_matrix(arrays) corrections = solve_corrections_sgd(offset_matrix) if background_reference: @@ -384,6 +413,7 @@ def reproject_and_coadd( if combine_function in ("mean", "sum"): if match_background: + logger.info(f'Combining reprojected arrays with function {combine_function}') # if we're not matching the background, this part has already been done for array in arrays: # By default, values outside of the footprint are set to NaN @@ -394,10 +424,12 @@ def reproject_and_coadd( output_footprint[array.view_in_original_array] += array.footprint if combine_function == "mean": + logger.info(f'Handle normalization of output array') with np.errstate(invalid="ignore"): output_array /= output_footprint elif combine_function in ("first", "last", "min", "max"): + logger.info(f'Combining reprojected arrays with function {combine_function}') if combine_function == "min": output_array[...] = np.inf elif combine_function == "max": @@ -433,6 +465,7 @@ def reproject_and_coadd( # We need to avoid potentially large memory allocation from output == 0 so # we operate in chunks. + logger.info(f'Resetting invalid pixels to {blank_pixel_value}') for chunk in iterate_chunks(output_array.shape, max_chunk_size=256 * 1024**2): output_array[chunk][output_footprint[chunk] == 0] = blank_pixel_value From 8e582c5313fea6510498d83c48b5256f754091db Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Sun, 16 Jun 2024 20:42:58 +0100 Subject: [PATCH 2/7] More logging calls --- reproject/common.py | 16 +++++++++++ reproject/mosaicking/coadd.py | 52 +++++++++++++++++++++-------------- reproject/utils.py | 1 + 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/reproject/common.py b/reproject/common.py index 8050910c8..a1fd624ce 100644 --- a/reproject/common.py +++ b/reproject/common.py @@ -1,3 +1,4 @@ +import logging import os import tempfile import uuid @@ -17,8 +18,11 @@ @delayed(pure=True) def as_delayed_memmap_path(array, tmp_dir): + logger = logging.getLogger(__name__) if isinstance(array, da.core.Array): + logger.info("Writing input dask array to Numpy memory-mapped array") array_path, _ = _dask_to_numpy_memmap(array, tmp_dir) + logger.info(f"Numpy memory-mapped array is now at {array_path}") else: array_path = os.path.join(tmp_dir, f"{uuid.uuid4()}.npy") array_memmapped = np.memmap( @@ -95,6 +99,8 @@ def _reproject_dispatcher( Whether to return numpy or dask arrays - defaults to 'numpy'. """ + logger = logging.getLogger(__name__) + if return_type is None: return_type = "numpy" elif return_type not in ("numpy", "dask"): @@ -138,7 +144,11 @@ def _reproject_dispatcher( ) if isinstance(array_in, da.core.Array): + logger.info("Writing input dask array to Numpy memory-mapped array") _, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir) + logger.info(f"Numpy memory-mapped array is now at {array_path}") + + logger.info(f"Calling {reproject_func.__name__} in non-dask mode") try: return reproject_func( @@ -262,6 +272,8 @@ def reproject_single_block(a, array_or_path, block_info=None): array_out_dask = da.empty(shape_out) array_out_dask = array_out_dask.rechunk(block_size_limit=64 * 1024**2, **rechunk_kwargs) + logger.info(f"Setting out output dask array with map_blocks") + result = da.map_blocks( reproject_single_block, array_out_dask, @@ -297,6 +309,8 @@ def reproject_single_block(a, array_or_path, block_info=None): zarr_path = os.path.join(local_tmp_dir, f"{uuid.uuid4()}.zarr") + logger.info(f"Computing output array directly to zarr array at {zarr_path}") + if parallel == "current-scheduler": # Just use whatever is the current active scheduler, which can # be used for e.g. dask.distributed @@ -317,6 +331,8 @@ def reproject_single_block(a, array_or_path, block_info=None): result = da.from_zarr(zarr_path) + logger.info(f"Copying output zarr array into output Numpy arrays") + if return_footprint: da.store( [result[0], result[1]], diff --git a/reproject/mosaicking/coadd.py b/reproject/mosaicking/coadd.py index 23dba8e00..56ddb209b 100644 --- a/reproject/mosaicking/coadd.py +++ b/reproject/mosaicking/coadd.py @@ -144,8 +144,6 @@ def reproject_and_coadd( # up memory usage. We could probably still have references to array # objects, but we'd just make sure these were memory mapped - print(__name__) - logger = getLogger(__name__) # Validate inputs @@ -181,7 +179,7 @@ def reproject_and_coadd( f"the output shape {shape_out}" ) - logger.info(f'Output mosaic will have shape {shape_out}') + logger.info(f"Output mosaic will have shape {shape_out}") # Define 'on-the-fly' mode: in the case where we don't need to match # the backgrounds and we are combining with 'mean' or 'sum', we don't @@ -189,8 +187,10 @@ def reproject_and_coadd( # the output array on-the-fly on_the_fly = not match_background and combine_function in ("mean", "sum") - on_the_fly_prefix = 'Using' if on_the_fly else 'Not using' - logger.info(f'{on_the_fly_prefix} on-the-fly mode for adding individual reprojected images to output array') + on_the_fly_prefix = "Using" if on_the_fly else "Not using" + logger.info( + f"{on_the_fly_prefix} on-the-fly mode for adding individual reprojected images to output array" + ) # Start off by reprojecting individual images to the final projection @@ -201,7 +201,7 @@ def reproject_and_coadd( for idata in progress_bar(range(len(input_data))): - logger.info(f'Processing input data {idata + 1} of {len(input_data)}') + logger.info(f"Processing input data {idata + 1} of {len(input_data)}") # We need to pre-parse the data here since we need to figure out how to # optimize/minimize the size of each output tile (see below). @@ -251,7 +251,9 @@ def reproject_and_coadd( break if skip_data: - logger.info('Skipping reprojection as no predicted overlap with final mosaic header') + logger.info( + "Skipping reprojection as no predicted overlap with final mosaic header" + ) continue slice_out = tuple([slice(imin, imax) for (imin, imax) in bounds]) @@ -279,7 +281,9 @@ def reproject_and_coadd( array_path = os.path.join(local_tmp_dir, f"array_{uuid.uuid4()}.np") - logger.info(f'Creating memory-mapped array with shape {shape_out_indiv} at {array_path}') + logger.info( + f"Creating memory-mapped array with shape {shape_out_indiv} at {array_path}" + ) array = np.memmap( array_path, @@ -290,7 +294,9 @@ def reproject_and_coadd( footprint_path = os.path.join(local_tmp_dir, f"footprint_{uuid.uuid4()}.np") - logger.info(f'Creating memory-mapped footprint with shape {shape_out_indiv} at {footprint_path}') + logger.info( + f"Creating memory-mapped footprint with shape {shape_out_indiv} at {footprint_path}" + ) footprint = np.memmap( footprint_path, @@ -303,7 +309,7 @@ def reproject_and_coadd( array = footprint = None - logger.info(f'Calling {reproject_function} with shape_out={shape_out_indiv}') + logger.info(f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv}") array, footprint = reproject_function( (array_in, wcs_in), @@ -321,7 +327,9 @@ def reproject_and_coadd( weights_path = os.path.join(local_tmp_dir, f"weights_{uuid.uuid4()}.np") - logger.info(f'Creating memory-mapped weights with shape {shape_out_indiv} at {weights_path}') + logger.info( + f"Creating memory-mapped weights with shape {shape_out_indiv} at {weights_path}" + ) weights = np.memmap( weights_path, @@ -334,7 +342,9 @@ def reproject_and_coadd( weights = None - logger.info(f'Calling {reproject_function} with shape_out={shape_out_indiv} for weights') + logger.info( + f"Calling {reproject_function.__name__} with shape_out={shape_out_indiv} for weights" + ) weights = reproject_function( (weights_in, wcs_in), @@ -359,7 +369,7 @@ def reproject_and_coadd( if intermediate_memmap: # Remove the reference to the memmap before trying to remove the file itself - logger.info(f'Removing memory-mapped weight array') + logger.info(f"Removing memory-mapped weight array") weights = None try: os.remove(weights_path) @@ -372,7 +382,7 @@ def reproject_and_coadd( # output image is empty (due e.g. to no overlap). if on_the_fly: - logger.info(f'Adding reprojected array to final array') + logger.info(f"Adding reprojected array to final array") # By default, values outside of the footprint are set to NaN # but we set these to 0 here to avoid getting NaNs in the # means/sums. @@ -388,7 +398,7 @@ def reproject_and_coadd( if intermediate_memmap: # Remove the references to the memmaps themesleves before # trying to remove the files thermselves. - logger.info(f'Removing memory-mapped array and footprint arrays') + logger.info(f"Removing memory-mapped array and footprint arrays") array = None footprint = None try: @@ -398,12 +408,12 @@ def reproject_and_coadd( pass else: - logger.info(f'Adding reprojected array to list to combine later') + logger.info(f"Adding reprojected array to list to combine later") arrays.append(array) # If requested, try and match the backgrounds. if match_background and len(arrays) > 1: - logger.info(f'Match backgrounds') + logger.info(f"Match backgrounds") offset_matrix = determine_offset_matrix(arrays) corrections = solve_corrections_sgd(offset_matrix) if background_reference: @@ -413,7 +423,7 @@ def reproject_and_coadd( if combine_function in ("mean", "sum"): if match_background: - logger.info(f'Combining reprojected arrays with function {combine_function}') + logger.info(f"Combining reprojected arrays with function {combine_function}") # if we're not matching the background, this part has already been done for array in arrays: # By default, values outside of the footprint are set to NaN @@ -424,12 +434,12 @@ def reproject_and_coadd( output_footprint[array.view_in_original_array] += array.footprint if combine_function == "mean": - logger.info(f'Handle normalization of output array') + logger.info(f"Handle normalization of output array") with np.errstate(invalid="ignore"): output_array /= output_footprint elif combine_function in ("first", "last", "min", "max"): - logger.info(f'Combining reprojected arrays with function {combine_function}') + logger.info(f"Combining reprojected arrays with function {combine_function}") if combine_function == "min": output_array[...] = np.inf elif combine_function == "max": @@ -465,7 +475,7 @@ def reproject_and_coadd( # We need to avoid potentially large memory allocation from output == 0 so # we operate in chunks. - logger.info(f'Resetting invalid pixels to {blank_pixel_value}') + logger.info(f"Resetting invalid pixels to {blank_pixel_value}") for chunk in iterate_chunks(output_array.shape, max_chunk_size=256 * 1024**2): output_array[chunk][output_footprint[chunk] == 0] = blank_pixel_value diff --git a/reproject/utils.py b/reproject/utils.py index b5428857d..4d2a609e9 100644 --- a/reproject/utils.py +++ b/reproject/utils.py @@ -1,3 +1,4 @@ +import logging import os import tempfile import uuid From 5f0be30ca4cbfb73b116f55c4244f4f8c528b922 Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Mon, 17 Jun 2024 10:31:00 +0100 Subject: [PATCH 3/7] Fix condition to detect test call to reproject_single_block, in some cases block_info can be an empty Numpy array --- reproject/common.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/reproject/common.py b/reproject/common.py index a1fd624ce..3d69c3693 100644 --- a/reproject/common.py +++ b/reproject/common.py @@ -200,7 +200,13 @@ def _reproject_dispatcher( array_in_or_path = array_in def reproject_single_block(a, array_or_path, block_info=None): - if a.ndim == 0 or block_info is None or block_info == []: + + if ( + a.ndim == 0 + or block_info is None + or block_info == [] + or (isinstance(block_info, np.ndarray) and block_info.tolist() == []) + ): return np.array([a, a]) # The WCS class from astropy is not thread-safe, see e.g. From 1873b98b849faf67dcfbcd80338b208eaba11131 Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Mon, 17 Jun 2024 10:32:26 +0100 Subject: [PATCH 4/7] Put array in wrapper to avoid having dask compute it at any point --- reproject/common.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/reproject/common.py b/reproject/common.py index 3d69c3693..43e868941 100644 --- a/reproject/common.py +++ b/reproject/common.py @@ -16,11 +16,27 @@ __all__ = ["_reproject_dispatcher"] +class _ArrayContainer: + # When we set up as_delayed_memmap_path, if we pass a dask array to it, + # dask will actually compute the array before we get to the code inside + # as_delayed_memmap_path, so as a workaround we wrap any array we + # pass in using _ArrayContainer to make sure dask doesn't try and be smart. + def __init__(self, array): + self._array = array + + @delayed(pure=True) def as_delayed_memmap_path(array, tmp_dir): + + # Extract array from _ArrayContainer + if isinstance(array, _ArrayContainer): + array = array._array + else: + raise TypeError("Expected _ArrayContainer in as_delayed_memmap_path") + logger = logging.getLogger(__name__) if isinstance(array, da.core.Array): - logger.info("Writing input dask array to Numpy memory-mapped array") + logger.info("Computing input dask array to Numpy memory-mapped array") array_path, _ = _dask_to_numpy_memmap(array, tmp_dir) logger.info(f"Numpy memory-mapped array is now at {array_path}") else: @@ -190,7 +206,7 @@ def _reproject_dispatcher( tmp_dir = tempfile.mkdtemp() else: tmp_dir = local_tmp_dir - array_in_or_path = as_delayed_memmap_path(array_in, tmp_dir) + array_in_or_path = as_delayed_memmap_path(_ArrayContainer(array_in), tmp_dir) else: # Here we could set array_in_or_path to array_in_path if it has # been set previously, but in synchronous and threaded mode it is From 0860f9fd081f57b665580ec443e47fcc3403d9da Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Mon, 17 Jun 2024 10:57:49 +0100 Subject: [PATCH 5/7] Fix test --- reproject/common.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reproject/common.py b/reproject/common.py index 43e868941..74d7fc8a5 100644 --- a/reproject/common.py +++ b/reproject/common.py @@ -160,8 +160,8 @@ def _reproject_dispatcher( ) if isinstance(array_in, da.core.Array): - logger.info("Writing input dask array to Numpy memory-mapped array") - _, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir) + logger.info("Computing input dask array to Numpy memory-mapped array") + array_path, array_in = _dask_to_numpy_memmap(array_in, local_tmp_dir) logger.info(f"Numpy memory-mapped array is now at {array_path}") logger.info(f"Calling {reproject_func.__name__} in non-dask mode") From f264e23e46e90e5dfeabdc63d45dc58173d5a4e4 Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Wed, 26 Jun 2024 15:18:40 +0100 Subject: [PATCH 6/7] Fix typo --- reproject/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reproject/common.py b/reproject/common.py index 74d7fc8a5..e9de813ac 100644 --- a/reproject/common.py +++ b/reproject/common.py @@ -294,7 +294,7 @@ def reproject_single_block(a, array_or_path, block_info=None): array_out_dask = da.empty(shape_out) array_out_dask = array_out_dask.rechunk(block_size_limit=64 * 1024**2, **rechunk_kwargs) - logger.info(f"Setting out output dask array with map_blocks") + logger.info(f"Setting up output dask array with map_blocks") result = da.map_blocks( reproject_single_block, From 2ab5757648b2f235d675b92540b81b45334d0e5e Mon Sep 17 00:00:00 2001 From: Thomas Robitaille Date: Wed, 26 Jun 2024 15:19:11 +0100 Subject: [PATCH 7/7] Removed unused import --- reproject/utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reproject/utils.py b/reproject/utils.py index 4d2a609e9..b5428857d 100644 --- a/reproject/utils.py +++ b/reproject/utils.py @@ -1,4 +1,3 @@ -import logging import os import tempfile import uuid