Skip to content

Commit

Permalink
Add logic for manually specifying reference dates mid-stack (#334)
Browse files Browse the repository at this point in the history
* use the `reference_idx` from each `ministack`, don't hardcode to 0

* remove commented `stack.py` code

* Add a `reference_idx` arg to `compress`

* update `MiniStackPlanner` to accept manual reference indexes

* start passing through logic to account for manual reference changeovers

* fix num parallel to respect the file length

* get the linked phase part working with manual references

* move the reference date to be an output option

* get basic single-reference changeover working for interferograms

* specify naive datetime

* add a test for ref update workflow

* Setup test to check names of output ifgs

* fix pre-commit/test err

* Add an `output_reference_idx` to `PhaseLinkingOptions`

This accounts for the fact that we may pass in several compressed SLCs for a
single run, and we wish to make the phase linking outputs referenced to
something besides the first SLC.

For example, if we had
   1_1_3
   1_4_6
   7_7_9
   10
   11
   12

And we wanted to output interferograms relative to `7`, we need to
specify that to the phase linking functions; otherwise, it will
output things relative to day 1. Since we don't write out rasters
corresponding to compressed SLC inputs, we'd have nothing from
day 7 to re-reference later.

Note that an alternative might be to always write out rasters for
every input in a ministack, including the compressed inputs.
The downside is that during a normal local Sequential run with
multiple ministacks, you write out redundant rasters and need to
remove/select the right ones afterward.
But if that way seems cleaner later, we could revert this commit.

* Make 2 ministack idxs- one for `compressed_reference`, one for `output_reference`

* separate ref/compressed idx in `.plan()`, use new api for `run_wrapped_phase_sequential`

* fix tests, make `single` also use `vrt_stack` instead of filename

* fix tests for `stack.py`

* let only 1 compressed reference idx be passed to `.plan`

* clean up commented, pass the `output_reference_index` to `run_phase_linking` in `single`
  • Loading branch information
scottstanie authored Sep 15, 2024
1 parent 733f414 commit 78e6375
Show file tree
Hide file tree
Showing 12 changed files with 308 additions and 156 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ repos:
rev: "24.8.0"
hooks:
- id: black
args: [--preview]

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.4
Expand Down
15 changes: 13 additions & 2 deletions src/dolphin/phase_link/_compress.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@


def compress(
slc_stack: ArrayLike, pl_cpx_phase: ArrayLike, slc_mean: ArrayLike | None = None
slc_stack: ArrayLike,
pl_cpx_phase: ArrayLike,
slc_mean: ArrayLike | None = None,
reference_idx: int | None = None,
):
"""Compress the stack of SLC data using the estimated phase.
Expand All @@ -21,16 +24,24 @@ def compress(
slc_mean : ArrayLike, optional
The mean SLC magnitude, shape (rows, cols), to use as output pixel magnitudes.
If None, the mean is computed from the input SLC stack.
reference_idx : int, optional
If provided, the `pl_cpx_phase` will be re-referenced to `reference_idx` before
performing the dot product.
Default is `None`, which keeps `pl_cpx_phase` as provided.
Returns
-------
np.array
The compressed SLC data, shape (rows, cols)
"""
if reference_idx is not None:
pl_referenced = pl_cpx_phase * pl_cpx_phase[reference_idx][None, :, :].conj()
else:
pl_referenced = pl_cpx_phase
# If the output is downsampled, we need to make `pl_cpx_phase` the same shape
# as the output
pl_estimate_upsampled = upsample_nearest(pl_cpx_phase, slc_stack.shape[1:])
pl_estimate_upsampled = upsample_nearest(pl_referenced, slc_stack.shape[1:])
# For each pixel, project the SLCs onto the (normalized) estimated phase
# by performing a pixel-wise complex dot product
with warnings.catch_warnings():
Expand Down
99 changes: 52 additions & 47 deletions src/dolphin/stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,6 @@ class BaseStack(BaseModel):
Path(),
description="Folder/location where ministack will write outputs to.",
)
reference_idx: int = Field(
0,
description="Index of the SLC to use as reference during phase linking",
)

model_config = {
# For the `Filename, so it can handle the `GeneralPath` protocol`
Expand Down Expand Up @@ -84,33 +80,6 @@ def _check_lengths(self):
raise ValueError(msg)
return self

# @model_validator(mode="after")
# def _check_unset_reference_date(self):
# if self.reference_date == _DUMMY_DATE:
# ref_date = self.dates[self.reference_idx][0]
# logger.debug("No reference date provided, using first date: %s", ref_date)
# self.reference_date = ref_date
# return self
@property
def reference_date(self):
"""Date of the reference phase of the stack."""
# Note this works for either a length-1 tuple (real SLC), or for
# the compressed SLC formate (ref, start, end)
return self.dates[self.reference_idx][0]

@property
def full_date_range(self) -> tuple[DateOrDatetime, DateOrDatetime]:
"""Full date range of all SLCs in the ministack."""
return (self.reference_date, self.dates[-1][-1])

@property
def full_date_range_str(self) -> str:
"""Full date range of the ministack as a string, e.g. '20210101_20210202'.
Includes both compressed + normal SLCs in the range.
"""
return format_dates(*self.full_date_range, fmt=self.file_date_fmt)

@property
def first_real_slc_idx(self) -> int:
"""Index of the first real SLC in the ministack."""
Expand Down Expand Up @@ -151,10 +120,8 @@ def __rich_repr__(self):
yield "file_list", self.file_list
yield "dates", self.dates
yield "is_compressed", self.is_compressed
yield "reference_date", self.reference_date
yield "file_date_fmt", self.file_date_fmt
yield "output_folder", self.output_folder
yield "reference_idx", self.reference_idx


class CompressedSlcInfo(BaseModel):
Expand Down Expand Up @@ -363,6 +330,32 @@ class MiniStackInfo(BaseStack):
Used for planning the processing of a batch of SLCs.
"""

output_reference_idx: int = Field(
0,
description="Index of the SLC to use as reference during phase linking",
)
compressed_reference_idx: int = Field(
0,
description=(
"Index of the SLC to use as during compressed SLC creation. May be"
" different than `output_reference_idx`."
),
)

@property
def output_reference_date(self):
"""Date of the reference phase of the stack."""
# Note this works for either a length-1 tuple (real SLC), or for
# the compressed SLC formate (ref, start, end)
return self.dates[self.output_reference_idx][0]

@property
def compressed_reference_date(self):
"""Date of the reference phase of the stack."""
# Note this works for either a length-1 tuple (real SLC), or for
# the compressed SLC formate (ref, start, end)
return self.dates[self.compressed_reference_idx][0]

def get_compressed_slc_info(self) -> CompressedSlcInfo:
"""Get the compressed SLC which will come from this ministack.
Expand All @@ -379,7 +372,7 @@ def get_compressed_slc_info(self) -> CompressedSlcInfo:
real_slc_dates.append(d)

return CompressedSlcInfo(
reference_date=self.reference_date,
reference_date=self.compressed_reference_date,
start_date=real_slc_dates[0][0],
end_date=real_slc_dates[-1][0],
real_slc_file_list=real_slc_files,
Expand All @@ -394,21 +387,33 @@ class MiniStackPlanner(BaseStack):
"""Class for planning the processing of batches of SLCs."""

max_num_compressed: int = 5
output_reference_idx: int = Field(
0,
description="Index of the SLC to use as reference during phase linking",
)

def plan(self, ministack_size: int) -> list[MiniStackInfo]:
def plan(
self, ministack_size: int, compressed_idx: int | None = None
) -> list[MiniStackInfo]:
"""Create a list of ministacks to be processed."""
if ministack_size < 2:
msg = "Cannot create ministacks with size < 2"
raise ValueError(msg)

# For now, only allow `compressed_idx` when doing a single batch.
# The logic is more complicated for multiple `compressed_idx`s, and
# it's unclear who would need that
if compressed_idx is not None and ministack_size < len(self.file_list):
raise ValueError(
"Cannot set `compressed_idx` when creating multiple ministacks."
)

output_ministacks: list[MiniStackInfo] = []

# Start of with any compressed SLCs that are passed in
compressed_slc_infos: list[CompressedSlcInfo] = []
for f in self.compressed_slc_file_list:
# TODO: will we ever actually need to read the old metadata here?
# # Note: these must actually exist to be used!
# compressed_slc_infos.append(CompressedSlcInfo.from_file_metadata(f))
compressed_slc_infos.append(CompressedSlcInfo.from_filename(f))

# Solve each ministack using current chunk (and the previous compressed SLCs)
Expand Down Expand Up @@ -436,27 +441,27 @@ def plan(self, ministack_size: int) -> list[MiniStackInfo]:
combined_is_compressed = num_ccslc * [True] + list(
self.is_compressed[cur_slice]
)
# If there are any compressed SLCs, set the reference to the last one
try:
last_compressed_idx = np.where(combined_is_compressed)[0]
reference_idx = last_compressed_idx[-1]
except IndexError:
reference_idx = 0

# Make the current ministack output folder using the start/end dates
new_date_str = format_dates(
cur_dates[0][0], cur_dates[-1][-1], fmt=self.file_date_fmt
)
cur_output_folder = self.output_folder / new_date_str

if compressed_idx is None:
# TODO: To change to Ansari-style ministack references, change this
# so that a new `output_reference_idx` gets made
compressed_reference_idx = self.output_reference_idx
else:
compressed_reference_idx = compressed_idx

cur_ministack = MiniStackInfo(
file_list=combined_files,
dates=combined_dates,
is_compressed=combined_is_compressed,
reference_idx=reference_idx,
output_reference_idx=self.output_reference_idx,
compressed_reference_idx=compressed_reference_idx,
output_folder=cur_output_folder,
reference_date=self.reference_date,
# TODO: we'll need to alter logic here if we dont fix
# reference_idx=0, since this will change the reference date
)

output_ministacks.append(cur_ministack)
Expand Down
25 changes: 22 additions & 3 deletions src/dolphin/workflows/config/_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
BaseModel,
ConfigDict,
Field,
NaiveDatetime,
PrivateAttr,
field_validator,
model_validator,
Expand Down Expand Up @@ -75,14 +76,21 @@ class PhaseLinkingOptions(BaseModel, extra="forbid"):
10, description="Size of the ministack for sequential estimator.", gt=1
)
max_num_compressed: int = Field(
5,
100,
description=(
"Maximum number of compressed images to use in sequential estimator."
" If there are more ministacks than this, the earliest CCSLCs will be"
" left out of the later stacks."
" left out of the later stacks. "
),
gt=0,
)
output_reference_idx: int = Field(
0,
description=(
"Index of input SLC to use for making phase linked interferograms after"
" EVD/EMI."
),
)
half_window: HalfWindow = HalfWindow()
use_evd: bool = Field(
False, description="Use EVD on the coherence instead of using the EMI algorithm"
Expand Down Expand Up @@ -155,7 +163,6 @@ class InterferogramNetwork(BaseModel, extra="forbid"):
),
)

# validation
@model_validator(mode="after")
def _check_zero_parameters(self) -> InterferogramNetwork:
ref_idx = self.reference_idx
Expand Down Expand Up @@ -318,6 +325,18 @@ class OutputOptions(BaseModel, extra="forbid"):
[4, 8, 16, 32, 64],
description="List of overview levels to create (if `add_overviews=True`).",
)
# Note: we use NaiveDatetime, since other datetime parsing results in Naive
# (no TzInfo) datetimes, which can't be compared to datetimes with timezones
extra_reference_date: Optional[NaiveDatetime] = Field(
None,
description=(
"Specify an extra reference datetime in UTC. Adding this lets you"
" to create and unwrap two single reference networks; the later resets at"
" the given date (e.g. for a large earthquake event). If passing strings,"
" formats accepted are YYYY-MM-DD[T]HH:MM[:SS[.ffffff]][Z or [±]HH[:]MM],"
" or YYYY-MM-DD"
),
)

# validators
@field_validator("bounds", mode="after")
Expand Down
6 changes: 3 additions & 3 deletions src/dolphin/workflows/displacement.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ def run(
comp_slc_dict: dict[str, list[Path]] = {}
# Now for each burst, run the wrapped phase estimation
# Try running several bursts in parallel...
# Use the Dummy one if not going parallel, as debugging is much simpler
num_parallel = min(cfg.worker_settings.n_parallel_bursts, len(grouped_slc_files))
Executor = (
ProcessPoolExecutor
if cfg.worker_settings.n_parallel_bursts > 1
else utils.DummyProcessPoolExecutor
ProcessPoolExecutor if num_parallel > 1 else utils.DummyProcessPoolExecutor
)
mw = cfg.worker_settings.n_parallel_bursts
ctx = mp.get_context("spawn")
Expand Down
Loading

0 comments on commit 78e6375

Please sign in to comment.