From e5b0ce88e91daffb14fe9fe688671be989200897 Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Thu, 21 Sep 2023 12:22:26 -0500
Subject: [PATCH 1/9] use fetch-depth: 0 because uproot version checks
 dask-awkward (#372)

---
 .github/workflows/awkward-main.yml | 2 ++
 .github/workflows/conda-tests.yml  | 2 ++
 .github/workflows/coverage.yml     | 2 ++
 .github/workflows/pypi-tests.yml   | 2 ++
 4 files changed, 8 insertions(+)

diff --git a/.github/workflows/awkward-main.yml b/.github/workflows/awkward-main.yml
index b8ac3c6d..15a1b00a 100644
--- a/.github/workflows/awkward-main.yml
+++ b/.github/workflows/awkward-main.yml
@@ -21,6 +21,8 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
       - name: Setup Python
         uses: actions/setup-python@v4
         with:
diff --git a/.github/workflows/conda-tests.yml b/.github/workflows/conda-tests.yml
index e5db678a..8214be7f 100644
--- a/.github/workflows/conda-tests.yml
+++ b/.github/workflows/conda-tests.yml
@@ -22,6 +22,8 @@ jobs:
     steps:
       - name: Checkout source
        uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
       - name: Setup Conda Environment
        uses: conda-incubator/setup-miniconda@v2
        with:
diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index ffdf828b..fc70799b 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -21,6 +21,8 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
       - name: Setup Python
         uses: actions/setup-python@v4
         with:
diff --git a/.github/workflows/pypi-tests.yml b/.github/workflows/pypi-tests.yml
index 313dfc13..c974b187 100644
--- a/.github/workflows/pypi-tests.yml
+++ b/.github/workflows/pypi-tests.yml
@@ -23,6 +23,8 @@ jobs:
     steps:
       - name: Checkout
         uses: actions/checkout@v4
+        with:
+          fetch-depth: 0
       - name: setup Python ${{matrix.python-version}}
         uses: actions/setup-python@v4
         with:

From 605d84286d84c86877a7e638f121922718949525 Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Mon, 25 Sep 2023 10:25:19 -0500
Subject: [PATCH 2/9] awkward main has 'None' str as possibility (#374)

---
 src/dask_awkward/lib/optimize.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py
index 855c4c41..50ce634e 100644
--- a/src/dask_awkward/lib/optimize.py
+++ b/src/dask_awkward/lib/optimize.py
@@ -426,7 +426,7 @@ def _necessary_columns(dsk: HighLevelGraph) -> dict[str, list[str]]:

     necessary_columns = []
     for key in sorted(touched_data_keys):
-        if key == name:
+        if key == name or key == "None":
             continue
         layer, column = key.split(".", 1)
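
Editor's note (not part of the patch series): touched-data keys reported by awkward's
typetracer have the form "<layer-name>.<column>", and newer awkward-main versions can
also report the literal string "None", which has no layer/column structure. A minimal
sketch of the failure the guard above prevents; the layer and column names here are
purely illustrative:

    touched_data_keys = {"some-input-layer.points.x", "None"}
    name = "some-input-layer"

    necessary_columns = []
    for key in sorted(touched_data_keys):
        if key == name or key == "None":  # the guard added by this patch
            continue
        # without the guard, "None".split(".", 1) yields a single-element
        # list, so this unpack raises ValueError
        layer, column = key.split(".", 1)
        if layer == name:
            necessary_columns.append(column)

    print(necessary_columns)  # ['points.x']
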
From ffa20fb750cf8f707168838baa4bc98faa58797c Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Mon, 25 Sep 2023 10:39:07 -0500
Subject: [PATCH 3/9] misc: typing (#373)

---
 pyproject.toml                           | 10 +++-
 src/dask_awkward/layers/layers.py        |  5 +-
 src/dask_awkward/lib/core.py             | 75 +++++++++---------
 src/dask_awkward/lib/inspect.py          |  9 ++-
 src/dask_awkward/lib/io/io.py            | 45 +++++++-------
 src/dask_awkward/lib/io/json.py          | 66 ++++-----------------
 src/dask_awkward/lib/io/parquet.py       | 71 +---------------------
 src/dask_awkward/lib/io/text.py          | 12 ++--
 src/dask_awkward/lib/operations.py       |  5 +-
 src/dask_awkward/lib/optimize.py         | 42 ++++++-------
 src/dask_awkward/lib/str.py              |  7 +--
 src/dask_awkward/lib/structure.py        |  4 +-
 src/dask_awkward/lib/unproject_layout.py |  5 +-
 src/dask_awkward/pickle.py               |  6 +-
 src/dask_awkward/utils.py                |  7 ++-
 tests/conftest.py                        |  6 +-
 tests/test_core.py                       | 16 +++--
 tests/test_io.py                         |  8 +--
 tests/test_io_json.py                    |  3 +-
 tests/test_optimize.py                   |  4 +-
 tests/test_parquet.py                    |  2 +-
 21 files changed, 148 insertions(+), 260 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 193c4db3..71022383 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,7 +37,7 @@ classifiers = [
 dependencies = [
     "awkward >=2.4.0",
     "dask >=2023.04.0",
-    "typing_extensions>=4.8.0; python_version < \"3.11\"",
+    "typing_extensions >=4.8.0",
 ]
 dynamic = ["version"]
@@ -144,6 +144,14 @@ warn_unreachable = true
     module = ["tlz.*"]
     ignore_missing_imports = true

+[[tool.mypy.overrides]]
+    module = ["uproot.*"]
+    ignore_missing_imports = true
+
+[[tool.mypy.overrides]]
+    module = ["cloudpickle.*"]
+    ignore_missing_imports = true
+
 [tool.pyright]
 include = ["src"]
 pythonVersion = "3.9"
diff --git a/src/dask_awkward/layers/layers.py b/src/dask_awkward/layers/layers.py
index 002d078b..5af5c971 100644
--- a/src/dask_awkward/layers/layers.py
+++ b/src/dask_awkward/layers/layers.py
@@ -214,7 +214,7 @@ def __init__(
         super().__init__(mapping, **kwargs)

     def mock(self) -> tuple[MaterializedLayer, Any | None]:
-        mapping = self.mapping.copy()
+        mapping = copy.copy(self.mapping)
         if not mapping:  # no partitions at all
             return self, None
@@ -256,9 +256,6 @@ def mock(self) -> tuple[MaterializedLayer, Any | None]:
             task = (self.fn, *name0s)
             return MaterializedLayer({(name, 0): task}), None
-
-        # failed to cull during column opt
-        return self, None

 class AwkwardTreeReductionLayer(DataFrameTreeReduction):
diff --git a/src/dask_awkward/lib/core.py b/src/dask_awkward/lib/core.py
index da984384..90c97f60 100644
--- a/src/dask_awkward/lib/core.py
+++ b/src/dask_awkward/lib/core.py
@@ -16,6 +16,7 @@
 import awkward as ak
 import dask.config
 import numpy as np
+from awkward._do import remove_structure as ak_do_remove_structure
 from awkward._nplikes.typetracer import (
     MaybeNone,
     OneOf,
@@ -833,26 +834,22 @@ def _getitem_trivial_map_partitions(
             label=label,
         )

-    def _getitem_outer_bool_or_int_lazy_array(
-        self, where: Array | tuple[Any, ...]
-    ) -> Any:
+    def _getitem_outer_bool_or_int_lazy_array(self, where):
         ba = where if isinstance(where, Array) else where[0]
         if partition_compatibility(self, ba) == PartitionCompatibility.NO:
             raise IncompatiblePartitions("getitem", self, ba)

-        new_meta: Any | None = None
-        if self._meta is not None:
-            if isinstance(where, tuple):
-                raise DaskAwkwardNotImplemented(
-                    "tuple style input boolean/int selection is not supported."
-                )
-            elif isinstance(where, Array):
-                new_meta = self._meta[where._meta]
-            return self.map_partitions(
-                operator.getitem,
-                where,
-                meta=new_meta,
-            )
+        if isinstance(where, tuple):
+            raise DaskAwkwardNotImplemented(
+                "tuple style input boolean/int selection is not supported."
+            )
+
+        new_meta = self._meta[where._meta]
+        return self.map_partitions(
+            operator.getitem,
+            where,
+            meta=new_meta,
+        )

     def _getitem_outer_str_or_list(
         self,
@@ -942,9 +939,9 @@ def _getitem_outer_int(self, where: int | tuple[Any, ...]) -> Any:
         else:
             return new_scalar_object(hlg, name, meta=new_meta)

-    def _getitem_slice_on_zero(self, where: tuple[slice, ...]):
+    def _getitem_slice_on_zero(self, where):
         # normalise
-        sl: slice = where[0]
+        sl = where[0]
         rest = tuple(where[1:])
         step = sl.step or 1
         start = sl.start or 0
@@ -1014,7 +1011,7 @@ def _getitem_slice_on_zero(self, where):
             divisions=tuple(divisions),
         )

-    def _getitem_tuple(self, where: tuple[Any, ...]) -> Array:
+    def _getitem_tuple(self, where):
         if isinstance(where[0], int):
             return self._getitem_outer_int(where)
@@ -1052,7 +1049,7 @@ def _getitem_tuple(self, where):
             f"Array.__getitem__ doesn't support multi object: {where}"
         )

-    def _getitem_single(self, where: Any) -> Array:
+    def _getitem_single(self, where):
         # a single string
         if isinstance(where, str):
             return self._getitem_outer_str_or_list(where, label=where)
@@ -1089,17 +1086,7 @@ def _getitem_single(self, where):

         raise DaskAwkwardNotImplemented(f"__getitem__ doesn't support where={where}.")

-    @overload
-    def __getitem__(self, where: Array | str | Sequence[str] | slice) -> Array:
-        ...
-
-    @overload
-    def __getitem__(self, where: int) -> Scalar:
-        ...
-
-    def __getitem__(
-        self, where: Array | str | Sequence[str] | int | slice
-    ) -> Array | Scalar:
+    def __getitem__(self, where):
         """Select items from the collection.

         Heavily under construction.
@@ -1369,9 +1356,7 @@ def head(self, nrow=10, compute=True):
         By default this is then processed eagerly and returned.

         """
-        out: Array = self.partitions[0].map_partitions(
-            lambda x: x[:nrow], meta=self._meta
-        )
+        out = self.partitions[0].map_partitions(lambda x: x[:nrow], meta=self._meta)
         if compute:
             return out.compute()
         if self.known_divisions:
@@ -1727,16 +1712,13 @@ def map_partitions(
     )

-PartialReductionType = ak.Array
-
-
 def _chunk_reducer_non_positional(
-    chunk: ak.Array | PartialReductionType,
+    chunk: ak.Array,
     is_axis_none: bool,
     *,
     reducer: Callable,
     mask_identity: bool,
-) -> PartialReductionType:
+) -> ak.Array:
     return reducer(
         chunk,
         keepdims=True,
@@ -1746,14 +1728,14 @@ def _chunk_reducer_non_positional(

 def _concat_reducer_non_positional(
-    partials: list[PartialReductionType], is_axis_none: bool
+    partials: list[ak.Array], is_axis_none: bool
 ) -> ak.Array:
     concat_axis = -1 if is_axis_none else 0
     return ak.concatenate(partials, axis=concat_axis)

 def _finalise_reducer_non_positional(
-    partial: PartialReductionType,
+    partial: ak.Array,
     is_axis_none: bool,
     *,
     reducer: Callable,
@@ -1771,7 +1753,7 @@ def _finalise_reducer_non_positional(

 def _prepare_axis_none_chunk(chunk: ak.Array) -> ak.Array:
     # TODO: this is private Awkward code. We should figure out how to export it
     # if needed
-    (layout,) = ak._do.remove_structure(
+    (layout,) = ak_do_remove_structure(
         ak.to_layout(chunk),
         flatten_records=False,
         drop_nones=False,
@@ -1785,7 +1767,7 @@ def non_trivial_reduction(
     *,
     label: str,
     array: Array,
-    axis: Literal[0] | None,
+    axis: int | None,
     is_positional: bool,
     keepdims: bool,
     mask_identity: bool,
@@ -1794,7 +1776,7 @@ def non_trivial_reduction(
     token: str | None = None,
     dtype: Any | None = None,
     split_every: int | bool | None = None,
-):
+) -> Array | Scalar:
     if is_positional:
         raise NotImplementedError("positional reducers at axis=0 or axis=None")
@@ -1807,8 +1789,9 @@ def non_trivial_reduction(
     if combiner is None:
         combiner = reducer

-    if is_positional:
-        assert combiner is reducer
+    # is_positional == True is not implemented
+    # if is_positional:
+    #     assert combiner is reducer

     # For `axis=None`, we prepare each array to have the following structure:
     #   [[[ ... [x1 x2 x3 ... xN] ... ]]] (length-1 outer lists)
diff --git a/src/dask_awkward/lib/inspect.py b/src/dask_awkward/lib/inspect.py
index cf760c43..280d9162 100644
--- a/src/dask_awkward/lib/inspect.py
+++ b/src/dask_awkward/lib/inspect.py
@@ -87,7 +87,11 @@ def necessary_columns(*args: Any, traverse: bool = True) -> dict[str, list[str]]
     return out

-def sample(arr, factor: int | None = None, probability: float | None = None) -> Array:
+def sample(
+    arr: Array,
+    factor: int | None = None,
+    probability: float | None = None,
+) -> Array:
     """Decimate the data to a smaller number of rows.

     Must give either `factor` or `probability`.
@@ -111,5 +115,6 @@ def sample(arr, factor: int | None = None, probability: float | None = None) ->
         return arr.map_partitions(lambda x: x[::factor], meta=arr._meta)
     else:
         return arr.map_partitions(
-            lambda x: x[np.random.random(len(x)) < probability], meta=arr._meta
+            lambda x: x[np.random.random(len(x)) < probability],  # type: ignore
+            meta=arr._meta,
         )
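
Editor's note (not part of the patch series): `sample` stays lazy and decimates each
partition independently, either deterministically (`factor`) or randomly
(`probability`); exactly one of the two should be given. A hedged usage sketch,
assuming the function is importable from `dask_awkward.lib.inspect`:

    import awkward as ak
    import dask_awkward as dak
    from dask_awkward.lib.inspect import sample

    arr = dak.from_awkward(ak.Array([[1, 2], [3], [4, 5, 6], [7], [8], [9]]), 2)

    every_other = sample(arr, factor=2)       # keeps x[::2] of each partition
    quarter = sample(arr, probability=0.25)   # keeps each row with p = 0.25

    print(every_other.compute().tolist())
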
diff --git a/src/dask_awkward/lib/io/io.py b/src/dask_awkward/lib/io/io.py
index 7307767c..baa7f655 100644
--- a/src/dask_awkward/lib/io/io.py
+++ b/src/dask_awkward/lib/io/io.py
@@ -93,10 +93,10 @@ def from_awkward(
     """
     nrows = len(source)
     if nrows == 0:
-        locs = [None, None]
+        locs: tuple[None, ...] | tuple[int, ...] = (None, None)
     else:
         chunksize = int(math.ceil(nrows / npartitions))
-        locs = list(range(0, nrows, chunksize)) + [nrows]
+        locs = tuple(list(range(0, nrows, chunksize)) + [nrows])
     starts = locs[:-1]
     stops = locs[1:]
     meta = typetracer_array(source)
@@ -106,7 +106,7 @@ def from_awkward(
         stops,
         label=label or "from-awkward",
         token=tokenize(source, npartitions),
-        divisions=tuple(locs),
+        divisions=locs,
         meta=meta,
         behavior=behavior,
     )
@@ -116,11 +116,11 @@ class _FromListsFn:
     def __init__(self, behavior: dict | None = None):
         self.behavior = behavior

-    def __call__(self, x, **kwargs):
+    def __call__(self, x: list) -> ak.Array:
         return ak.Array(x, behavior=self.behavior)

-def from_lists(source: list[list[Any]], behavior: dict | None = None) -> Array:
+def from_lists(source: list, behavior: dict | None = None) -> Array:
     """Create an Array collection from a list of lists.

     Parameters
     ----------
@@ -149,7 +149,7 @@ def from_lists(source: list, behavior: dict | None = None) -> Array:
     lists = list(source)
     divs = (0, *np.cumsum(list(map(len, lists))))
     return from_map(
-        _FromListsFn(),
+        _FromListsFn(behavior=behavior),
         lists,
         meta=typetracer_array(ak.Array(lists[0])),
         divisions=divs,
@@ -383,7 +383,7 @@ def from_dask_array(array: DaskArray, behavior: dict | None = None) -> Array:

 def to_dataframe(
-    array,
+    array: Array,
     optimize_graph: bool = True,
     **kwargs: Any,
 ) -> DaskDataFrame:
@@ -463,7 +463,7 @@ def from_map(
     args: tuple[Any, ...] | None = None,
     label: str | None = None,
     token: str | None = None,
-    divisions: tuple[int, ...] | None = None,
+    divisions: tuple[int, ...] | tuple[None, ...] | None = None,
     meta: ak.Array | None = None,
     behavior: dict | None = None,
     **kwargs: Any,
@@ -603,7 +603,7 @@ def _bytes_with_sample(
     compression: str | None,
     delimiter: bytes,
     not_zero: bool,
-    blocksize: str | int,
+    blocksize: str | int | None,
     sample: str | int | bool,
 ) -> tuple[list[list[_BytesReadingInstructions]], bytes]:
     """Generate instructions for reading bytes from paths in a filesystem.
@@ -653,7 +653,7 @@ def _bytes_with_sample(

     if blocksize is None:
         offsets = [[0]] * len(paths)
-        lengths = [[None]] * len(paths)
+        lengths: list = [[None]] * len(paths)
     else:
         offsets = []
         lengths = []
@@ -717,21 +717,16 @@ def _bytes_with_sample(
             sample_size = parse_bytes(sample) if isinstance(sample, str) else sample
             with fs.open(paths[0], compression=compression) as f:
                 # read block without seek (because we start at zero)
-                if delimiter is None:
-                    sample_bytes = f.read(sample_size)
-                else:
-                    sample_buff = f.read(sample_size)
-                    while True:
-                        new = f.read(sample_size)
-                        if not new:
-                            break
-                        if delimiter in new:
-                            sample_buff = (
-                                sample_buff + new.split(delimiter, 1)[0] + delimiter
-                            )
-                            break
-                        sample_buff = sample_buff + new
-                    sample_bytes = sample_buff
+                sample_buff = f.read(sample_size)
+                while True:
+                    new = f.read(sample_size)
+                    if not new:
+                        break
+                    if delimiter in new:
+                        sample_buff = sample_buff + new.split(delimiter, 1)[0] + delimiter
+                        break
+                    sample_buff = sample_buff + new
+                sample_bytes = sample_buff

             rfind = sample_bytes.rfind(delimiter)
             if rfind > 0:
diff --git a/src/dask_awkward/lib/io/json.py b/src/dask_awkward/lib/io/json.py
index e94ad3b6..65f8ade4 100644
--- a/src/dask_awkward/lib/io/json.py
+++ b/src/dask_awkward/lib/io/json.py
@@ -418,13 +418,13 @@ def _from_json_bytes(
     )

     bytes_ingredients, the_sample_bytes = _bytes_with_sample(
-        fs,
-        paths,
-        compression,
-        delimiter,
-        not_zero,
-        blocksize,
-        sample_bytes,
+        fs=fs,
+        paths=paths,
+        compression=compression,
+        delimiter=delimiter,
+        not_zero=not_zero,
+        blocksize=blocksize,
+        sample=sample_bytes,
     )

     sample_array = ak.from_json(the_sample_bytes, line_delimited=True, **kwargs)
@@ -478,7 +478,7 @@ def from_json(
     resize: float = 8,
     highlevel: bool = True,
     behavior: dict | None = None,
-    blocksize: str | None = None,
+    blocksize: int | str | None = None,
     delimiter: bytes | None = None,
     compression: str | None = "infer",
     storage_options: dict[str, Any] | None = None,
@@ -526,10 +526,10 @@ def from_json(
         dask-awkward.
     behavior : dict, optional
         See :func:`ak.from_json`
-    blocksize : str, optional
+    blocksize : int, str, optional
        If ``None`` (default), the collection will be partitioned on a
        per-file bases. If defined, this sets the size (in bytes) of
-       each partition.
+       each partition. Can be a string of the form ``"10 MiB"``.
    delimiter : bytes, optional
        Delimiter to use for separating blocks; if ``blocksize`` is
        defined but this argument is not defined, the default is the
@@ -701,50 +701,10 @@ def __call__(self, array: ak.Array, block_index: tuple[int]) -> None:
         return None

-@overload
 def to_json(
     array: Array,
     path: str,
-    line_delimited: bool | str = True,
-    num_indent_spaces: int | None = None,
-    num_readability_spaces: int = 0,
-    nan_string: str | None = None,
-    posinf_string: str | None = None,
-    neginf_string: str | None = None,
-    complex_record_fields: tuple[str, str] | None = None,
-    convert_bytes: Callable | None = None,
-    convert_other: Callable | None = None,
-    storage_options: dict[str, Any] | None = None,
-    compression: str | None = None,
-    compute: Literal[False] = False,
-) -> Scalar:
-    ...
-
-
-@overload
-def to_json(
-    array: Array,
-    path: str,
-    line_delimited: bool | str,
-    num_indent_spaces: int | None,
-    num_readability_spaces: int,
-    nan_string: str | None,
-    posinf_string: str | None,
-    neginf_string: str | None,
-    complex_record_fields: tuple[str, str] | None,
-    convert_bytes: Callable | None,
-    convert_other: Callable | None,
-    storage_options: dict[str, Any] | None,
-    compression: str | None,
-    compute: Literal[True],
-) -> None:
-    ...
-
-
-def to_json(
-    array: Array,
-    path: str,
-    line_delimited: bool | str = True,
+    line_delimited: bool = True,
     num_indent_spaces: int | None = None,
     num_readability_spaces: int = 0,
     nan_string: str | None = None,
@@ -755,7 +715,7 @@ def to_json(
     convert_other: Callable | None = None,
     storage_options: dict[str, Any] | None = None,
     compression: str | None = None,
-    compute: bool = False,
+    compute: bool = True,
 ) -> Scalar | None:
     """Store Array collection in JSON text.
@@ -767,7 +727,7 @@ def to_json(
         Root directory to save data; interpreted by filesystem-spec
        (can be a remote filesystem path, for example an s3 bucket:
        ``"s3://bucket/data"``).
-    line_delimited : bool | str
+    line_delimited : bool
        See docstring for :py:func:`ak.to_json`.
    num_indent_spaces : int, optional
        See docstring for :py:func:`ak.to_json`.
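
Editor's note (not part of the patch series): with `compute=True` now the default, a
plain `dak.to_json(...)` call writes eagerly, while `compute=False` still returns a
lazy `Scalar` to compute later. A hedged round-trip sketch; the collection and paths
are illustrative:

    import awkward as ak
    import dask_awkward as dak

    daa = dak.from_awkward(ak.Array([{"x": 1.0}, {"x": 2.0}]), 2)

    # eager write (new default): one part*.json file per partition
    dak.to_json(daa, "/tmp/demo-json")

    # read back, partitioning by byte size rather than per file; blocksize
    # may now also be an int or a byte-size string
    lazy = dak.from_json("/tmp/demo-json/*.json", blocksize="10 MiB")
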
diff --git a/src/dask_awkward/lib/io/parquet.py b/src/dask_awkward/lib/io/parquet.py
index 567d639e..8b840c54 100644
--- a/src/dask_awkward/lib/io/parquet.py
+++ b/src/dask_awkward/lib/io/parquet.py
@@ -6,7 +6,7 @@
 import math
 import operator
 from collections.abc import Sequence
-from typing import Any, Literal, overload
+from typing import Any, Literal

 import awkward as ak
 import awkward.operations.ak_from_parquet as ak_from_parquet
@@ -463,78 +463,9 @@ def __call__(self, data, block_index):
     )

-@overload
 def to_parquet(
     array: Array,
     destination: str,
-    *,
-    list_to32: bool,
-    string_to32: bool,
-    bytestring_to32: bool,
-    emptyarray_to: Any | None,
-    categorical_as_dictionary: bool,
-    extensionarray: bool,
-    count_nulls: bool,
-    compression: str | dict | None,
-    compression_level: int | dict | None,
-    row_group_size: int | None,
-    data_page_size: int | None,
-    parquet_flavor: Literal["spark"] | None,
-    parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"],
-    parquet_page_version: Literal["1.0"] | Literal["2.0"],
-    parquet_metadata_statistics: bool | dict,
-    parquet_dictionary_encoding: bool | dict,
-    parquet_byte_stream_split: bool | dict,
-    parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None,
-    parquet_old_int96_timestamps: bool | None,
-    parquet_compliant_nested: bool,
-    parquet_extra_options: dict | None,
-    storage_options: dict[str, Any] | None,
-    write_metadata: bool,
-    compute: Literal[True],
-    prefix: str | None,
-) -> None:
-    ...
-
-
-@overload
-def to_parquet(
-    array: Array,
-    destination: str,
-    *,
-    list_to32: bool,
-    string_to32: bool,
-    bytestring_to32: bool,
-    emptyarray_to: Any | None,
-    categorical_as_dictionary: bool,
-    extensionarray: bool,
-    count_nulls: bool,
-    compression: str | dict | None,
-    compression_level: int | dict | None,
-    row_group_size: int | None,
-    data_page_size: int | None,
-    parquet_flavor: Literal["spark"] | None,
-    parquet_version: Literal["1.0"] | Literal["2.4"] | Literal["2.6"],
-    parquet_page_version: Literal["1.0"] | Literal["2.0"],
-    parquet_metadata_statistics: bool | dict,
-    parquet_dictionary_encoding: bool | dict,
-    parquet_byte_stream_split: bool | dict,
-    parquet_coerce_timestamps: Literal["ms"] | Literal["us"] | None,
-    parquet_old_int96_timestamps: bool | None,
-    parquet_compliant_nested: bool,
-    parquet_extra_options: dict | None,
-    storage_options: dict[str, Any] | None,
-    write_metadata: bool,
-    compute: Literal[False],
-    prefix: str | None,
-) -> Scalar:
-    ...
-
-
-def to_parquet(
-    array: Array,
-    destination: str,
-    *,
     list_to32: bool = False,
     string_to32: bool = True,
     bytestring_to32: bool = True,
diff --git a/src/dask_awkward/lib/io/text.py b/src/dask_awkward/lib/io/text.py
index eb82f5eb..a45996ba 100644
--- a/src/dask_awkward/lib/io/text.py
+++ b/src/dask_awkward/lib/io/text.py
@@ -100,12 +100,12 @@ def from_text(

     bytes_ingredients, _ = _bytes_with_sample(
         fs,
-        paths,
-        compression,
-        delimiter,
-        False,
-        blocksize,
-        False,
+        paths=paths,
+        compression=compression,
+        delimiter=delimiter,
+        not_zero=False,
+        blocksize=blocksize,
+        sample=False,
     )

     # meta is _always_ an unknown length array of strings.
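
Editor's note (not part of the patch series): `from_text` sits on the same
`_bytes_with_sample` machinery, and each partition materializes as an awkward array of
strings, one element per delimited line. A hedged usage sketch, assuming
`dask_awkward` exports `from_text` at the top level; the glob path is illustrative:

    import dask_awkward as dak

    lines = dak.from_text("data/*.txt", blocksize="64 MiB", delimiter=b"\n")
    first_partition = lines.partitions[0].compute()  # ak.Array of strings
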
diff --git a/src/dask_awkward/lib/operations.py b/src/dask_awkward/lib/operations.py
index 9abbef3c..32e94479 100644
--- a/src/dask_awkward/lib/operations.py
+++ b/src/dask_awkward/lib/operations.py
@@ -50,14 +50,15 @@ def concatenate(
         i += 1

     meta = ak.concatenate(metas)
+    assert isinstance(meta, ak.Array)

     prev_names = [iarr.name for iarr in arrays]
-    g = AwkwardMaterializedLayer(
+    aml = AwkwardMaterializedLayer(
         g,
         previous_layer_names=prev_names,
         fn=_concatenate_axis0_multiarg,
     )
-    hlg = HighLevelGraph.from_collections(name, g, dependencies=arrays)
+    hlg = HighLevelGraph.from_collections(name, aml, dependencies=arrays)
     return new_array_object(hlg, name, meta=meta, npartitions=npartitions)

     if axis > 0:
diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py
index 50ce634e..58586438 100644
--- a/src/dask_awkward/lib/optimize.py
+++ b/src/dask_awkward/lib/optimize.py
@@ -66,7 +66,7 @@ def all_optimizations(

 def optimize(
-    dsk: Mapping,
+    dsk: HighLevelGraph,
     keys: Hashable | list[Hashable] | set[Hashable],
     **_: Any,
 ) -> Mapping:
@@ -79,9 +79,9 @@ def optimize(
     if dask.config.get("awkward.optimization.enabled"):
         which = dask.config.get("awkward.optimization.which")
         if "columns" in which:
-            dsk = optimize_columns(dsk)  # type: ignore
+            dsk = optimize_columns(dsk)
         if "layer-chains" in which:
-            dsk = rewrite_layer_chains(dsk)
+            dsk = rewrite_layer_chains(dsk, keys)

     return dsk
@@ -224,12 +224,12 @@ def _touch_and_call(layer):
     return new_layer

-def rewrite_layer_chains(dsk: HighLevelGraph) -> HighLevelGraph:
+def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph:
     # dask.optimization.fuse_liner for blockwise layers
     import copy

     chains = []
-    deps = dsk.dependencies.copy()
+    deps = copy.copy(dsk.dependencies)

     layers = {}
     # find chains; each chain list is at least two keys long
@@ -285,32 +285,32 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph:
         outkey = chain[-1]
         layer0 = dsk.layers[chain[0]]
         outlayer = layers[outkey]
-        numblocks = [nb[0] for nb in layer0.numblocks.values() if nb[0] is not None][0]
-        deps[outkey] = deps[chain[0]]
-        [deps.pop(ch) for ch in chain[:-1]]
+        numblocks = [nb[0] for nb in layer0.numblocks.values() if nb[0] is not None][0]  # type: ignore
+        deps[outkey] = deps[chain[0]]  # type: ignore
+        [deps.pop(ch) for ch in chain[:-1]]  # type: ignore

-        subgraph = layer0.dsk.copy()
-        indices = list(layer0.indices)
+        subgraph = layer0.dsk.copy()  # type: ignore
+        indices = list(layer0.indices)  # type: ignore
         parent = chain[0]

-        outlayer.io_deps = layer0.io_deps
+        outlayer.io_deps = layer0.io_deps  # type: ignore
         for chain_member in chain[1:]:
             layer = dsk.layers[chain_member]
-            for k in layer.io_deps:
-                outlayer.io_deps[k] = layer.io_deps[k]
-            func, *args = layer.dsk[chain_member]
+            for k in layer.io_deps:  # type: ignore
+                outlayer.io_deps[k] = layer.io_deps[k]  # type: ignore
+            func, *args = layer.dsk[chain_member]  # type: ignore
             args2 = _recursive_replace(args, layer, parent, indices)
             subgraph[chain_member] = (func,) + tuple(args2)
             parent = chain_member

-        outlayer.numblocks = {i[0]: (numblocks,) for i in indices if i[1] is not None}
-        outlayer.dsk = subgraph
+        outlayer.numblocks = {i[0]: (numblocks,) for i in indices if i[1] is not None}  # type: ignore
+        outlayer.dsk = subgraph  # type: ignore
         if hasattr(outlayer, "_dims"):
             del outlayer._dims
-        outlayer.indices = tuple(
+        outlayer.indices = tuple(  # type: ignore
             (i[0], (".0",) if i[1] is not None else None) for i in indices
         )
-        outlayer.output_indices = (".0",)
+        outlayer.output_indices = (".0",)  # type: ignore
-        outlayer.inputs = getattr(layer0, "inputs", set())
+        outlayer.inputs = getattr(layer0, "inputs", set())  # type: ignore
         if hasattr(outlayer, "_cached_dict"):
             del outlayer._cached_dict  # reset, since original can be mutated
     return HighLevelGraph(layers, deps)
@@ -356,8 +356,8 @@ def _get_column_reports(dsk: HighLevelGraph) -> dict[str, Any]:
     # make labelled report
     projectable = _projectable_input_layer_names(dsk)
-    for name, lay in dsk.layers.copy().items():
-        if name in projectable:
+    for name, lay in dsk.layers.items():
+        if name in projectable and hasattr(lay, "mock"):
             layers[name], report = lay.mock()
             reports[name] = report
         elif hasattr(lay, "mock"):
diff --git a/src/dask_awkward/lib/str.py b/src/dask_awkward/lib/str.py
index e71ce125..85324ae9 100644
--- a/src/dask_awkward/lib/str.py
+++ b/src/dask_awkward/lib/str.py
@@ -1,16 +1,11 @@
 from __future__ import annotations

 import functools
-import sys
 from collections.abc import Callable
 from typing import Any, TypeVar

 import awkward.operations.str as akstr
-
-if sys.version_info < (3, 11, 0):
-    from typing_extensions import ParamSpec
-else:
-    from typing import ParamSpec
+from typing_extensions import ParamSpec

 from dask_awkward.lib.core import Array, map_partitions
diff --git a/src/dask_awkward/lib/structure.py b/src/dask_awkward/lib/structure.py
index 5e601686..7dd8c1a9 100644
--- a/src/dask_awkward/lib/structure.py
+++ b/src/dask_awkward/lib/structure.py
@@ -820,7 +820,7 @@ def unzip(
     if len(fields) == 0:
         return (array,)
     else:
-        return tuple(array[field] for field in fields)  # type: ignore
+        return tuple(array[field] for field in fields)

 @borrow_docstring(ak.values_astype)
@@ -1141,7 +1141,7 @@ def _repartition_func(*stuff):
     return ak.concatenate(data)

-def repartition_layer(arr: Array, key: str, divisions: tuple[int, ...]):
+def repartition_layer(arr: Array, key: str, divisions: tuple[int, ...]) -> dict:
     layer = {}

     indivs = arr.defined_divisions
diff --git a/src/dask_awkward/lib/unproject_layout.py b/src/dask_awkward/lib/unproject_layout.py
index 1235a418..21c06aa5 100644
--- a/src/dask_awkward/lib/unproject_layout.py
+++ b/src/dask_awkward/lib/unproject_layout.py
@@ -1,6 +1,7 @@
 from __future__ import annotations

 import math
+from typing import Any

 import awkward as ak
 import numpy as np
@@ -53,7 +54,7 @@
 }

-def dummy_index_of(typecode: str, length: int, nplike) -> ak.index.Index:
+def dummy_index_of(typecode: str, length: int, nplike: Any) -> ak.index.Index:
     index_cls = index_of[typecode]
     dtype = dtype_of[typecode]
     return index_cls(PlaceholderArray(nplike, (length,), dtype), nplike=nplike)
@@ -118,6 +119,8 @@ def compatible(form: Form, layout: Content) -> bool:
     else:
         return False

+    return False
+

 def _unproject_layout(form, layout, length, backend):
     if layout is None:
diff --git a/src/dask_awkward/pickle.py b/src/dask_awkward/pickle.py
index 06fee32a..a53236c6 100644
--- a/src/dask_awkward/pickle.py
+++ b/src/dask_awkward/pickle.py
@@ -1,5 +1,7 @@
 from __future__ import annotations

+from typing import Any
+
 __all__ = ("plugin",)

 from pickle import PickleBuffer
@@ -8,7 +10,7 @@
 from awkward.typetracer import PlaceholderArray

-def maybe_make_pickle_buffer(buffer) -> PlaceholderArray | PickleBuffer:
+def maybe_make_pickle_buffer(buffer: Any) -> PlaceholderArray | PickleBuffer:
     if isinstance(buffer, PlaceholderArray):
         return buffer
     else:
@@ -65,7 +67,7 @@ def pickle_array(array: ak.Array, protocol: int) -> tuple:
     )

-def plugin(obj, protocol: int) -> tuple | NotImplemented:
+def plugin(obj: Any, protocol: int) -> tuple:
     if isinstance(obj, ak.Record):
         return pickle_record(obj, protocol)
     elif isinstance(obj, ak.Array):
diff --git a/src/dask_awkward/utils.py b/src/dask_awkward/utils.py
index a90b12cb..a1b02bf6 100644
--- a/src/dask_awkward/utils.py
+++ b/src/dask_awkward/utils.py
@@ -3,11 +3,14 @@
 from collections.abc import Callable, Mapping
 from typing import TYPE_CHECKING, Any, TypeVar

+from typing_extensions import ParamSpec
+
 if TYPE_CHECKING:
     from dask_awkward.lib.core import Array

 T = TypeVar("T")
+P = ParamSpec("P")

 class DaskAwkwardNotImplemented(NotImplementedError):
@@ -68,8 +71,8 @@ def keys(self):
         return ((i,) for i in range(len(self.inputs)))

-def borrow_docstring(original: Callable[..., T]) -> Callable[..., T]:
-    def wrapper(method):
+def borrow_docstring(original: Callable) -> Callable:
+    def wrapper(method: Callable[P, T]) -> Callable[P, T]:
         method.__doc__ = (
             f"Partitioned version of ak.{original.__name__}\n" f"{original.__doc__}"
         )
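
Editor's note (not part of the patch series): the `ParamSpec` lets type checkers see
the decorated function's own parameters instead of `Callable[..., T]`'s opaque `...`.
A hedged sketch of the decorator in use; the wrapped function here is illustrative,
not dask-awkward's real implementation:

    import awkward as ak
    from dask_awkward.utils import borrow_docstring

    @borrow_docstring(ak.num)
    def num(array, axis=1):
        ...  # a partitioned implementation would go here

    # the wrapper copies ak.num's docstring onto the new function, while
    # Callable[P, T] -> Callable[P, T] preserves the (array, axis) signature
    print(num.__doc__.splitlines()[0])  # "Partitioned version of ak.num"
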
diff --git a/tests/conftest.py b/tests/conftest.py
index d5e8a3c8..09d629cd 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -68,9 +68,9 @@ def daa_old(ndjson_points1: str) -> dak.Array:

 @pytest.fixture(scope="session")
-def pq_points_dir(daa_old, tmp_path_factory) -> str:
+def pq_points_dir(daa_old: dak.Array, tmp_path_factory: pytest.TempPathFactory) -> str:
     pqdir = tmp_path_factory.mktemp("pqfiles")
-    dak.to_parquet(daa_old, str(pqdir), compute=True)
+    dak.to_parquet(daa_old, str(pqdir))
     return str(pqdir)

@@ -168,7 +168,7 @@ def L4() -> list[list[dict[str, float]] | None]:

 @pytest.fixture(scope="session")
 def caa_parquet(caa: ak.Array, tmp_path_factory: pytest.TempPathFactory) -> str:
-    fname = tmp_path_factory.mktemp("parquet_data") / "caa.parquet"  # type: ignore
+    fname = tmp_path_factory.mktemp("parquet_data") / "caa.parquet"
     ak.to_parquet(caa, str(fname), extensionarray=False)
     return str(fname)
diff --git a/tests/test_core.py b/tests/test_core.py
index 7d1d5ed7..a0a0789e 100644
--- a/tests/test_core.py
+++ b/tests/test_core.py
@@ -145,7 +145,7 @@ def test_partitions_divisions(ndjson_points_file: str) -> None:
     assert not t1.known_divisions
     t2 = daa.partitions[1]
     assert t2.known_divisions
-    assert t2.divisions == (0, divs[2] - divs[1])
+    assert t2.divisions == (0, divs[2] - divs[1])  # type: ignore

 def test_array_rebuild(ndjson_points_file: str) -> None:
@@ -178,7 +178,7 @@ def test_typestr(daa: Array) -> None:
     assert len(daa._typestr(max=20)) == 20 + extras

-def test_head(daa: Array):
+def test_head(daa: Array) -> None:
     out = daa.head(1)
     assert out.tolist() == daa.compute()[:1].tolist()
@@ -233,7 +233,7 @@ def test_scalar_getitem_getattr() -> None:
         slice(None, None, 3),
     ],
 )
-def test_getitem_zero_slice_single(daa: Array, where):
+def test_getitem_zero_slice_single(daa: Array, where: slice) -> None:
     out = daa[where]
     assert out.compute().tolist() == daa.compute()[where].tolist()
     assert len(out) == len(daa.compute()[where])
@@ -257,7 +257,11 @@ def test_getitem_zero_slice_single(daa: Array, where):
     ],
 )
 @pytest.mark.parametrize("rest", [slice(None, None, None), slice(0, 1)])
-def test_getitem_zero_slice_tuple(daa: Array, where, rest):
+def test_getitem_zero_slice_tuple(
+    daa: Array,
+    where: slice,
+    rest: slice,
+) -> None:
     out = daa[where, rest]
     assert out.compute().tolist() == daa.compute()[where, rest].tolist()
     assert len(out) == len(daa.compute()[where, rest])
@@ -476,7 +480,7 @@ def test_compatible_partitions_after_slice() -> None:
     assert_eq(lazy, ccrt)

     # sanity
-    assert dak.compatible_partitions(lazy, lazy + 2)
+    assert dak.compatible_partitions(lazy, lazy + 2)  # type: ignore
     assert dak.compatible_partitions(lazy, dak.num(lazy, axis=1) > 2)

     assert not dak.compatible_partitions(lazy[:-2], lazy)
@@ -666,7 +670,7 @@ def test_optimize_chain_single(daa):
     arr = ((daa.points.x + 1) + 6).map_partitions(lambda x: x + 1)

     # first a simple test by calling the one optimisation directly
-    dsk2 = rewrite_layer_chains(arr.dask)
+    dsk2 = rewrite_layer_chains(arr.dask, arr.keys)

     (out,) = dask.compute(arr, optimize_graph=False)
     arr._dask = dsk2
     (out2,) = dask.compute(arr, optimize_graph=False)
diff --git a/tests/test_io.py b/tests/test_io.py
index 173a3f6e..63a50f76 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -164,10 +164,10 @@ def f(a, b):
         dak.from_map(f, [1, 2], [3, 4, 5])

     with pytest.raises(ValueError, match="must be `callable`"):
-        dak.from_map(5, [1], [2])
+        dak.from_map(5, [1], [2])  # type: ignore

     with pytest.raises(ValueError, match="must be Iterable"):
-        dak.from_map(f, 1, [1, 2])
+        dak.from_map(f, 1, [1, 2])  # type: ignore

     with pytest.raises(ValueError, match="non-zero length"):
         dak.from_map(f, [], [], [])
@@ -252,7 +252,7 @@ def test_to_dataframe(daa: dak.Array, caa: ak.Array, optimize_graph: bool) -> No
     from dask.dataframe.utils import assert_eq

     daa = daa["points", ["x", "y"]]
-    caa = caa["points", ["x", "y"]]
+    caa = caa["points", ["x", "y"]]  # pyright: ignore

     dd = dak.to_dataframe(daa, optimize_graph=optimize_graph)
     df = ak.to_dataframe(caa)
@@ -277,7 +277,7 @@ def test_to_dataframe_str(
     assert_eq(dd, df, check_index=False)

-def test_from_awkward_empty_array(daa) -> None:
+def test_from_awkward_empty_array(daa: dak.Array) -> None:
     # no form
     c1 = ak.Array([])
     assert len(c1) == 0
diff --git a/tests/test_io_json.py b/tests/test_io_json.py
index 244b44aa..578358ef 100644
--- a/tests/test_io_json.py
+++ b/tests/test_io_json.py
@@ -190,7 +190,7 @@ def test_to_and_from_json(

     p1 = os.path.join(tdir, "z", "z")

-    dak.to_json(array=daa, path=p1, compute=True)
+    dak.to_json(daa, p1)
     paths = list((Path(tdir) / "z" / "z").glob("part*.json"))
     assert len(paths) == daa.npartitions
     arrays = ak.concatenate([ak.from_json(p, line_delimited=True) for p in paths])
@@ -205,6 +205,7 @@ def test_to_and_from_json(
         compression=compression,
         compute=False,
     )
+    assert isinstance(s, dak.Scalar)
     s.compute()
     suffix = "gz" if compression == "gzip" else compression
     r = dak.from_json(os.path.join(tdir, f"*.json.{suffix}"))
diff --git a/tests/test_optimize.py b/tests/test_optimize.py
index 66108048..afb2da88 100644
--- a/tests/test_optimize.py
+++ b/tests/test_optimize.py
@@ -6,7 +6,7 @@
 import dask_awkward as dak

-def test_multiple_computes(pq_points_dir) -> None:
+def test_multiple_computes(pq_points_dir: str) -> None:
     ds1 = dak.from_parquet(pq_points_dir)
     # add a columns= argument to force a new tokenize result in
     # from_parquet so we get two unique collections.
@@ -26,4 +26,4 @@ def test_multiple_computes(pq_points_dir: str) -> None:
     assert len(things3[1]) < len(things3[0])

     things = dask.compute(ds1.points, ds2.points.x, ds2.points.y, ds1.points.y, ds3)
-    assert things[-1].tolist() == ak.Array(lists[0] + lists[1]).tolist()
+    assert things[-1].tolist() == ak.Array(lists[0] + lists[1]).tolist()  # type: ignore
diff --git a/tests/test_parquet.py b/tests/test_parquet.py
index b1da940e..544ab0f1 100644
--- a/tests/test_parquet.py
+++ b/tests/test_parquet.py
@@ -191,7 +191,7 @@ def test_to_parquet_with_prefix(
     tmp_path: pathlib.Path,
     prefix: str | None,
 ) -> None:
-    dak.to_parquet(daa, str(tmp_path), prefix=prefix, compute=True)
+    dak.to_parquet(daa, str(tmp_path), prefix=prefix)
     files = list(tmp_path.glob("*"))
     for ifile in files:
         fname = ifile.parts[-1]

From a5cf0ab8526e2eabc5b145f75bfe7f9af24a0dd8 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 26 Sep 2023 08:41:11 -0500
Subject: [PATCH 4/9] [pre-commit.ci] pre-commit autoupdate (#376)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.0.290 → v0.0.291](https://github.com/astral-sh/ruff-pre-commit/compare/v0.0.290...v0.0.291)
- [github.com/asottile/pyupgrade: v3.11.0 → v3.13.0](https://github.com/asottile/pyupgrade/compare/v3.11.0...v3.13.0)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 .pre-commit-config.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ffe83995..0129ad22 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -21,7 +21,7 @@ repos:
           - --target-version=py38

   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.0.290
+    rev: v0.0.291
     hooks:
       - id: ruff

@@ -32,7 +32,7 @@ repos:
         language_version: python3

   - repo: https://github.com/asottile/pyupgrade
-    rev: v3.11.0
+    rev: v3.13.0
     hooks:
       - id: pyupgrade
         args:

From b2e9e415109a372c1a72e82d14a6f6cbf69109b9 Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Tue, 26 Sep 2023 09:49:19 -0500
Subject: [PATCH 5/9] fix: simultaneous computes when one is encapsulated by
 the other doesn't over optimize (#375)

---
 src/dask_awkward/lib/optimize.py | 211 ++++++++++++++++++------------
 tests/test_optimize.py           |  49 +++++++
 2 files changed, 169 insertions(+), 91 deletions(-)

diff --git a/src/dask_awkward/lib/optimize.py b/src/dask_awkward/lib/optimize.py
index 58586438..f52067f6 100644
--- a/src/dask_awkward/lib/optimize.py
+++ b/src/dask_awkward/lib/optimize.py
@@ -7,6 +7,7 @@
 from typing import TYPE_CHECKING, Any

 import dask.config
+from awkward.typetracer import touch_data
 from dask.blockwise import fuse_roots, optimize_blockwise
 from dask.core import flatten
 from dask.highlevelgraph import HighLevelGraph
@@ -107,12 +108,12 @@ def optimize_columns(dsk: HighLevelGraph) -> HighLevelGraph:
     Parameters
     ----------
     dsk : HighLevelGraph
-        Original high level dask graph
+        Task graph to optimize.

     Returns
     -------
     HighLevelGraph
-        New dask graph with a modified ``AwkwardInputLayer``.
+        New, optimized task graph with column-projected ``AwkwardInputLayer``.

     """
     layers = dsk.layers.copy()  # type: ignore
@@ -128,109 +129,41 @@ def optimize_columns(dsk: HighLevelGraph) -> HighLevelGraph:
     return HighLevelGraph(layers, deps)

-def _projectable_input_layer_names(dsk: HighLevelGraph) -> list[str]:
-    """Get list of column-projectable AwkwardInputLayer names.
-
-    Parameters
-    ----------
-    dsk : HighLevelGraph
-        Task graph of interest
-
-    Returns
-    -------
-    list[str]
-        Names of the AwkwardInputLayers in the graph that are
-        column-projectable.
-
-    """
-    return [
-        n
-        for n, v in dsk.layers.items()
-        if isinstance(v, AwkwardInputLayer) and hasattr(v.io_func, "project_columns")
-        # following condition means dep/pickled layers cannot be optimised
-        and hasattr(v, "_meta")
-    ]
-
-
-def _layers_with_annotation(dsk: HighLevelGraph, key: str) -> list[str]:
-    return [n for n, v in dsk.layers.items() if (v.annotations or {}).get(key)]
-
-
-def _ak_output_layer_names(dsk: HighLevelGraph) -> list[str]:
-    """Get a list output layer names.
-
-    Output layer names are annotated with 'ak_output'.
+def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph:
+    """Smush chains of blockwise layers into a single layer.
+
+    The logic here identifies chains by popping layers (in arbitrary
+    order) from a set of all layers in the task graph and walking
+    through the dependencies (parent layers) and dependents (child
+    layers). If a multi layer chain is discovered we compress it into
+    a single layer with the second loop below (for chain in chains;
+    that step rewrites the graph). In the chain building logic, if a
+    layer exists in the `keys` argument (the keys necessary for the
+    compute that we are optimizing for), we shortcircuit the logic to
+    ensure we do not chain layers that contain a necessary key inside
+    (these layers are called `required_layers` below).

     Parameters
     ----------
     dsk : HighLevelGraph
-        Graph of interest.
+        Task graph to optimize.
+    keys : Any
+        Keys that are requested by the compute that is being
+        optimized.

     Returns
     -------
-    list[str]
-        Names of the output layers.
+    HighLevelGraph
+        New, optimized task graph.
""" - return _layers_with_annotation(dsk, "ak_output") - - -def _opt_touch_all_layer_names(dsk: HighLevelGraph) -> list[str]: - return [n for n, v in dsk.layers.items() if hasattr(v, "_opt_touch_all")] - # return _layers_with_annotation(dsk, "ak_touch_all") - - -def _has_projectable_awkward_io_layer(dsk: HighLevelGraph) -> bool: - """Check if a graph at least one AwkwardInputLayer that is project-able.""" - for _, v in dsk.layers.items(): - if isinstance(v, AwkwardInputLayer) and hasattr(v.io_func, "project_columns"): - return True - return False - - -def _touch_all_data(*args, **kwargs): - """Mock writing an ak.Array to disk by touching data buffers.""" - import awkward as ak - - for arg in args + tuple(kwargs.values()): - ak.typetracer.touch_data(arg) - - -def _mock_output(layer): - """Update a layer to run the _touch_all_data.""" - assert len(layer.dsk) == 1 - - new_layer = copy.deepcopy(layer) - mp = new_layer.dsk.copy() - for k in iter(mp.keys()): - mp[k] = (_touch_all_data,) + mp[k][1:] - new_layer.dsk = mp - return new_layer - - -def _touch_and_call_fn(fn, *args, **kwargs): - _touch_all_data(*args, **kwargs) - return fn(*args, **kwargs) - - -def _touch_and_call(layer): - assert len(layer.dsk) == 1 - - new_layer = copy.deepcopy(layer) - mp = new_layer.dsk.copy() - for k in iter(mp.keys()): - mp[k] = (_touch_and_call_fn,) + mp[k] - new_layer.dsk = mp - return new_layer - - -def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: # dask.optimization.fuse_liner for blockwise layers import copy chains = [] deps = copy.copy(dsk.dependencies) + required_layers = {k[0] for k in keys} layers = {} # find chains; each chain list is at least two keys long dependents = dsk.dependents @@ -250,6 +183,7 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: and dsk.dependencies[list(children)[0]] == {lay} and isinstance(dsk.layers[list(children)[0]], AwkwardBlockwiseLayer) and len(dsk.layers[lay]) == len(dsk.layers[list(children)[0]]) + and lay not in required_layers ): # walk forwards lay = list(children)[0] @@ -263,6 +197,7 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: and dependents[list(parents)[0]] == {lay} and isinstance(dsk.layers[list(parents)[0]], AwkwardBlockwiseLayer) and len(dsk.layers[lay]) == len(dsk.layers[list(parents)[0]]) + and list(parents)[0] not in required_layers ): # walk backwards lay = list(parents)[0] @@ -316,6 +251,100 @@ def rewrite_layer_chains(dsk: HighLevelGraph, keys: Any) -> HighLevelGraph: return HighLevelGraph(layers, deps) +def _projectable_input_layer_names(dsk: HighLevelGraph) -> list[str]: + """Get list of column-projectable AwkwardInputLayer names. + + Parameters + ---------- + dsk : HighLevelGraph + Task graph of interest + + Returns + ------- + list[str] + Names of the AwkwardInputLayers in the graph that are + column-projectable. + + """ + return [ + n + for n, v in dsk.layers.items() + if isinstance(v, AwkwardInputLayer) and hasattr(v.io_func, "project_columns") + # following condition means dep/pickled layers cannot be optimised + and hasattr(v, "_meta") + ] + + +def _layers_with_annotation(dsk: HighLevelGraph, key: str) -> list[str]: + return [n for n, v in dsk.layers.items() if (v.annotations or {}).get(key)] + + +def _ak_output_layer_names(dsk: HighLevelGraph) -> list[str]: + """Get a list output layer names. + + Output layer names are annotated with 'ak_output'. + + Parameters + ---------- + dsk : HighLevelGraph + Graph of interest. 
+
+    Returns
+    -------
+    list[str]
+        Names of the output layers.
+
+    """
+    return _layers_with_annotation(dsk, "ak_output")
+
+
+def _opt_touch_all_layer_names(dsk: HighLevelGraph) -> list[str]:
+    return [n for n, v in dsk.layers.items() if hasattr(v, "_opt_touch_all")]
+    # return _layers_with_annotation(dsk, "ak_touch_all")
+
+
+def _has_projectable_awkward_io_layer(dsk: HighLevelGraph) -> bool:
+    """Check if a graph at least one AwkwardInputLayer that is project-able."""
+    for _, v in dsk.layers.items():
+        if isinstance(v, AwkwardInputLayer) and hasattr(v.io_func, "project_columns"):
+            return True
+    return False
+
+
+def _touch_all_data(*args, **kwargs):
+    """Mock writing an ak.Array to disk by touching data buffers."""
+    for arg in args + tuple(kwargs.values()):
+        touch_data(arg)
+
+
+def _mock_output(layer):
+    """Update a layer to run the _touch_all_data."""
+    assert len(layer.dsk) == 1
+
+    new_layer = copy.deepcopy(layer)
+    mp = new_layer.dsk.copy()
+    for k in iter(mp.keys()):
+        mp[k] = (_touch_all_data,) + mp[k][1:]
+    new_layer.dsk = mp
+    return new_layer
+
+
+def _touch_and_call_fn(fn, *args, **kwargs):
+    _touch_all_data(*args, **kwargs)
+    return fn(*args, **kwargs)
+
+
+def _touch_and_call(layer):
+    assert len(layer.dsk) == 1
+
+    new_layer = copy.deepcopy(layer)
+    mp = new_layer.dsk.copy()
+    for k in iter(mp.keys()):
+        mp[k] = (_touch_and_call_fn,) + mp[k]
+    new_layer.dsk = mp
+    return new_layer
+
+
 def _recursive_replace(args, layer, parent, indices):
     args2 = []
     for arg in args:
@@ -393,7 +422,7 @@ def _get_column_reports(dsk: HighLevelGraph) -> dict[str, Any]:
         results = get_sync(hlg, leaf_layers_keys)
         for out in results:
             if isinstance(out, (ak.Array, ak.Record)):
-                ak.typetracer.touch_data(out)
+                touch_data(out)
     except Exception as err:
         on_fail = dask.config.get("awkward.optimization.on-fail")
         # this is the default, throw a warning but skip the optimization.
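
Editor's note (not part of the patch series): the `required_layers` check above is
what makes simultaneous computes safe here: a layer whose key is itself requested must
survive as its own output, so it cannot be smushed into a child layer. A hedged sketch
of the scenario, using public dask-awkward API:

    import dask
    import dask_awkward as dak

    darr = dak.from_lists([[1, 2, 3], [4, 5, 6]])
    step = darr.map_partitions(lambda x: x)

    # both collections are requested, so darr's layer is "required" and
    # the chain rewrite must not fuse it away into step's layer
    a, b = dask.compute(darr, step)
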
diff --git a/tests/test_optimize.py b/tests/test_optimize.py
index afb2da88..3aa57531 100644
--- a/tests/test_optimize.py
+++ b/tests/test_optimize.py
@@ -4,6 +4,7 @@
 import dask

 import dask_awkward as dak
+from dask_awkward.lib.testutils import assert_eq

 def test_multiple_computes(pq_points_dir: str) -> None:
@@ -27,3 +28,51 @@ def test_multiple_computes(pq_points_dir: str) -> None:

     things = dask.compute(ds1.points, ds2.points.x, ds2.points.y, ds1.points.y, ds3)
     assert things[-1].tolist() == ak.Array(lists[0] + lists[1]).tolist()  # type: ignore
+
+
+def identity(x):
+    return x
+
+
+def test_multiple_compute_incapsulated():
+    array = ak.Array([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])[[0, 2]]
+    darray = dak.from_awkward(array, 1)
+    darray_result = darray.map_partitions(identity)
+
+    first, second = dask.compute(darray, darray_result)
+
+    assert ak.almost_equal(first, second)
+    assert first.layout.form == second.layout.form
+
+
+def test_multiple_computes_multiple_incapsulated(daa, caa):
+    dstep1 = daa.points.x
+    dstep2 = dstep1**2
+    dstep3 = dstep2 + 2
+    dstep4 = dstep3 - 1
+    dstep5 = dstep4 - dstep2
+
+    cstep1 = caa.points.x
+    cstep2 = cstep1**2
+    cstep3 = cstep2 + 2
+    cstep4 = cstep3 - 1
+    cstep5 = cstep4 - cstep2
+
+    # multiple computes all work and evaluate to the expected result
+    c5, c4, c2 = dask.compute(dstep5, dstep4, dstep2)
+    assert_eq(c5, cstep5)
+    assert_eq(c2, cstep2)
+    assert_eq(c4, cstep4)
+
+    # if optimized together we still have 2 layers
+    opt4, opt3 = dask.optimize(dstep4, dstep3)
+    assert len(opt4.dask.layers) == 2
+    assert len(opt3.dask.layers) == 2
+    assert_eq(opt4, cstep4)
+    assert_eq(opt3, cstep3)
+
+    # if optimized alone we get optimized to 1 entire chain smushed
+    # down to 1 layer
+    (opt4_alone,) = dask.optimize(dstep4)
+    assert len(opt4_alone.dask.layers) == 1
+    assert_eq(opt4_alone, opt4)

From 899b9463a9bdcaa7ec2434b5bc8514d12273d776 Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Tue, 26 Sep 2023 09:49:39 -0500
Subject: [PATCH 6/9] use ak.almost_equal (#377)

---
 src/dask_awkward/lib/testutils.py | 14 ++++++--------
 tests/test_parquet.py             |  2 +-
 2 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/src/dask_awkward/lib/testutils.py b/src/dask_awkward/lib/testutils.py
index e456338c..6d08e928 100644
--- a/src/dask_awkward/lib/testutils.py
+++ b/src/dask_awkward/lib/testutils.py
@@ -53,6 +53,7 @@ def assert_eq_arrays(
     check_forms: bool = False,
     check_divisions: bool = True,
     scheduler: Any | None = None,
+    convert_to_lists: bool = False,
 ) -> None:
     scheduler = scheduler or DEFAULT_SCHEDULER
     a_is_coll = is_dask_collection(a)
@@ -86,15 +87,12 @@ def assert_eq_arrays(

     # finally check the values
     if isclose_equal_nan:
-        assert ak.all(
-            ak.isclose(
-                ak.from_iter(a_comp.tolist()),
-                ak.from_iter(b_comp.tolist()),
-                equal_nan=True,
-            )
-        )
+        assert ak.all(ak.isclose(a_comp, b_comp, equal_nan=True))
     else:
-        assert a_comp.tolist() == b_comp.tolist()
+        if convert_to_lists:
+            assert a_comp.tolist() == b_comp.tolist()
+        else:
+            assert ak.almost_equal(a_comp, b_comp, dtype_exact=True)
diff --git a/tests/test_parquet.py b/tests/test_parquet.py
index 544ab0f1..5eb1ee9b 100644
--- a/tests/test_parquet.py
+++ b/tests/test_parquet.py
@@ -182,7 +182,7 @@ def test_unnamed_root(
         unnamed_root_parquet_file,
         columns=columns,
     )
-    assert_eq(daa, caa, check_forms=False)
+    assert_eq(daa, caa, convert_to_lists=True)

 @pytest.mark.parametrize("prefix", [None, "abc"])
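
Editor's note (not part of the patch series): `ak.almost_equal` compares structure,
values, and (with `dtype_exact=True`) dtypes directly, avoiding the lossy `tolist()`
round trip the old assertion used. A small illustration:

    import awkward as ak

    a = ak.Array([[1.0, 2.0], [3.0]])
    b = ak.Array([[1.0, 2.0], [3.0]])

    assert ak.almost_equal(a, b, dtype_exact=True)
    # the same values in float32 fail the exact-dtype comparison
    assert not ak.almost_equal(a, ak.values_astype(b, "float32"), dtype_exact=True)
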
From ce1955ca57b7641bf18778e9159014decae2a2a5 Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Fri, 29 Sep 2023 13:32:28 -0500
Subject: [PATCH 7/9] simplify pyproject.toml

---
 pyproject.toml | 49 +++++++++++++------------------------------------
 1 file changed, 13 insertions(+), 36 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 71022383..c51d5b84 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,7 +35,7 @@ classifiers = [
     "Topic :: Software Development",
 ]
 dependencies = [
-    "awkward >=2.4.0",
+    "awkward >=2.4.3",
     "dask >=2023.04.0",
     "typing_extensions >=4.8.0",
 ]
@@ -95,17 +95,10 @@ write_to = "src/dask_awkward/_version.py"
 [tool.pytest.ini_options]
 minversion = "6.0"
 testpaths = ["tests"]
-addopts = [
-    "-v",
-    "-ra",
-    "--showlocals",
-    "--strict-markers",
-    "--strict-config",
-]
+addopts = ["-v", "-ra", "--showlocals", "--strict-markers", "--strict-config"]
 log_cli_level = "DEBUG"
-filterwarnings = [
-    "ignore:There is no current event loop",
-]
+filterwarnings = ["ignore:There is no current event loop"]
+xfail_strict = true

 [tool.isort]
 profile = "black"
@@ -125,31 +118,15 @@ warn_unused_ignores = true
 warn_unreachable = true

 [[tool.mypy.overrides]]
-    module = ["awkward.*"]
-    ignore_missing_imports = true
-
-[[tool.mypy.overrides]]
-    module = ["IPython.*"]
-    ignore_missing_imports = true
-
-[[tool.mypy.overrides]]
-    module = ["fsspec.*"]
-    ignore_missing_imports = true
-
-[[tool.mypy.overrides]]
-    module = ["pyarrow.*"]
-    ignore_missing_imports = true
-
-[[tool.mypy.overrides]]
-    module = ["tlz.*"]
-    ignore_missing_imports = true
-
-[[tool.mypy.overrides]]
-    module = ["uproot.*"]
-    ignore_missing_imports = true
-
-[[tool.mypy.overrides]]
-    module = ["cloudpickle.*"]
+    module = [
+        "awkward.*",
+        "IPython.*",
+        "fsspec.*",
+        "pyarrow.*",
+        "tlz.*",
+        "uproot.*",
+        "cloudpickle.*"
+    ]
     ignore_missing_imports = true

 [tool.pyright]
 include = ["src"]

From cef8d1ec8738ca728263ab73d55b0fd7e5608939 Mon Sep 17 00:00:00 2001
From: Doug Davis
Date: Fri, 29 Sep 2023 13:34:53 -0500
Subject: [PATCH 8/9] require latest awkward release

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index c51d5b84..a5117393 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -35,7 +35,7 @@ classifiers = [
     "Topic :: Software Development",
 ]
 dependencies = [
-    "awkward >=2.4.3",
+    "awkward >=2.4.4",
     "dask >=2023.04.0",
     "typing_extensions >=4.8.0",
 ]

From 5981337650231e5bde00bc0cf9c8b9358e86404c Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 3 Oct 2023 09:44:59 -0500
Subject: [PATCH 9/9] [pre-commit.ci] pre-commit autoupdate (#378)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

updates:
- [github.com/astral-sh/ruff-pre-commit: v0.0.291 → v0.0.292](https://github.com/astral-sh/ruff-pre-commit/compare/v0.0.291...v0.0.292)
- [github.com/asottile/pyupgrade: v3.13.0 → v3.14.0](https://github.com/asottile/pyupgrade/compare/v3.13.0...v3.14.0)

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 .pre-commit-config.yaml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0129ad22..d9c3ed00 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -21,7 +21,7 @@ repos:
           - --target-version=py38

   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.0.291
+    rev: v0.0.292
     hooks:
       - id: ruff

@@ -32,7 +32,7 @@ repos:
         language_version: python3

   - repo: https://github.com/asottile/pyupgrade
-    rev: v3.13.0
+    rev: v3.14.0
     hooks:
       - id: pyupgrade
         args: