From 1dff8906c3f0d562b356b2c9ca29931a4d26be20 Mon Sep 17 00:00:00 2001
From: Talley Lambert
Date: Sun, 9 Jul 2023 20:21:06 -0400
Subject: [PATCH] test: bump coverage (#163)

* coverage

* more cover
---
 src/nd2/_binary.py                       |  4 ++--
 src/nd2/_parse/_chunk_decode.py          | 12 +++++++-----
 src/nd2/_parse/_clx_lite.py              | 10 +++++++---
 src/nd2/_parse/_clx_xml.py               |  5 +++--
 src/nd2/_util.py                         |  7 ++++---
 src/nd2/readers/_legacy/legacy_reader.py | 16 ++++++++--------
 src/nd2/readers/_modern/modern_reader.py | 10 +++++-----
 tests/test_reader.py                     |  7 ++++++-
 8 files changed, 42 insertions(+), 29 deletions(-)

diff --git a/src/nd2/_binary.py b/src/nd2/_binary.py
index dbfd6ed..b2edb77 100644
--- a/src/nd2/_binary.py
+++ b/src/nd2/_binary.py
@@ -158,7 +158,7 @@ def _unpack(stream: io.BufferedIOBase, strct: struct.Struct) -> tuple:
     return strct.unpack(stream.read(strct.size))
 
 
-def _decode_binary_mask(data: bytes, dtype: DTypeLike = "uint16") -> np.ndarray:
+def decode_binary_mask(data: bytes, dtype: DTypeLike = "uint16") -> np.ndarray:
     # this receives data as would be extracted from a
     # `CustomDataSeq|RleZipBinarySequence...` section in the metadata
 
@@ -170,7 +170,7 @@ def _decode_binary_mask(data: bytes, dtype: DTypeLike = "uint16") -> np.ndarray:
     # still not sure what _q is
     # tot_bytes should be length of the stream remaining after this
     (v, ncols, nrows, nmasks, tot_bytes, _q, _zero) = _unpack(stream, I7)
-    if v != 3:
+    if v != 3:  # pragma: no cover
         warnings.warn(
             f"Expected first byte to be 3 but got {v}. "
             "Please submit this file :) https://github.com/tlambert03/nd2/issues/.",
diff --git a/src/nd2/_parse/_chunk_decode.py b/src/nd2/_parse/_chunk_decode.py
index e99f11b..00201f3 100644
--- a/src/nd2/_parse/_chunk_decode.py
+++ b/src/nd2/_parse/_chunk_decode.py
@@ -99,9 +99,11 @@ def get_version(fh: BinaryIO | StrOrBytesPath) -> tuple[int, int]:
     if magic != ND2_CHUNK_MAGIC:
         if magic == JP2_MAGIC:
             return (1, 0)  # legacy JP2 files are version 1.0
-        raise ValueError(f"Not a valid ND2 file: {fname}. (magic: {magic!r})")
+        raise ValueError(  # pragma: no cover
+            f"Not a valid ND2 file: {fname}. (magic: {magic!r})"
+        )
     if name_length != 32 or data_length != 64 or name != ND2_FILE_SIGNATURE:
-        raise ValueError(f"Corrupt ND2 file header chunk: {fname}")
+        raise ValueError(f"Corrupt ND2 file header chunk: {fname}")  # pragma: no cover
 
     # data will now be something like Ver2.0, Ver3.0, etc.
     return (int(chr(data[3])), int(chr(data[5])))
@@ -143,7 +145,7 @@ def get_chunkmap(fh: BinaryIO, error_radius: int | None = None) -> ChunkMap:
     # the last (32,8) bytes of the file contain the (signature, location) of chunkmap
     fh.seek(-40, 2)
     sig, location = SIG_CHUNKMAP_LOC.unpack(fh.read(SIG_CHUNKMAP_LOC.size))
-    if sig != ND2_CHUNKMAP_SIGNATURE:
+    if sig != ND2_CHUNKMAP_SIGNATURE:  # pragma: no cover
         raise ValueError(f"Invalid ChunkMap signature {sig!r} in file {fh.name!r}")
 
     # get all of the data in the chunkmap
@@ -286,7 +288,7 @@ def iter_chunks(handle: BinaryIO) -> Iterator[tuple[str, int, int]]:
         if magic:
             try:
                 name = handle.read(shift).split(b"\x00", 1)[0].decode("utf-8")
-            except UnicodeDecodeError:
+            except UnicodeDecodeError:  # pragma: no cover
                 name = "?"
             yield (name, pos + CHUNK_HEADER.size + shift, length)
         pos += CHUNK_HEADER.size + shift + length
@@ -388,7 +390,7 @@ def rescue_nd2(
                     buffer=mm,
                     offset=end_hdr + shift + 8,
                 )
-            except TypeError as e:
+            except TypeError as e:  # pragma: no cover
                 # buffer is likely too small
                 if verbose:
                     print(f"Error at offset {offset}: {e}")
diff --git a/src/nd2/_parse/_clx_lite.py b/src/nd2/_parse/_clx_lite.py
index 103e4bd..a163e6b 100644
--- a/src/nd2/_parse/_clx_lite.py
+++ b/src/nd2/_parse/_clx_lite.py
@@ -116,7 +116,9 @@ def _chunk_name_and_dtype(
     data_type, name_length = strctBB.unpack(header)
 
     if data_type in (ELxLiteVariantType.DEPRECATED, ELxLiteVariantType.UNKNOWN):
-        raise ValueError(f"Unknown data type in metadata header: {data_type}")
+        raise ValueError(  # pragma: no cover
+            f"Unknown data type in metadata header: {data_type}"
+        )
     elif data_type == ELxLiteVariantType.COMPRESS:
         name = ""
     else:
@@ -148,7 +150,8 @@ def json_from_clx_lite_variant(
             return json_from_clx_lite_variant(deflated, strip_prefix)
 
         if data_type == -1:
-            break
+            # never seen this, but it's in the sdk
+            break  # pragma: no cover
 
         value: JsonValueType
         if data_type == ELxLiteVariantType.LEVEL:
@@ -170,7 +173,8 @@ def json_from_clx_lite_variant(
         elif data_type in _PARSERS:
             value = _PARSERS[data_type](stream)
         else:
-            value = None
+            # also never seen this
+            value = None  # pragma: no cover
         if name == "" and name in output:
             # nd2 uses empty strings as keys for lists
             if not isinstance(output[name], list):
diff --git a/src/nd2/_parse/_clx_xml.py b/src/nd2/_parse/_clx_xml.py
index 47d0cab..62a7f7d 100644
--- a/src/nd2/_parse/_clx_xml.py
+++ b/src/nd2/_parse/_clx_xml.py
@@ -28,7 +28,7 @@ def _float_or_nan(x: str) -> float:
     try:
         return float(x)
-    except ValueError:
+    except ValueError:  # pragma: no cover
         return float("nan")
 
 
@@ -150,7 +150,8 @@ def _node_name_value(
             # skip empty nodes ... the sdk does this too
             continue
         cname = f"i{i:010}"
-        if cname in value:
+        if cname in value:  # pragma: no cover
+            # don't see this in tests anymore. but just in case...
             warnings.warn(f"Duplicate key {cname} in {name}", stacklevel=2)
         value[cname] = cval
diff --git a/src/nd2/_util.py b/src/nd2/_util.py
index c8855aa..afe80c6 100644
--- a/src/nd2/_util.py
+++ b/src/nd2/_util.py
@@ -5,7 +5,7 @@
 import warnings
 from datetime import datetime
 from itertools import product
-from typing import TYPE_CHECKING, BinaryIO, NamedTuple
+from typing import TYPE_CHECKING, BinaryIO, NamedTuple, cast
 
 if TYPE_CHECKING:
     from os import PathLike
@@ -47,7 +47,8 @@ def is_supported_file(
     bool
         Whether the file can be opened.
""" - if isinstance(path, BinaryIO): + if hasattr(path, "read"): + path = cast("BinaryIO", path) path.seek(0) magic = path.read(4) else: @@ -155,7 +156,7 @@ def parse_time(time_str: str) -> datetime: return datetime.strptime(time_str, fmt_str) except ValueError: continue - raise ValueError(f"Could not parse {time_str}") + raise ValueError(f"Could not parse {time_str}") # pragma: no cover # utils for converting records to dicts, in recorded_data method diff --git a/src/nd2/readers/_legacy/legacy_reader.py b/src/nd2/readers/_legacy/legacy_reader.py index f985160..760877f 100644 --- a/src/nd2/readers/_legacy/legacy_reader.py +++ b/src/nd2/readers/_legacy/legacy_reader.py @@ -149,7 +149,7 @@ def __init__(self, path: FileOrBinaryIO, error_radius: int | None = None) -> Non self._attributes: strct.Attributes | None = None # super().__init__ called open() length, box_type = I4s.unpack(self._fh.read(I4s.size)) # type: ignore - if length != 12 and box_type == b"jP ": + if length != 12 and box_type == b"jP ": # pragma: no cover raise ValueError("File not recognized as Legacy ND2 (JPEG2000) format.") self.lock = threading.RLock() self._frame0_meta_cache: FrameMetaDict | None = None @@ -161,7 +161,7 @@ def is_legacy(self) -> bool: def chunkmap(self) -> dict[bytes, list[int]]: """Return the chunkmap for the file.""" if not self._chunkmap: - if self._fh is None: + if self._fh is None: # pragma: no cover raise OSError("File not open") self._chunkmap = legacy_nd2_chunkmap(self._fh) return self._chunkmap @@ -272,7 +272,7 @@ def _make_loop( params = cast("LoopPars6", params) return None - raise ValueError(f"unrecognized type: {type_}") + raise ValueError(f"unrecognized type: {type_}") # pragma: no cover def attributes(self) -> strct.Attributes: """Load and return the image attributes.""" @@ -351,7 +351,7 @@ def calibration(self) -> dict: return self._decode_chunk(b"ACAL") def _load_chunk(self, key: bytes, index: int = 0) -> bytes: - if not self._fh: + if not self._fh: # pragma: no cover raise ValueError("Attempt to read from closed nd2 file") pos = self.chunkmap[key][index] with self.lock: @@ -360,12 +360,12 @@ def _load_chunk(self, key: bytes, index: int = 0) -> bytes: return self._fh.read(length - I4s.size) def read_frame(self, index: int) -> np.ndarray: - if not self._fh: + if not self._fh: # pragma: no cover raise ValueError("Attempt to read from closed nd2 file") try: from imagecodecs import jpeg2k_decode - except ModuleNotFoundError as e: + except ModuleNotFoundError as e: # pragma: no cover raise ModuleNotFoundError( f"{e}\n" f"Reading legacy format nd2 {self._fh.name!r} requires imagecodecs.\n" @@ -414,7 +414,7 @@ def _frame0_meta(self) -> FrameMetaDict: def header(self) -> dict: try: pos = self.chunkmap[b"jp2h"][0] - except (KeyError, IndexError) as e: + except (KeyError, IndexError) as e: # pragma: no cover raise KeyError("No valid jp2h header found in file") from e fh = cast("BinaryIO", self._fh) fh.seek(pos + I4s.size + 4) # 4 bytes for "label" @@ -442,7 +442,7 @@ def events(self, orient: str, null_value: Any) -> list | Mapping: def legacy_nd2_chunkmap(fh: BinaryIO) -> dict[bytes, list[int]]: fh.seek(-40, 2) sig, map_start = struct.unpack("<32sQ", fh.read()) - if sig != b"LABORATORY IMAGING ND BOX MAP 00": + if sig != b"LABORATORY IMAGING ND BOX MAP 00": # pragma: no cover raise ValueError("Not a legacy ND2 file") fh.seek(-map_start, 2) n_chunks = int.from_bytes(fh.read(4), "big") diff --git a/src/nd2/readers/_modern/modern_reader.py b/src/nd2/readers/_modern/modern_reader.py index 
index 7f73cfd..8021a80 100644
--- a/src/nd2/readers/_modern/modern_reader.py
+++ b/src/nd2/readers/_modern/modern_reader.py
@@ -91,7 +91,7 @@ def chunkmap(self) -> ChunkMap:
         }
         """
         if not self._chunkmap:
-            if self._fh is None:
+            if self._fh is None:  # pragma: no cover
                 raise OSError("File not open")
             self._chunkmap = get_chunkmap(self._fh, error_radius=self._error_radius)
         return cast("ChunkMap", self._chunkmap)
@@ -117,7 +117,7 @@ def _load_chunk(self, name: bytes) -> bytes:
 
         `name` must be a valid key in the chunkmap.
         """
-        if self._fh is None:
+        if self._fh is None:  # pragma: no cover
             raise OSError("File not open")
 
         try:
@@ -295,7 +295,7 @@ def read_frame(self, index: int) -> np.ndarray:
         """Read a chunk directly without using SDK."""
         if index > self._seq_count():
             raise IndexError(f"Frame out of range: {index}")
-        if not self._fh:
+        if not self._fh:  # pragma: no cover
             raise ValueError("Attempt to read from closed nd2 file")
         offset = self._frame_offsets.get(index, None)
         if offset is None:
@@ -533,7 +533,7 @@ def _acquisition_date(self) -> datetime.datetime | str | None:
         return None
 
     def binary_data(self) -> BinaryLayers | None:
-        from nd2._binary import BinaryLayer, BinaryLayers, _decode_binary_mask
+        from nd2._binary import BinaryLayer, BinaryLayers, decode_binary_mask
 
         chunk_key = b"CustomDataVar|BinaryMetadata_v1!"
         if chunk_key not in self.chunkmap:
@@ -560,7 +560,7 @@ def binary_data(self) -> BinaryLayers | None:
             for bs in binseqs:
                 if key in bs:
                     data = self._load_chunk(bs)[4:]
-                    _masks.append(_decode_binary_mask(data) if data else None)
+                    _masks.append(decode_binary_mask(data) if data else None)
             mask_items.append(
                 BinaryLayer(
                     data=_masks,
diff --git a/tests/test_reader.py b/tests/test_reader.py
index bc50811..d64e816 100644
--- a/tests/test_reader.py
+++ b/tests/test_reader.py
@@ -9,7 +9,8 @@
 import pytest
 import xarray as xr
 from nd2 import ND2File, imread
-from nd2._util import AXIS
+from nd2._parse._chunk_decode import get_version
+from nd2._util import AXIS, is_supported_file
 from resource_backed_dask_array import ResourceBackedDaskArray
 
 DATA = Path(__file__).parent / "data"
@@ -269,7 +270,11 @@ def test_gc_triggers_cleanup(single_nd2):
 
 def test_file_handles(single_nd2: Path) -> None:
     """Test that we can open a file with a file handle also"""
+    # just for coverage, since usually it will use the filehandle
+    assert get_version(single_nd2) == (3, 0)
+
     with open(single_nd2, "rb") as fh:
+        assert is_supported_file(fh)
         f = ND2File(fh)
         assert f.path == str(single_nd2)
         assert f.version == (3, 0)
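
---

Notes on the changes above (editor's sketches, not part of the patch):

The get_version hunk only re-wraps the error paths, but the surrounding logic is worth spelling out: a modern ND2 file opens with a 16-byte chunk header (magic, name length, data length), a 32-byte signature name, and a short data blob such as b"Ver3.0"; the returned tuple is just characters 3 and 5 of that blob. A minimal standalone sketch, with the constants and struct layout assumed from nd2._parse._chunk_decode (the function name is mine):

import struct

ND2_CHUNK_MAGIC = 0x0ABECEDA  # assumed value of nd2's modern-chunk magic
JP2_MAGIC = 0x0C000000        # first four bytes of a JPEG2000 file, read as a uint32
CHUNK_HEADER = struct.Struct("<IIQ")  # (magic, name_length, data_length)

def sketch_get_version(path: str) -> tuple[int, int]:
    # Read the header chunk and pull the major/minor digits out of b"VerX.Y".
    with open(path, "rb") as fh:
        magic, name_length, data_length = CHUNK_HEADER.unpack(fh.read(CHUNK_HEADER.size))
        if magic == JP2_MAGIC:
            return (1, 0)  # legacy JPEG2000 files are version 1.0
        if magic != ND2_CHUNK_MAGIC:
            raise ValueError(f"Not a valid ND2 file (magic: {magic!r})")
        fh.read(name_length)         # the 32-byte signature name
        data = fh.read(data_length)  # starts with e.g. b"Ver3.0"
        return (int(chr(data[3])), int(chr(data[5])))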
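
The is_supported_file change in src/nd2/_util.py swaps isinstance(path, BinaryIO) for hasattr(path, "read"). typing.BinaryIO is a typing-time class that real file objects do not subclass, so as far as I can tell the isinstance branch was never taken for an open handle, and the duck-typed check matches what the function actually does with the argument (seek and read). A quick illustration (the file name is a placeholder):

import io
import typing

with open("example.nd2", "rb") as fh:  # placeholder path
    # A real binary handle is NOT an instance of typing.BinaryIO,
    # so the old check silently fell through to the "treat as path" branch:
    print(isinstance(fh, typing.BinaryIO))  # False
    # The duck-typed check matches anything file-like:
    print(hasattr(fh, "read"))              # True
    # io.IOBase is a runtime-checkable alternative, if isinstance is preferred:
    print(isinstance(fh, io.IOBase))        # True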
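
On the legacy side, the signature check that gains a pragma lives at the very end of the file: legacy ND2 (JPEG2000) files close with a 32-byte box-map signature followed by an 8-byte little-endian offset. A sketch of just that footer read, derived from the legacy_nd2_chunkmap hunk above (the function name is mine):

import struct
from typing import BinaryIO

LEGACY_MAP_SIGNATURE = b"LABORATORY IMAGING ND BOX MAP 00"

def sketch_read_legacy_footer(fh: BinaryIO) -> int:
    """Return the box map's offset from the end of the file, or raise."""
    fh.seek(-40, 2)  # 2 == io.SEEK_END: 32-byte signature + 8-byte offset
    sig, map_start = struct.unpack("<32sQ", fh.read(40))
    if sig != LEGACY_MAP_SIGNATURE:
        raise ValueError("Not a legacy ND2 file")
    return map_start

# legacy_nd2_chunkmap then seeks back by map_start and reads a big-endian
# chunk count, per the final legacy_reader.py hunk above.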
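
Finally, the new test exercises the pattern these changes enable: passing an already-open handle to ND2File. A usage sketch (the file name is a placeholder; is_supported_file lives in a private module and is imported here only because the test does the same):

from nd2 import ND2File
from nd2._util import is_supported_file

with open("example.nd2", "rb") as fh:  # placeholder path
    assert is_supported_file(fh)  # now works on handles, thanks to the hasattr check
    with ND2File(fh) as nd2_file:
        print(nd2_file.version)  # e.g. (3, 0) for a modern file
        print(nd2_file.path)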