From 78e6375e87d4b0c04c86521ccc9b387d9f65e19c Mon Sep 17 00:00:00 2001 From: Scott Staniewicz Date: Sat, 14 Sep 2024 21:51:37 -0400 Subject: [PATCH] Add logic for manually specifying reference dates mid-stack (#334) * use the `reference_idx` from each `ministack`, don't hardcode to 0 * remove commented `stack.py` code * Add a `reference_idx` arg to `compress` * update `MiniStackPlanner` to accept manual reference indexes * start passing through logic to account for manual reference changeovers * fix num parallel to respect the file length * get the linked phase part working with manual references * move the reference date to be an output option * get basic single-reference changeover working for interferograms * specify naive datetime * add a test for ref update workflow * Set up test to check names of output ifgs * fix pre-commit/test err * Add an `output_reference_idx` to `PhaseLinkingOptions` This accounts for the fact that we may pass in several compressed SLCs for a single run, and we wish to make the phase linking outputs referenced to something besides the first SLC. For example, if we had 1_1_3 1_4_6 7_7_9 10 11 12 And we wanted to output interferograms relative to `7`, we need to specify that to the phase linking functions; otherwise, it will output things relative to day 1. Since we don't write out rasters corresponding to compressed SLC inputs, we'd have nothing from day 7 to re-reference later. Note that an alternative might be to always write out rasters for every input in a ministack, including the compressed inputs. The downside is that during a normal local Sequential run with multiple ministacks, you write out redundant rasters and need to remove/select the right ones afterward. But if that way seems cleaner later, we could revert this commit.
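For illustration, the re-referencing this enables boils down to a conjugate multiply against the chosen reference layer (a minimal sketch mirroring the new logic in `compress`; `rereference` is a hypothetical helper, not part of this patch):

    import numpy as np

    def rereference(pl_cpx_phase: np.ndarray, reference_idx: int) -> np.ndarray:
        # Multiply each layer of the (n_slc, rows, cols) stack by the conjugate
        # of the reference layer, so phases become (phi_k - phi_ref) and the
        # layer at `reference_idx` is zeroed out.
        return pl_cpx_phase * pl_cpx_phase[reference_idx][None, :, :].conj()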
* Make 2 ministack idxs- one for `compressed_reference`, one for `output_reference` * separate ref/compressed idx in `.plan()`, use new api for `run_wrapped_phase_sequential` * fix tests, make `single` also use `vrt_stack` instead of filename * fix tests for `stack.py` * let only 1 compressed reference idx be passed to `.plan` * clean up commented, pass the `output_reference_index` to `run_phase_linking` in `single` --- .pre-commit-config.yaml | 1 + src/dolphin/phase_link/_compress.py | 15 +++- src/dolphin/stack.py | 99 ++++++++++---------- src/dolphin/workflows/config/_common.py | 25 +++++- src/dolphin/workflows/displacement.py | 6 +- src/dolphin/workflows/sequential.py | 63 +++++++++---- src/dolphin/workflows/single.py | 34 ++++--- src/dolphin/workflows/wrapped_phase.py | 114 +++++++++++++++++------- tests/test_stack.py | 27 +++++- tests/test_workflows_displacement.py | 44 ++++++++- tests/test_workflows_sequential.py | 33 ++----- tests/test_workflows_single.py | 3 +- 12 files changed, 308 insertions(+), 156 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f31ec74e..15051596 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,6 +23,7 @@ repos: rev: "24.8.0" hooks: - id: black + args: [--preview] - repo: https://github.com/astral-sh/ruff-pre-commit rev: v0.6.4 diff --git a/src/dolphin/phase_link/_compress.py b/src/dolphin/phase_link/_compress.py index 472aa17a..e020ca80 100644 --- a/src/dolphin/phase_link/_compress.py +++ b/src/dolphin/phase_link/_compress.py @@ -7,7 +7,10 @@ def compress( - slc_stack: ArrayLike, pl_cpx_phase: ArrayLike, slc_mean: ArrayLike | None = None + slc_stack: ArrayLike, + pl_cpx_phase: ArrayLike, + slc_mean: ArrayLike | None = None, + reference_idx: int | None = None, ): """Compress the stack of SLC data using the estimated phase. @@ -21,6 +24,10 @@ def compress( slc_mean : ArrayLike, optional The mean SLC magnitude, shape (rows, cols), to use as output pixel magnitudes. If None, the mean is computed from the input SLC stack. + reference_idx : int, optional + If provided, the `pl_cpx_phase` will be re-referenced to `reference_idx` before + performing the dot product. + Default is `None`, which keeps `pl_cpx_phase` as provided. 
Returns ------- np.ndarray The compressed SLC data, shape (rows, cols) """ + if reference_idx is not None: + pl_referenced = pl_cpx_phase * pl_cpx_phase[reference_idx][None, :, :].conj() + else: + pl_referenced = pl_cpx_phase # If the output is downsampled, we need to make `pl_cpx_phase` the same shape # as the output - pl_estimate_upsampled = upsample_nearest(pl_cpx_phase, slc_stack.shape[1:]) + pl_estimate_upsampled = upsample_nearest(pl_referenced, slc_stack.shape[1:]) # For each pixel, project the SLCs onto the (normalized) estimated phase # by performing a pixel-wise complex dot product with warnings.catch_warnings(): diff --git a/src/dolphin/stack.py b/src/dolphin/stack.py index 139acbf6..273a7c91 100755 --- a/src/dolphin/stack.py +++ b/src/dolphin/stack.py @@ -49,10 +49,6 @@ class BaseStack(BaseModel): Path(), description="Folder/location where ministack will write outputs to.", ) - reference_idx: int = Field( - 0, - description="Index of the SLC to use as reference during phase linking", - ) model_config = { # For the `Filename, so it can handle the `GeneralPath` protocol` @@ -84,33 +80,6 @@ def _check_lengths(self): raise ValueError(msg) return self - # @model_validator(mode="after") - # def _check_unset_reference_date(self): - # if self.reference_date == _DUMMY_DATE: - # ref_date = self.dates[self.reference_idx][0] - # logger.debug("No reference date provided, using first date: %s", ref_date) - # self.reference_date = ref_date - # return self - @property - def reference_date(self): - """Date of the reference phase of the stack.""" - # Note this works for either a length-1 tuple (real SLC), or for - # the compressed SLC formate (ref, start, end) - return self.dates[self.reference_idx][0] - - @property - def full_date_range(self) -> tuple[DateOrDatetime, DateOrDatetime]: - """Full date range of all SLCs in the ministack.""" - return (self.reference_date, self.dates[-1][-1]) - - @property - def full_date_range_str(self) -> str: - """Full date range of the ministack as a string, e.g. '20210101_20210202'. - - Includes both compressed + normal SLCs in the range. - """ - return format_dates(*self.full_date_range, fmt=self.file_date_fmt) - @property def first_real_slc_idx(self) -> int: """Index of the first real SLC in the ministack.""" @@ -151,10 +120,8 @@ def __rich_repr__(self): yield "file_list", self.file_list yield "dates", self.dates yield "is_compressed", self.is_compressed - yield "reference_date", self.reference_date yield "file_date_fmt", self.file_date_fmt yield "output_folder", self.output_folder - yield "reference_idx", self.reference_idx class CompressedSlcInfo(BaseModel): @@ -363,6 +330,32 @@ class MiniStackInfo(BaseStack): Used for planning the processing of a batch of SLCs. """ + output_reference_idx: int = Field( + 0, + description="Index of the SLC to use as reference during phase linking", + ) + compressed_reference_idx: int = Field( + 0, + description=( + "Index of the SLC to use as reference during compressed SLC creation." + " May be different than `output_reference_idx`."
+ ), + ) + + @property + def output_reference_date(self): + """Date of the reference phase of the stack.""" + # Note this works for either a length-1 tuple (real SLC), or for + # the compressed SLC format (ref, start, end) + return self.dates[self.output_reference_idx][0] + + @property + def compressed_reference_date(self): + """Date of the reference phase used when creating the compressed SLC.""" + # Note this works for either a length-1 tuple (real SLC), or for + # the compressed SLC format (ref, start, end) + return self.dates[self.compressed_reference_idx][0] + def get_compressed_slc_info(self) -> CompressedSlcInfo: """Get the compressed SLC which will come from this ministack. @@ -379,7 +372,7 @@ def get_compressed_slc_info(self) -> CompressedSlcInfo: real_slc_dates.append(d) return CompressedSlcInfo( - reference_date=self.reference_date, + reference_date=self.compressed_reference_date, start_date=real_slc_dates[0][0], end_date=real_slc_dates[-1][0], real_slc_file_list=real_slc_files, @@ -394,21 +387,33 @@ class MiniStackPlanner(BaseStack): """Class for planning the processing of batches of SLCs.""" max_num_compressed: int = 5 + output_reference_idx: int = Field( + 0, + description="Index of the SLC to use as reference during phase linking", + ) - def plan(self, ministack_size: int) -> list[MiniStackInfo]: + def plan( + self, ministack_size: int, compressed_idx: int | None = None + ) -> list[MiniStackInfo]: """Create a list of ministacks to be processed.""" if ministack_size < 2: msg = "Cannot create ministacks with size < 2" raise ValueError(msg) + # For now, only allow `compressed_idx` when doing a single batch. + # The logic is more complicated for multiple `compressed_idx`s, and + # it's unclear who would need that + if compressed_idx is not None and ministack_size < len(self.file_list): + raise ValueError( + "Cannot set `compressed_idx` when creating multiple ministacks." + ) + output_ministacks: list[MiniStackInfo] = [] # Start of with any compressed SLCs that are passed in compressed_slc_infos: list[CompressedSlcInfo] = [] for f in self.compressed_slc_file_list: # TODO: will we ever actually need to read the old metadata here? - # # Note: these must actually exist to be used!
- # compressed_slc_infos.append(CompressedSlcInfo.from_file_metadata(f)) compressed_slc_infos.append(CompressedSlcInfo.from_filename(f)) # Solve each ministack using current chunk (and the previous compressed SLCs) @@ -436,27 +441,27 @@ def plan(self, ministack_size: int) -> list[MiniStackInfo]: combined_is_compressed = num_ccslc * [True] + list( self.is_compressed[cur_slice] ) - # If there are any compressed SLCs, set the reference to the last one - try: - last_compressed_idx = np.where(combined_is_compressed)[0] - reference_idx = last_compressed_idx[-1] - except IndexError: - reference_idx = 0 # Make the current ministack output folder using the start/end dates new_date_str = format_dates( cur_dates[0][0], cur_dates[-1][-1], fmt=self.file_date_fmt ) cur_output_folder = self.output_folder / new_date_str + + if compressed_idx is None: + # TODO: To change to Ansari-style ministack references, change this + # so that a new `output_reference_idx` gets made + compressed_reference_idx = self.output_reference_idx + else: + compressed_reference_idx = compressed_idx + cur_ministack = MiniStackInfo( file_list=combined_files, dates=combined_dates, is_compressed=combined_is_compressed, - reference_idx=reference_idx, + output_reference_idx=self.output_reference_idx, + compressed_reference_idx=compressed_reference_idx, output_folder=cur_output_folder, - reference_date=self.reference_date, - # TODO: we'll need to alter logic here if we dont fix - # reference_idx=0, since this will change the reference date ) output_ministacks.append(cur_ministack) diff --git a/src/dolphin/workflows/config/_common.py b/src/dolphin/workflows/config/_common.py index 1f9e0c71..ae6156d0 100644 --- a/src/dolphin/workflows/config/_common.py +++ b/src/dolphin/workflows/config/_common.py @@ -10,6 +10,7 @@ BaseModel, ConfigDict, Field, + NaiveDatetime, PrivateAttr, field_validator, model_validator, @@ -75,14 +76,21 @@ class PhaseLinkingOptions(BaseModel, extra="forbid"): 10, description="Size of the ministack for sequential estimator.", gt=1 ) max_num_compressed: int = Field( - 5, + 100, description=( "Maximum number of compressed images to use in sequential estimator." " If there are more ministacks than this, the earliest CCSLCs will be" " left out of the later stacks." ), gt=0, ) + output_reference_idx: int = Field( + 0, + description=( + "Index of input SLC to use for making phase linked interferograms after" + " EVD/EMI." + ), + ) half_window: HalfWindow = HalfWindow() use_evd: bool = Field( False, description="Use EVD on the coherence instead of using the EMI algorithm" @@ -155,7 +163,6 @@ class InterferogramNetwork(BaseModel, extra="forbid"): ), ) - # validation @model_validator(mode="after") def _check_zero_parameters(self) -> InterferogramNetwork: ref_idx = self.reference_idx @@ -318,6 +325,18 @@ class OutputOptions(BaseModel, extra="forbid"): [4, 8, 16, 32, 64], description="List of overview levels to create (if `add_overviews=True`).", ) + # Note: we use NaiveDatetime, since other datetime parsing results in Naive + # (no TzInfo) datetimes, which can't be compared to datetimes with timezones + extra_reference_date: Optional[NaiveDatetime] = Field( + None, + description=( + "Specify an extra reference datetime in UTC. Adding this lets you" + " create and unwrap two single-reference networks; the latter resets at" + " the given date (e.g. for a large earthquake event).
If passing strings," + " formats accepted are YYYY-MM-DD[T]HH:MM[:SS[.ffffff]][Z or [±]HH[:]MM]," + " or YYYY-MM-DD" + ), + ) # validators @field_validator("bounds", mode="after") diff --git a/src/dolphin/workflows/displacement.py b/src/dolphin/workflows/displacement.py index 50a40158..baed680c 100755 --- a/src/dolphin/workflows/displacement.py +++ b/src/dolphin/workflows/displacement.py @@ -142,10 +142,10 @@ def run( comp_slc_dict: dict[str, list[Path]] = {} # Now for each burst, run the wrapped phase estimation # Try running several bursts in parallel... + # Use the Dummy one if not going parallel, as debugging is much simpler + num_parallel = min(cfg.worker_settings.n_parallel_bursts, len(grouped_slc_files)) Executor = ( - ProcessPoolExecutor - if cfg.worker_settings.n_parallel_bursts > 1 - else utils.DummyProcessPoolExecutor + ProcessPoolExecutor if num_parallel > 1 else utils.DummyProcessPoolExecutor ) mw = cfg.worker_settings.n_parallel_bursts ctx = mp.get_context("spawn") diff --git a/src/dolphin/workflows/sequential.py b/src/dolphin/workflows/sequential.py index b428e2d5..24c17e87 100644 --- a/src/dolphin/workflows/sequential.py +++ b/src/dolphin/workflows/sequential.py @@ -5,12 +5,14 @@ from __future__ import annotations +import datetime import logging from itertools import chain from os import fspath from pathlib import Path -from typing import Optional +from typing import Optional, Sequence +from opera_utils import get_dates from osgeo_utils import gdal_calc from dolphin import io @@ -28,8 +30,8 @@ def run_wrapped_phase_sequential( *, - slc_vrt_file: Filename, - ministack_planner: MiniStackPlanner, + slc_vrt_stack: VRTStack, + output_folder: Path, ministack_size: int, half_window: dict, strides: Optional[dict] = None, @@ -42,6 +44,10 @@ def run_wrapped_phase_sequential( shp_nslc: Optional[int] = None, use_evd: bool = False, beta: float = 0.00, + max_num_compressed: int = 100, + output_reference_idx: int = 0, + new_compressed_reference_idx: int | None = None, + cslc_date_fmt: str = "%Y%m%d", block_shape: tuple[int, int] = (512, 512), baseline_lag: Optional[int] = None, **tqdm_kwargs, @@ -49,17 +55,31 @@ def run_wrapped_phase_sequential( """Estimate wrapped phase using batches of ministacks.""" if strides is None: strides = {"x": 1, "y": 1} - output_folder = ministack_planner.output_folder output_folder.mkdir(parents=True, exist_ok=True) - ministacks = ministack_planner.plan(ministack_size) + input_file_list = slc_vrt_stack.file_list + + is_compressed = ["compressed" in str(f).lower() for f in slc_vrt_stack.file_list] + input_dates = _get_input_dates(input_file_list, is_compressed, cslc_date_fmt) + + ministack_planner = MiniStackPlanner( + file_list=slc_vrt_stack.file_list, + dates=input_dates, + is_compressed=is_compressed, + output_folder=output_folder, + max_num_compressed=max_num_compressed, + output_reference_idx=output_reference_idx, + ) + ministacks = ministack_planner.plan( + ministack_size, compressed_idx=new_compressed_reference_idx + ) - v_all = VRTStack.from_vrt_file(slc_vrt_file) - logger.info(f"Full file range: {v_all.file_list[0]} to {v_all.file_list[-1]}") + logger.info(f"File range start: {Path(slc_vrt_stack.file_list[0]).name}") + logger.info(f"File range end: {Path(slc_vrt_stack.file_list[-1]).name}") logger.info(f"Output folder: {output_folder}") logger.info(f"Number of ministacks of size {ministack_size}: {len(ministacks)}") if shp_nslc is None: - shp_nslc = v_all.shape[0] + shp_nslc = slc_vrt_stack.shape[0] # list where each item is [output_slc_files] from a 
ministack output_slc_files: list[list] = [] @@ -87,21 +107,15 @@ def already_processed(d: Path, search_ext: str = ".tif") -> bool: cur_files, outfile=output_folder / f"{start_end}.vrt", sort_files=False, - subdataset=v_all.subdataset, + subdataset=slc_vrt_stack.subdataset, ) - # Currently: we are always using the first SLC as the reference, - # even if this is a compressed SLC. - # Will need to change this if we want to accommodate the original - # Sequential Estimator+Datum Adjustment method. - reference_idx = 0 run_wrapped_phase_single( - slc_vrt_file=cur_vrt, + vrt_stack=cur_vrt, ministack=ministack, output_folder=cur_output_folder, half_window=half_window, strides=strides, - reference_idx=reference_idx, use_evd=use_evd, beta=beta, mask_file=mask_file, @@ -181,3 +195,20 @@ def _average_rasters(file_list: list[Path], outfile: Path, output_type: str): A=file_list, calc="numpy.nanmean(A, axis=0)", ) + + +def _get_input_dates( + input_file_list: Sequence[Filename], is_compressed: Sequence[bool], date_fmt: str +) -> list[list[datetime.datetime]]: + input_dates = [get_dates(f, fmt=date_fmt) for f in input_file_list] + # For any that aren't compressed, take the first date. + # this is because the official product name of OPERA/Sentinel1 has both + # "acquisition_date" ... "generation_date" in the filename + # For compressed, we want the first 3 dates: (base phase, start, end) + # TODO: this is a bit hacky, perhaps we can make this some input option + # so that the user can specify how to get dates from their files (or even + # directly pass in dates?) + return [ + dates[:1] if not is_comp else dates[:3] + for dates, is_comp in zip(input_dates, is_compressed) + ] diff --git a/src/dolphin/workflows/single.py b/src/dolphin/workflows/single.py index 75e7eb69..01bf89dd 100644 --- a/src/dolphin/workflows/single.py +++ b/src/dolphin/workflows/single.py @@ -38,12 +38,11 @@ class OutputFile: @atomic_output(output_arg="output_folder", is_dir=True) def run_wrapped_phase_single( *, - slc_vrt_file: Filename, + vrt_stack: VRTStack, ministack: MiniStackInfo, output_folder: Filename, half_window: dict, strides: Optional[dict] = None, - reference_idx: int = 0, beta: float = 0.00, use_evd: bool = False, mask_file: Optional[Filename] = None, @@ -61,24 +60,24 @@ def run_wrapped_phase_single( Output files will all be placed in the provided `output_folder`. 
""" - # TODO: extract common stuff between here and sequential if strides is None: strides = {"x": 1, "y": 1} + # TODO: extract common stuff between here and sequential strides_tup = Strides(y=strides["y"], x=strides["x"]) half_window_tup = HalfWindow(y=half_window["y"], x=half_window["x"]) output_folder = Path(output_folder) - vrt = VRTStack.from_vrt_file(slc_vrt_file) input_slc_files = ministack.file_list - assert len(input_slc_files) == vrt.shape[0] + if len(input_slc_files) != vrt_stack.shape[0]: + raise ValueError(f"{len(ministack.file_list) = }, but {vrt_stack.shape = }") # If we are using a different number of SLCs for the amplitude data, # we should note that for the SHP finding algorithms if shp_nslc is None: shp_nslc = len(input_slc_files) - logger.info(f"{vrt}: from {ministack.dates[0]} {ministack.file_list[-1]}") + logger.info(f"{vrt_stack}: from {ministack.dates[0]} {ministack.file_list[-1]}") - nrows, ncols = vrt.shape[-2:] + nrows, ncols = vrt_stack.shape[-2:] nodata_mask = _get_nodata_mask(mask_file, nrows, ncols) ps_mask = _get_ps_mask(ps_mask_file, nrows, ncols) @@ -101,14 +100,14 @@ def run_wrapped_phase_single( # Create the background writer for this ministack writer = io.BackgroundBlockWriter() - logger.info(f"Total stack size (in pixels): {vrt.shape}") + logger.info(f"Total stack size (in pixels): {vrt_stack.shape}") # Set up the output folder with empty files to write into phase_linked_slc_files = setup_output_folder( ministack=ministack, driver="GTiff", strides=strides, output_folder=output_folder, - like_filename=vrt.outfile, + like_filename=vrt_stack.outfile, nodata=0, ) @@ -131,7 +130,7 @@ def run_wrapped_phase_single( for op in output_files: io.write_arr( arr=None, - like_filename=vrt.outfile, + like_filename=vrt_stack.outfile, output_name=op.filename, dtype=op.dtype, strides=op.strides, @@ -147,7 +146,7 @@ def run_wrapped_phase_single( half_window=half_window_tup, ) # Set up the background loader - loader = EagerLoader(reader=vrt, block_shape=block_shape) + loader = EagerLoader(reader=vrt_stack, block_shape=block_shape) # Queue all input slices, skip ones that are all nodata blocks = [] # Queue all input slices, skip ones that are all nodata @@ -190,8 +189,6 @@ def run_wrapped_phase_single( amp_stack=amp_stack, method=shp_method, ) - # Run the phase linking process on the current ministack - reference_idx = max(0, first_real_slc_idx - 1) try: pl_output = run_phase_linking( cur_data, @@ -199,7 +196,7 @@ def run_wrapped_phase_single( strides=strides_tup, use_evd=use_evd, beta=beta, - reference_idx=reference_idx, + reference_idx=ministack.output_reference_idx, nodata_mask=nodata_mask[in_rows, in_cols], ps_mask=ps_mask[in_rows, in_cols], neighbor_arrays=neighbor_arrays, @@ -244,6 +241,7 @@ def run_wrapped_phase_single( cur_data[first_real_slc_idx:, in_trim_rows, in_trim_cols], pl_output.cpx_phase[first_real_slc_idx:, out_trim_rows, out_trim_cols], slc_mean=cur_data_mean, + reference_idx=ministack.compressed_reference_idx, ) # TODO: truncate @@ -288,7 +286,7 @@ def run_wrapped_phase_single( # Block until all the writers for this ministack have finished logger.info(f"Waiting to write {writer.num_queued} blocks of data.") writer.notify_finished() - logger.info(f"Finished ministack of size {vrt.shape}.") + logger.info(f"Finished ministack of size {vrt_stack.shape}.") logger.info("Repacking for more compression") io.repack_rasters(phase_linked_slc_files, keep_bits=12) @@ -386,9 +384,9 @@ def setup_output_folder( strides = {"y": 1, "x": 1} if output_folder is None: 
output_folder = ministack.output_folder - # Note: DONT use the ministack.output_folder here, since that will - # be the tempdir made by @atomic_output - # # output_folder = Path(ministack.output_folder) + # Note: during the workflow, the ministack.output_folder is different than + # the `run_wrapped_phase_single` argument `output_folder`. + # The latter is the tempdir made by @atomic_output output_folder.mkdir(exist_ok=True, parents=True) start_idx = ministack.first_real_slc_idx diff --git a/src/dolphin/workflows/wrapped_phase.py b/src/dolphin/workflows/wrapped_phase.py index 8e7e7b53..acc9ca5c 100644 --- a/src/dolphin/workflows/wrapped_phase.py +++ b/src/dolphin/workflows/wrapped_phase.py @@ -9,7 +9,7 @@ import numpy as np from opera_utils import get_dates, make_nodata_mask -from dolphin import Bbox, Filename, interferogram, masking, ps, stack +from dolphin import Bbox, Filename, interferogram, masking, ps from dolphin._log import log_runtime, setup_logging from dolphin.io import VRTStack @@ -75,12 +75,6 @@ def run( # Mark any files beginning with "compressed" as compressed is_compressed = ["compressed" in str(f).lower() for f in input_file_list] - input_dates = _get_input_dates( - input_file_list, is_compressed, cfg.input_options.cslc_date_fmt - ) - reference_date, reference_idx = _get_reference_date_idx( - input_file_list, is_compressed, input_dates - ) non_compressed_slcs = [ f for f, is_comp in zip(input_file_list, is_compressed) if not is_comp @@ -140,16 +134,18 @@ def run( pl_path = cfg.phase_linking._directory pl_path.mkdir(parents=True, exist_ok=True) - ministack_planner = stack.MiniStackPlanner( - file_list=input_file_list, - dates=input_dates, - is_compressed=is_compressed, - output_folder=pl_path, - max_num_compressed=cfg.phase_linking.max_num_compressed, - reference_date=reference_date, - reference_idx=reference_idx, + input_dates = _get_input_dates( + input_file_list, is_compressed, cfg.input_options.cslc_date_fmt ) + extra_reference_date = cfg.output_options.extra_reference_date + if extra_reference_date: + new_compressed_slc_reference_idx = _get_nearest_idx( + [dtup[0] for dtup in input_dates], extra_reference_date + ) + else: + new_compressed_slc_reference_idx = None + phase_linked_slcs = sorted(pl_path.glob("2*.tif")) if len(phase_linked_slcs) > 0: logger.info(f"Skipping EVD step, {len(phase_linked_slcs)} files already exist") @@ -166,9 +162,10 @@ def run( shp_nslc = None (phase_linked_slcs, comp_slc_list, temp_coh_file, shp_count_file) = ( sequential.run_wrapped_phase_sequential( - slc_vrt_file=vrt_stack.outfile, - ministack_planner=ministack_planner, + slc_vrt_stack=vrt_stack, + output_folder=pl_path, ministack_size=cfg.phase_linking.ministack_size, + new_compressed_reference_idx=new_compressed_slc_reference_idx, half_window=cfg.phase_linking.half_window.model_dump(), strides=strides, use_evd=cfg.phase_linking.use_evd, @@ -180,6 +177,7 @@ def run( shp_method=cfg.phase_linking.shp_method, shp_alpha=cfg.phase_linking.shp_alpha, shp_nslc=shp_nslc, + cslc_date_fmt=cfg.input_options.cslc_date_fmt, block_shape=cfg.worker_settings.block_shape, baseline_lag=cfg.phase_linking.baseline_lag, **kwargs, @@ -212,8 +210,18 @@ def run( ) logger.info(f"Creating virtual interferograms from {len(phase_linked_slcs)} files") + # TODO: with manual indexes, this may be split into 2 and redone + reference_date = [ + get_dates(f, fmt=cfg.input_options.cslc_date_fmt)[0] for f in input_file_list + ][cfg.phase_linking.output_reference_idx] + + ifg_file_list: list[Path] = [] ifg_file_list = 
create_ifgs( - ifg_network, phase_linked_slcs, any(is_compressed), reference_date + interferogram_network=ifg_network, + phase_linked_slcs=phase_linked_slcs, + contained_compressed_slcs=any(is_compressed), + reference_date=reference_date, + extra_reference_date=cfg.output_options.extra_reference_date, ) return ( ifg_file_list, @@ -230,6 +238,7 @@ def create_ifgs( phase_linked_slcs: Sequence[Path], contained_compressed_slcs: bool, reference_date: datetime.datetime, + extra_reference_date: datetime.datetime | None = None, dry_run: bool = False, ) -> list[Path]: """Create the list of interferograms for the `phase_linked_slcs`. @@ -246,6 +255,9 @@ def create_ifgs( compressed SLCs. reference_date : datetime.datetime Date/datetime of the "base phase" for the `phase_linked_slcs` + extra_reference_date : datetime.datetime, optional + If provided, makes another set of interferograms referenced to this + date for all acquisitions later than it. dry_run : bool Flag indicating that the ifgs should not be written to disk. Default = False (ifgs will be created). @@ -267,7 +279,10 @@ def create_ifgs( ifg_dir = interferogram_network._directory if not dry_run: ifg_dir.mkdir(parents=True, exist_ok=True) + ifg_file_list: list[Path] = [] + + secondary_dates = [get_dates(f)[0] for f in phase_linked_slcs] if not contained_compressed_slcs: # When no compressed SLCs were passed in to the config, we can directly pass # options to `Network` and get the ifg list @@ -294,21 +309,39 @@ def create_ifgs( # The total SLC phases we have to work with are # 1. reference date (might be before any dates in the filenames) # 2. the secondary of all phase-linked SLCs (which are the names of the files) + if extra_reference_date is None: + # To get the ifgs from the reference date to secondary(conj), this means + # a `.conj()` on the phase-linked SLCs (currently `day1.conj() * day2`) + single_ref_ifgs = [ + interferogram.convert_pl_to_ifg( + f, reference_date=reference_date, output_dir=ifg_dir, dry_run=dry_run + ) + for f in phase_linked_slcs + ] + else: + manual_reference_idx = _get_nearest_idx(secondary_dates, extra_reference_date) - # To get the ifgs from the reference date to secondary(conj), this involves doing - # a `.conj()` on the phase-linked SLCs (which are currently `day1.conj() * day2`) - single_ref_ifgs = [ - interferogram.convert_pl_to_ifg( - f, reference_date=reference_date, output_dir=ifg_dir, dry_run=dry_run + single_ref_ifgs = [ + interferogram.convert_pl_to_ifg( + f, reference_date=reference_date, output_dir=ifg_dir, dry_run=dry_run + ) + for f in phase_linked_slcs[: manual_reference_idx + 1] + ] + single_ref_ifgs.extend( + [ + interferogram.convert_pl_to_ifg( + f, + reference_date=extra_reference_date, + output_dir=ifg_dir, + dry_run=dry_run, + ) + for f in phase_linked_slcs[manual_reference_idx + 1 :] + ] ) - for f in phase_linked_slcs - ] - # If we're only wanting single-reference day-(reference) to day-k interferograms, - # these are all we need - # XX Fix this hack for later if interferogram_network.indexes and interferogram_network.indexes == [(0, -1)]: ifg_file_list.append(single_ref_ifgs[-1]) + # XXX Fix this hack for later # # This isn't really what we want here, the logic is different than Network: # ifgs = [ # (single_ref_ifgs[ref_idx], single_ref_ifgs[sec_idx]) @@ -334,12 +367,11 @@ def create_ifgs( # (which are the (ref_date, ...) interferograms),...
ifgs_ref_date = single_ref_ifgs[:max_b] # ...then combine it with the results from the `Network` - # Manually specify the dates, which come from the names of `phase_linked_slcs` - secondary_dates = [get_dates(f)[0] for f in phase_linked_slcs] network_rest = interferogram.Network( slc_list=phase_linked_slcs, max_bandwidth=max_b, outdir=ifg_dir, + # Manually specify the dates, which come from the names of phase_linked_slcs dates=secondary_dates, write=not dry_run, verify_slcs=not dry_run, @@ -389,11 +421,12 @@ def _get_input_dates( # For any that aren't compressed, take the first date. # this is because the official product name of OPERA/Sentinel1 has both # "acquisition_date" ... "generation_date" in the filename + # For compressed, we want the first 3 dates: (base phase, start, end) # TODO: this is a bit hacky, perhaps we can make this some input option # so that the user can specify how to get dates from their files (or even # directly pass in dates?) return [ - dates[:1] if not is_comp else dates + dates[:1] if not is_comp else dates[:3] for dates, is_comp in zip(input_dates, is_compressed) ] @@ -446,3 +479,22 @@ def _get_mask( mask_filename = nodata_mask_file return mask_filename + + +def _get_nearest_idx( + input_dates: Sequence[datetime.datetime], + selected_date: datetime.datetime, +) -> int: + """Find the index nearest to `selected_date` within `input_dates`.""" + sorted_inputs = sorted(input_dates) + if not sorted_inputs[0] <= selected_date <= sorted_inputs[-1]: + msg = f"Requested {selected_date} falls outside of input range: " + msg += f"{sorted_inputs[0]}, {sorted_inputs[-1]}" + raise ValueError(msg) + + nearest_idx = min( + range(len(input_dates)), + key=lambda i: abs((input_dates[i] - selected_date).total_seconds()), + ) + + return nearest_idx diff --git a/tests/test_stack.py b/tests/test_stack.py index cff5c0e7..c947d4ca 100644 --- a/tests/test_stack.py +++ b/tests/test_stack.py @@ -89,8 +89,7 @@ def ministack(files, date_lists, is_compressed): ) -def test_ministack_attrs(ministack, dates): - assert ministack.full_date_range == (dates[0], dates[-1]) +def test_ministack_attrs(ministack): assert ministack.real_slc_date_range_str == "20220101_20220110" @@ -146,7 +145,8 @@ def run_ministack_planner(files, date_lists, is_compressed): assert ms.output_folder == expected_out_folders[idx] - assert all(ms.reference_date == datetime(2022, 1, 1) for ms in ms_list) + assert all(ms.compressed_reference_date == datetime(2022, 1, 1) for ms in ms_list) + assert all(ms.output_reference_date == datetime(2022, 1, 1) for ms in ms_list) def test_ministack_planner_gtiff(files, date_lists, is_compressed): @@ -241,3 +241,24 @@ def test_hit_max_compressed(slc_file_list, slc_date_list, is_compressed): assert len(ministacks[2].file_list) == 5 assert len(ministacks[3].file_list) == 6 assert all(len(m.file_list) == 6 for m in ministacks[3:-1]) + + +def test_compressed_idx_setting(slc_file_list, slc_date_list, is_compressed): + """Check the planning works to set a manually passed compressed index.""" + is_compressed = [False] * len(slc_file_list) + ministack_planner = MiniStackPlanner( + file_list=slc_file_list, + dates=slc_date_list, + is_compressed=is_compressed, + output_folder=Path("fake_dir"), + ) + # Check we can get the compressed SLC info + expected_ref_date = slc_date_list[8] + + # This currently works ONLY for one single ministack planning + # (e.g. 
manually separating ministack runs and re-passing in CCSLCs) + ministacks = ministack_planner.plan(len(slc_file_list), compressed_idx=8) + + assert ministacks[0].compressed_reference_date == expected_ref_date + # But, the output reference date should be the first one (since it was unset) + assert ministacks[0].output_reference_date == slc_date_list[0] diff --git a/tests/test_workflows_displacement.py b/tests/test_workflows_displacement.py index ee4f34f0..9d111a3b 100644 --- a/tests/test_workflows_displacement.py +++ b/tests/test_workflows_displacement.py @@ -15,10 +15,7 @@ ) -def test_displacement_run_single( - opera_slc_files: list[Path], - tmpdir, -): +def test_displacement_run_single(opera_slc_files: list[Path], tmpdir): with tmpdir.as_cwd(): cfg = config.DisplacementWorkflow( cslc_file_list=opera_slc_files, @@ -183,3 +180,42 @@ def test_separate_workflow_runs(slc_file_list, tmp_path): assert len(ifgs3_b) == 10 # Names should be the same as the previous run assert [f.name for f in ifgs3_b] == [f.name for f in ifgs3] + + +def test_displacement_run_extra_reference_date(opera_slc_files: list[Path], tmpdir): + with tmpdir.as_cwd(): + cfg = config.DisplacementWorkflow( + # start_date = 20220101 + # shape = (4, 128, 128) + # First one is COMPRESSED_ + output_options={"extra_reference_date": "2022-01-03"}, + unwrap_options={"unwrap_method": "phass"}, + cslc_file_list=opera_slc_files, + input_options={"subdataset": "/data/VV"}, + phase_linking={ + "ministack_size": 4, + }, + ) + paths = displacement.run(cfg) + + for slc_paths in paths.comp_slc_dict.values(): + # The "base phase" should be 20220103 + assert slc_paths[0].name == "compressed_20220103_20220102_20220104.tif" + + # The unwrapped files should have a changeover to the new reference + assert paths.unwrapped_paths is not None + unw_names = [pp.name for pp in paths.unwrapped_paths] + assert unw_names == [ + "20220101_20220102.unw.tif", + "20220101_20220103.unw.tif", + "20220103_20220104.unw.tif", + ] + + # But the timeseries inversion will have re-referenced the results to the first date + assert paths.timeseries_paths is not None + ts_names = [pp.name for pp in paths.timeseries_paths] + assert ts_names == [ + "20220101_20220102.tif", + "20220101_20220103.tif", + "20220101_20220104.tif", + ] diff --git a/tests/test_workflows_sequential.py b/tests/test_workflows_sequential.py index 6d93a9f3..ecb4bb73 100644 --- a/tests/test_workflows_sequential.py +++ b/tests/test_workflows_sequential.py @@ -1,7 +1,5 @@ import pytest -from dolphin import stack - # from dolphin._types import HalfWindow, Strides from dolphin.io import _readers from dolphin.phase_link import simulate @@ -25,16 +23,10 @@ def test_sequential_gtiff(tmp_path, slc_file_list): pytest.skip(f"Output shape = {out_shape}") output_folder = tmp_path / "sequential" ms_size = 10 - ms_planner = stack.MiniStackPlanner( - file_list=slc_file_list, - dates=vrt_stack.dates, - is_compressed=[False] * len(slc_file_list), - output_folder=output_folder, - ) sequential.run_wrapped_phase_sequential( - slc_vrt_file=vrt_file, - ministack_planner=ms_planner, + slc_vrt_stack=vrt_stack, + output_folder=output_folder, ministack_size=ms_size, half_window=half_window, strides=strides, @@ -71,16 +63,9 @@ def test_sequential_nc(tmp_path, slc_file_list_nc, half_window, strides): if not all(out_shape): pytest.skip(f"Output shape = {out_shape}") - ms_planner = stack.MiniStackPlanner( - file_list=slc_file_list_nc, - dates=v.dates, - is_compressed=[False] * len(slc_file_list_nc), - output_folder=tmp_path / "sequential", - ) -
sequential.run_wrapped_phase_sequential( - slc_vrt_file=vrt_file, - ministack_planner=ms_planner, + slc_vrt_stack=v, + output_folder=tmp_path / "sequential", ministack_size=10, half_window=half_window, strides=strides, @@ -102,18 +87,12 @@ def test_sequential_ministack_sizes(tmp_path, slc_file_list_nc, ministack_size): slc_file_list_nc[:21], outfile=vrt_file, subdataset="data" ) _, rows, cols = vrt_stack.shape - ms_planner = stack.MiniStackPlanner( - file_list=vrt_stack.file_list, - dates=vrt_stack.dates, - is_compressed=[False] * len(vrt_stack.file_list), - output_folder=tmp_path / "sequential", - ) # Record the warning, check after if it's thrown sequential.run_wrapped_phase_sequential( - slc_vrt_file=vrt_file, - ministack_planner=ms_planner, + slc_vrt_stack=vrt_stack, ministack_size=ministack_size, + output_folder=tmp_path / "sequential", half_window={"x": cols // 2, "y": rows // 2}, strides={"x": 1, "y": 1}, ps_mask_file=None, diff --git a/tests/test_workflows_single.py b/tests/test_workflows_single.py index b8c3872d..98cc44dd 100644 --- a/tests/test_workflows_single.py +++ b/tests/test_workflows_single.py @@ -13,7 +13,6 @@ def test_sequential_gtiff(tmp_path, slc_file_list): vrt_file = tmp_path / "slc_stack.vrt" files = slc_file_list[:3] vrt_stack = _readers.VRTStack(files, outfile=vrt_file) - _, rows, cols = vrt_stack.shape is_compressed = [False] * len(files) ministack = stack.MiniStackInfo( file_list=vrt_stack.file_list, @@ -26,7 +25,7 @@ def test_sequential_gtiff(tmp_path, slc_file_list): strides = {"x": 1, "y": 1} output_folder = tmp_path / "single" single.run_wrapped_phase_single( - slc_vrt_file=vrt_file, + vrt_stack=vrt_stack, ministack=ministack, output_folder=output_folder, half_window=half_window,
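For reference, exercising the new `extra_reference_date` option end-to-end mirrors the test added above (a minimal sketch; the `dolphin.workflows` import path and the `input_slcs/*.h5` glob are illustrative assumptions, not part of this patch):

    from pathlib import Path

    from dolphin.workflows import config, displacement

    # Placeholder input stack: any CSLC files spanning the changeover date
    opera_slc_files = sorted(Path("input_slcs").glob("*.h5"))

    cfg = config.DisplacementWorkflow(
        cslc_file_list=opera_slc_files,
        input_options={"subdataset": "/data/VV"},
        # The single-reference network (and compressed SLC base phase) resets here
        output_options={"extra_reference_date": "2022-01-03"},
        phase_linking={"ministack_size": 4},
    )
    paths = displacement.run(cfg)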