From 39870ee5b1039036302458ddf47e044c9c34bdbd Mon Sep 17 00:00:00 2001 From: Pascal Bourgault Date: Tue, 16 Aug 2022 16:22:18 -0400 Subject: [PATCH 01/17] Fix func count for dtype O with numpy and numba (#138) --- flox/xarray.py | 8 ++++++-- tests/test_xarray.py | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/flox/xarray.py b/flox/xarray.py index 9302dc318..7234f8826 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -296,7 +296,10 @@ def wrapper(array, *by, func, skipna, **kwargs): if "nan" not in func and func not in ["all", "any", "count"]: func = f"nan{func}" - requires_numeric = func not in ["count", "any", "all"] + # Flox's count works with non-numeric and its faster than converting. + requires_numeric = func not in ["count", "any", "all"] or ( + func == "count" and engine != "flox" + ) if requires_numeric: is_npdatetime = array.dtype.kind in "Mm" is_cftime = _contains_cftime_datetimes(array) @@ -311,7 +314,8 @@ def wrapper(array, *by, func, skipna, **kwargs): result, *groups = groupby_reduce(array, *by, func=func, **kwargs) - if requires_numeric: + # Output of count has an int dtype. + if requires_numeric and func != "count": if is_npdatetime: return result.astype(dtype) + offset elif is_cftime: diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 0a696b24a..90a2d50c4 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -420,7 +420,7 @@ def test_cache(): @pytest.mark.parametrize("use_cftime", [True, False]) @pytest.mark.parametrize("func", ["count", "mean"]) -def test_datetime_array_reduce(use_cftime, func): +def test_datetime_array_reduce(use_cftime, func, engine): time = xr.DataArray( xr.date_range("2009-01-01", "2012-12-31", use_cftime=use_cftime), @@ -428,7 +428,7 @@ def test_datetime_array_reduce(use_cftime, func): name="time", ) expected = getattr(time.resample(time="YS"), func)() - actual = resample_reduce(time.resample(time="YS"), func=func, engine="flox") + actual = resample_reduce(time.resample(time="YS"), func=func, engine=engine) assert_equal(expected, actual) From fbc2af8d6fd9f7b76d77b3199bd624fba566ffb6 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Wed, 17 Aug 2022 22:16:06 +0200 Subject: [PATCH 02/17] Add ASV benchmark CI workflow (#139) * Add ASV workflow * Apply suggestions from code review * Forgot to delete folders * Update asv_bench/benchmarks/combine.py * Restart with current benchmarks * Always run benchmark for now. * Don't test argmax until it's supported in "flox" engine * Add a skip_slow function. * try out micromamba * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add os * try adding asv to environment file Co-authored-by: Deepak Cherian Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .github/workflows/benchmarks.yml | 78 +++++++++++ asv.conf.json => asv_bench/asv.conf.json | 10 +- asv_bench/benchmarks/README_CI.md | 122 ++++++++++++++++++ asv_bench/benchmarks/__init__.py | 28 ++++ .../benchmarks}/combine.py | 0 .../benchmarks}/reduce.py | 2 +- benchmarks/__init__.py | 7 - ci/environment.yml | 1 + 8 files changed, 235 insertions(+), 13 deletions(-) create mode 100644 .github/workflows/benchmarks.yml rename asv.conf.json => asv_bench/asv.conf.json (97%) create mode 100644 asv_bench/benchmarks/README_CI.md create mode 100644 asv_bench/benchmarks/__init__.py rename {benchmarks => asv_bench/benchmarks}/combine.py (100%) rename {benchmarks => asv_bench/benchmarks}/reduce.py (96%) delete mode 100644 benchmarks/__init__.py diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml new file mode 100644 index 000000000..badf4be8c --- /dev/null +++ b/.github/workflows/benchmarks.yml @@ -0,0 +1,78 @@ +name: Benchmark + +on: + pull_request: + types: [opened, reopened, synchronize, labeled] + workflow_dispatch: + +jobs: + benchmark: + # if: ${{ contains( github.event.pull_request.labels.*.name, 'run-benchmark') && github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }} # Run if the PR has been labelled correctly. + if: ${{ github.event_name == 'pull_request' || github.event_name == 'workflow_dispatch' }} # Always run. + name: Linux + runs-on: ubuntu-20.04 + env: + ASV_DIR: "./asv_bench" + + steps: + # We need the full repo to avoid this issue + # https://github.com/actions/checkout/issues/23 + - uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Set up conda environment + uses: mamba-org/provision-with-micromamba@v12 + with: + environment-file: ci/environment.yml + environment-name: flox-tests + cache-env: true + # extra-specs: | + # python="${{ matrix.python-version }}" + + # - name: Setup some dependencies + # shell: bash -l {0} + # run: | + # pip install asv + # sudo apt-get update -y + + - name: Run benchmarks + shell: bash -l {0} + id: benchmark + env: + OPENBLAS_NUM_THREADS: 1 + MKL_NUM_THREADS: 1 + OMP_NUM_THREADS: 1 + ASV_FACTOR: 1.5 + ASV_SKIP_SLOW: 1 + run: | + set -x + # ID this runner + asv machine --yes + echo "Baseline: ${{ github.event.pull_request.base.sha }} (${{ github.event.pull_request.base.label }})" + echo "Contender: ${GITHUB_SHA} (${{ github.event.pull_request.head.label }})" + # Use mamba for env creation + # export CONDA_EXE=$(which mamba) + export CONDA_EXE=$(which conda) + # Run benchmarks for current commit against base + ASV_OPTIONS="--split --show-stderr --factor $ASV_FACTOR" + asv continuous $ASV_OPTIONS ${{ github.event.pull_request.base.sha }} ${GITHUB_SHA} \ + | sed "/Traceback \|failed$\|PERFORMANCE DECREASED/ s/^/::error::/" \ + | tee benchmarks.log + # Report and export results for subsequent steps + if grep "Traceback \|failed\|PERFORMANCE DECREASED" benchmarks.log > /dev/null ; then + exit 1 + fi + working-directory: ${{ env.ASV_DIR }} + + - name: Add instructions to artifact + if: always() + run: | + cp benchmarks/README_CI.md benchmarks.log .asv/results/ + working-directory: ${{ env.ASV_DIR }} + + - uses: actions/upload-artifact@v3 + if: always() + with: + name: asv-benchmark-results-${{ runner.os }} + path: ${{ env.ASV_DIR }}/.asv/results diff --git a/asv.conf.json b/asv_bench/asv.conf.json similarity index 97% rename from asv.conf.json rename to asv_bench/asv.conf.json index 5dc60322b..d01f64dd4 100644 --- a/asv.conf.json +++ b/asv_bench/asv.conf.json @@ -11,7 +11,7 @@ // The URL or local path of the source code repository for the // project being benchmarked - "repo": ".", + "repo": "..", // The Python project's subdirectory in your repo. If missing or // the empty string, the project is assumed to be located at the root @@ -37,7 +37,7 @@ // determined from "repo" by looking at the protocol in the URL // (if remote), or by looking for special directories, such as // ".git" (if local). - // "dvcs": "git", + "dvcs": "git", // The tool to use to create environments. May be "conda", // "virtualenv" or other value depending on the plugins in use. @@ -48,10 +48,10 @@ // timeout in seconds for installing any dependencies in environment // defaults to 10 min - //"install_timeout": 600, + "install_timeout": 600, // the base URL to show a commit for the project. - "show_commit_url": "http://github.com/dcherian/flox/commit/", + "show_commit_url": "http://github.com/xarray-contrib/flox/commit/", // The Pythons you'd like to test against. If not provided, defaults // to the current version of Python used to run `asv`. @@ -114,7 +114,7 @@ // The directory (relative to the current directory) that benchmarks are // stored in. If not provided, defaults to "benchmarks" - // "benchmark_dir": "benchmarks", + "benchmark_dir": "benchmarks", // The directory (relative to the current directory) to cache the Python // environments in. If not provided, defaults to "env" diff --git a/asv_bench/benchmarks/README_CI.md b/asv_bench/benchmarks/README_CI.md new file mode 100644 index 000000000..9d86cc257 --- /dev/null +++ b/asv_bench/benchmarks/README_CI.md @@ -0,0 +1,122 @@ +# Benchmark CI + + + + + +## How it works + +The `asv` suite can be run for any PR on GitHub Actions (check workflow `.github/workflows/benchmarks.yml`) by adding a `run-benchmark` label to said PR. This will trigger a job that will run the benchmarking suite for the current PR head (merged commit) against the PR base (usually `main`). + +We use `asv continuous` to run the job, which runs a relative performance measurement. This means that there's no state to be saved and that regressions are only caught in terms of performance ratio (absolute numbers are available but they are not useful since we do not use stable hardware over time). `asv continuous` will: + +* Compile `scikit-image` for _both_ commits. We use `ccache` to speed up the process, and `mamba` is used to create the build environments. +* Run the benchmark suite for both commits, _twice_ (since `processes=2` by default). +* Generate a report table with performance ratios: + * `ratio=1.0` -> performance didn't change. + * `ratio<1.0` -> PR made it slower. + * `ratio>1.0` -> PR made it faster. + +Due to the sensitivity of the test, we cannot guarantee that false positives are not produced. In practice, values between `(0.7, 1.5)` are to be considered part of the measurement noise. When in doubt, running the benchmark suite one more time will provide more information about the test being a false positive or not. + +## Running the benchmarks on GitHub Actions + +1. On a PR, add the label `run-benchmark`. +2. The CI job will be started. Checks will appear in the usual dashboard panel above the comment box. +3. If more commits are added, the label checks will be grouped with the last commit checks _before_ you added the label. +4. Alternatively, you can always go to the `Actions` tab in the repo and [filter for `workflow:Benchmark`](https://github.com/scikit-image/scikit-image/actions?query=workflow%3ABenchmark). Your username will be assigned to the `actor` field, so you can also filter the results with that if you need it. + +## The artifacts + +The CI job will also generate an artifact. This is the `.asv/results` directory compressed in a zip file. Its contents include: + +* `fv-xxxxx-xx/`. A directory for the machine that ran the suite. It contains three files: + * `.json`, `.json`: the benchmark results for each commit, with stats. + * `machine.json`: details about the hardware. +* `benchmarks.json`: metadata about the current benchmark suite. +* `benchmarks.log`: the CI logs for this run. +* This README. + +## Re-running the analysis + +Although the CI logs should be enough to get an idea of what happened (check the table at the end), one can use `asv` to run the analysis routines again. + +1. Uncompress the artifact contents in the repo, under `.asv/results`. This is, you should see `.asv/results/benchmarks.log`, not `.asv/results/something_else/benchmarks.log`. Write down the machine directory name for later. +2. Run `asv show` to see your available results. You will see something like this: + +``` +$> asv show + +Commits with results: + +Machine : Jaimes-MBP +Environment: conda-py3.9-cython-numpy1.20-scipy + + 00875e67 + +Machine : fv-az95-499 +Environment: conda-py3.7-cython-numpy1.17-pooch-scipy + + 8db28f02 + 3a305096 +``` + +3. We are interested in the commits for `fv-az95-499` (the CI machine for this run). We can compare them with `asv compare` and some extra options. `--sort ratio` will show largest ratios first, instead of alphabetical order. `--split` will produce three tables: improved, worsened, no changes. `--factor 1.5` tells `asv` to only complain if deviations are above a 1.5 ratio. `-m` is used to indicate the machine ID (use the one you wrote down in step 1). Finally, specify your commit hashes: baseline first, then contender! + +``` +$> asv compare --sort ratio --split --factor 1.5 -m fv-az95-499 8db28f02 3a305096 + +Benchmarks that have stayed the same: + + before after ratio + [8db28f02] [3a305096] + + n/a n/a n/a benchmark_restoration.RollingBall.time_rollingball_ndim + 1.23±0.04ms 1.37±0.1ms 1.12 benchmark_transform_warp.WarpSuite.time_to_float64(, 128, 3) + 5.07±0.1μs 5.59±0.4μs 1.10 benchmark_transform_warp.ResizeLocalMeanSuite.time_resize_local_mean(, (192, 192, 192), (192, 192, 192)) + 1.23±0.02ms 1.33±0.1ms 1.08 benchmark_transform_warp.WarpSuite.time_same_type(, 128, 3) + 9.45±0.2ms 10.1±0.5ms 1.07 benchmark_rank.Rank3DSuite.time_3d_filters('majority', (32, 32, 32)) + 23.0±0.9ms 24.6±1ms 1.07 benchmark_interpolation.InterpolationResize.time_resize((80, 80, 80), 0, 'symmetric', , True) + 38.7±1ms 41.1±1ms 1.06 benchmark_transform_warp.ResizeLocalMeanSuite.time_resize_local_mean(, (2048, 2048), (192, 192, 192)) + 4.97±0.2μs 5.24±0.2μs 1.05 benchmark_transform_warp.ResizeLocalMeanSuite.time_resize_local_mean(, (2048, 2048), (2048, 2048)) + 4.21±0.2ms 4.42±0.3ms 1.05 benchmark_rank.Rank3DSuite.time_3d_filters('gradient', (32, 32, 32)) + +... +``` + +If you want more details on a specific test, you can use `asv show`. Use `-b pattern` to filter which tests to show, and then specify a commit hash to inspect: + +``` +$> asv show -b time_to_float64 8db28f02 + +Commit: 8db28f02 + +benchmark_transform_warp.WarpSuite.time_to_float64 [fv-az95-499/conda-py3.7-cython-numpy1.17-pooch-scipy] + ok + =============== ============= ========== ============= ========== ============ ========== ============ ========== ============ + -- N / order + --------------- -------------------------------------------------------------------------------------------------------------- + dtype_in 128 / 0 128 / 1 128 / 3 1024 / 0 1024 / 1 1024 / 3 4096 / 0 4096 / 1 4096 / 3 + =============== ============= ========== ============= ========== ============ ========== ============ ========== ============ + numpy.uint8 2.56±0.09ms 523±30μs 1.28±0.05ms 130±3ms 28.7±2ms 81.9±3ms 2.42±0.01s 659±5ms 1.48±0.01s + numpy.uint16 2.48±0.03ms 530±10μs 1.28±0.02ms 130±1ms 30.4±0.7ms 81.1±2ms 2.44±0s 653±3ms 1.47±0.02s + numpy.float32 2.59±0.1ms 518±20μs 1.27±0.01ms 127±3ms 26.6±1ms 74.8±2ms 2.50±0.01s 546±10ms 1.33±0.02s + numpy.float64 2.48±0.04ms 513±50μs 1.23±0.04ms 134±3ms 30.7±2ms 85.4±2ms 2.55±0.01s 632±4ms 1.45±0.01s + =============== ============= ========== ============= ========== ============ ========== ============ ========== ============ + started: 2021-07-06 06:14:36, duration: 1.99m +``` + +## Other details + +### Skipping slow or demanding tests + +To minimize the time required to run the full suite, we trimmed the parameter matrix in some cases and, in others, directly skipped tests that ran for too long or require too much memory. Unlike `pytest`, `asv` does not have a notion of marks. However, you can `raise NotImplementedError` in the setup step to skip a test. In that vein, a new private function is defined at `benchmarks.__init__`: `_skip_slow`. This will check if the `ASV_SKIP_SLOW` environment variable has been defined. If set to `1`, it will raise `NotImplementedError` and skip the test. To implement this behavior in other tests, you can add the following attribute: + +```python +from . import _skip_slow # this function is defined in benchmarks.__init__ + +def time_something_slow(): + pass + +time_something.setup = _skip_slow +``` diff --git a/asv_bench/benchmarks/__init__.py b/asv_bench/benchmarks/__init__.py new file mode 100644 index 000000000..0a35e6cd7 --- /dev/null +++ b/asv_bench/benchmarks/__init__.py @@ -0,0 +1,28 @@ +import os + + +def parameterized(names, params): + def decorator(func): + func.param_names = names + func.params = params + return func + + return decorator + + +def _skip_slow(): + """ + Use this function to skip slow or highly demanding tests. + + Use it as a `Class.setup` method or a `function.setup` attribute. + + Examples + -------- + >>> from . import _skip_slow + >>> def time_something_slow(): + ... pass + ... + >>> time_something.setup = _skip_slow + """ + if os.environ.get("ASV_SKIP_SLOW", "0") == "1": + raise NotImplementedError("Skipping this test...") diff --git a/benchmarks/combine.py b/asv_bench/benchmarks/combine.py similarity index 100% rename from benchmarks/combine.py rename to asv_bench/benchmarks/combine.py diff --git a/benchmarks/reduce.py b/asv_bench/benchmarks/reduce.py similarity index 96% rename from benchmarks/reduce.py rename to asv_bench/benchmarks/reduce.py index fd5d65714..3f4a9d02f 100644 --- a/benchmarks/reduce.py +++ b/asv_bench/benchmarks/reduce.py @@ -6,7 +6,7 @@ from . import parameterized N = 1000 -funcs = ["sum", "nansum", "mean", "nanmean", "argmax", "max"] +funcs = ["sum", "nansum", "mean", "nanmean", "max"] engines = ["flox", "numpy"] diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py deleted file mode 100644 index fc74e8967..000000000 --- a/benchmarks/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -def parameterized(names, params): - def decorator(func): - func.param_names = names - func.params = params - return func - - return decorator diff --git a/ci/environment.yml b/ci/environment.yml index aff6bc911..a752bef55 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -2,6 +2,7 @@ name: flox-tests channels: - conda-forge dependencies: + - asv - cachey - codecov - dask-core From 7c70b26eb7345b60695fedbab809976cd59e1832 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Sep 2022 11:12:47 -0600 Subject: [PATCH 03/17] Bump mamba-org/provision-with-micromamba from 12 to 13 (#141) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/benchmarks.yml | 2 +- .github/workflows/ci.yaml | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index badf4be8c..5c6e03a2e 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -22,7 +22,7 @@ jobs: fetch-depth: 0 - name: Set up conda environment - uses: mamba-org/provision-with-micromamba@v12 + uses: mamba-org/provision-with-micromamba@v13 with: environment-file: ci/environment.yml environment-name: flox-tests diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 4db8e6a8a..68859e89e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -34,7 +34,7 @@ jobs: run: | echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV - name: Set up conda environment - uses: mamba-org/provision-with-micromamba@v12 + uses: mamba-org/provision-with-micromamba@v13 with: environment-file: ci/environment.yml environment-name: flox-tests @@ -78,7 +78,7 @@ jobs: with: fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set up conda environment - uses: mamba-org/provision-with-micromamba@v12 + uses: mamba-org/provision-with-micromamba@v13 with: environment-file: ci/${{ matrix.env }}.yml environment-name: flox-tests @@ -101,7 +101,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Set up conda environment - uses: mamba-org/provision-with-micromamba@v12 + uses: mamba-org/provision-with-micromamba@v13 with: environment-file: ci/upstream-dev-env.yml environment-name: flox-tests @@ -123,7 +123,7 @@ jobs: repository: 'pydata/xarray' fetch-depth: 0 # Fetch all history for all branches and tags. - name: Set up conda environment - uses: mamba-org/provision-with-micromamba@v12 + uses: mamba-org/provision-with-micromamba@v13 with: environment-file: ci/requirements/environment.yml environment-name: xarray-tests From a107eac319dbf80ddf3d7779fab5e7191688a1f8 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 19 Sep 2022 17:28:51 +0200 Subject: [PATCH 04/17] Add ci additional (#143) --- .github/workflows/ci-additional.yaml | 118 +++++++++++++++++++++++++++ ci/environment.yml | 1 + flox/xarray.py | 20 +++++ 3 files changed, 139 insertions(+) create mode 100644 .github/workflows/ci-additional.yaml diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml new file mode 100644 index 000000000..65b790d96 --- /dev/null +++ b/.github/workflows/ci-additional.yaml @@ -0,0 +1,118 @@ +name: CI Additional +on: + push: + branches: + - "*" + pull_request: + branches: + - "*" + workflow_dispatch: # allows you to trigger manually + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + detect-ci-trigger: + name: detect ci trigger + runs-on: ubuntu-latest + if: | + github.repository == 'xarray-contrib/flox' + && (github.event_name == 'push' || github.event_name == 'pull_request') + outputs: + triggered: ${{ steps.detect-trigger.outputs.trigger-found }} + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 2 + - uses: xarray-contrib/ci-trigger@v1.1 + id: detect-trigger + with: + keyword: "[skip-ci]" + + doctest: + name: Doctests + runs-on: "ubuntu-latest" + needs: detect-ci-trigger + if: needs.detect-ci-trigger.outputs.triggered == 'false' + defaults: + run: + shell: bash -l {0} + + env: + CONDA_ENV_FILE: ci/environment.yml + PYTHON_VERSION: "3.10" + + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Fetch all history for all branches and tags. + + - name: set environment variables + run: | + echo "TODAY=$(date +'%Y-%m-%d')" >> $GITHUB_ENV + + - name: Setup micromamba + uses: mamba-org/provision-with-micromamba@34071ca7df4983ccd272ed0d3625818b27b70dcc + with: + environment-file: ${{env.CONDA_ENV_FILE}} + environment-name: flox-tests + extra-specs: | + python=${{env.PYTHON_VERSION}} + cache-env: true + cache-env-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}" + + - name: Install flox + run: | + python -m pip install --no-deps -e . + - name: Version info + run: | + conda info -a + conda list + - name: Run doctests + run: | + python -m pytest --doctest-modules flox --ignore flox/tests + + # mypy: + # name: Mypy + # runs-on: "ubuntu-latest" + # needs: detect-ci-trigger + # if: needs.detect-ci-trigger.outputs.triggered == 'false' + # defaults: + # run: + # shell: bash -l {0} + # env: + # CONDA_ENV_FILE: ci/environment.yml + # PYTHON_VERSION: "3.10" + + # steps: + # - uses: actions/checkout@v3 + # with: + # fetch-depth: 0 # Fetch all history for all branches and tags. + + # - name: set environment variables + # run: | + # echo "TODAY=$(date +'%Y-%m-%d')" >> $GITHUB_ENV + # - name: Setup micromamba + # uses: mamba-org/provision-with-micromamba@34071ca7df4983ccd272ed0d3625818b27b70dcc + # with: + # environment-file: ${{env.CONDA_ENV_FILE}} + # environment-name: xarray-tests + # extra-specs: | + # python=${{env.PYTHON_VERSION}} + # cache-env: true + # cache-env-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}" + # - name: Install xarray + # run: | + # python -m pip install --no-deps -e . + # - name: Version info + # run: | + # conda info -a + # conda list + # - name: Install mypy + # run: | + # python -m pip install mypy + + # - name: Run mypy + # run: | + # python -m mypy --install-types --non-interactive diff --git a/ci/environment.yml b/ci/environment.yml index a752bef55..d83bd3b9c 100644 --- a/ci/environment.yml +++ b/ci/environment.yml @@ -9,6 +9,7 @@ dependencies: - netcdf4 - pandas - numpy>=1.20 + - matplotlib - pip - pytest - pytest-cov diff --git a/flox/xarray.py b/flox/xarray.py index 7234f8826..e06723e77 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -161,6 +161,26 @@ def xarray_reduce( DataArray or Dataset Reduced object + Examples + -------- + >>> import xarray as xr + >>> from flox.xarray import xarray_reduce + + >>> # Create a group index: + >>> labels = xr.DataArray( + ... [1, 2, 3, 1, 2, 3, 0, 0, 0], + ... dims="x", + ... name="label", + ... ) + >>> # Create a DataArray to apply the group index on: + >>> da = da = xr.ones_like(labels) + >>> # Sum all values in da that matches the elements in the group index: + >>> xarray_reduce(da, labels, func="sum") + + array([3, 2, 2, 2]) + Coordinates: + * label (label) int64 0 1 2 3 + See Also -------- flox.core.groupby_reduce From 41f3a7b2a4dcf231bd32d8feed80052a0e0802a2 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 20 Sep 2022 18:30:08 +0200 Subject: [PATCH 05/17] Remove duplicate examples headers (#147) --- flox/xarray.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/flox/xarray.py b/flox/xarray.py index e06723e77..9c8fe6108 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -161,6 +161,15 @@ def xarray_reduce( DataArray or Dataset Reduced object + See Also + -------- + flox.core.groupby_reduce + + Raises + ------ + NotImplementedError + ValueError + Examples -------- >>> import xarray as xr @@ -180,19 +189,6 @@ def xarray_reduce( array([3, 2, 2, 2]) Coordinates: * label (label) int64 0 1 2 3 - - See Also - -------- - flox.core.groupby_reduce - - Raises - ------ - NotImplementedError - ValueError - - Examples - -------- - FIXME: Add docs. """ if skipna is not None and isinstance(func, Aggregation): From 75a98e74c88913fd2c7bc7157e10a6b01d4ebefc Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 20 Sep 2022 19:42:59 +0200 Subject: [PATCH 06/17] Get pre commit bot to update (#145) --- .pre-commit-config.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1d4aaa3c1..e245e6c97 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,5 @@ -ci: - autoupdate_schedule: quarterly +# ci: + # autoupdate_schedule: quarterly repos: - repo: https://github.com/pre-commit/pre-commit-hooks From 6fbfaa014d4470cf191386cbfdad23df9e21f98d Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 20 Sep 2022 19:50:11 +0200 Subject: [PATCH 07/17] Add mypy ignores (#146) * add mypy ignores * Add in the toml instead * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 1259704e3..32e55d712 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,8 +39,12 @@ show_error_codes = true [[tool.mypy.overrides]] module=[ + "cachey", + "cftime", "dask.*", + "importlib_metadata", "numpy_groupies", + "matplotlib.*", "pandas", "setuptools", "toolz" From ab29d2c36ccf0b661344403bae4944025a77cd63 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Tue, 20 Sep 2022 15:43:50 -0600 Subject: [PATCH 08/17] pre-commit autoupdate (#148) --- .pre-commit-config.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e245e6c97..b76d51d01 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,5 @@ -# ci: - # autoupdate_schedule: quarterly +ci: + autoupdate_schedule: quarterly repos: - repo: https://github.com/pre-commit/pre-commit-hooks @@ -10,12 +10,12 @@ repos: - id: check-docstring-first - repo: https://github.com/psf/black - rev: 22.6.0 + rev: 22.8.0 hooks: - id: black - repo: https://github.com/PyCQA/flake8 - rev: 4.0.1 + rev: 5.0.4 hooks: - id: flake8 From af3e3ce0e804bdd7025d2cae5d040c4eb601c83d Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Thu, 22 Sep 2022 19:00:17 +0200 Subject: [PATCH 09/17] Raise error if multiple by's are used with Ellipsis (#149) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- flox/xarray.py | 13 ++++++++----- tests/test_xarray.py | 3 +++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flox/xarray.py b/flox/xarray.py index 9c8fe6108..29b023a0e 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -194,6 +194,7 @@ def xarray_reduce( if skipna is not None and isinstance(func, Aggregation): raise ValueError("skipna must be None when func is an Aggregation.") + nby = len(by) for b in by: if isinstance(b, xr.DataArray) and b.name is None: raise ValueError("Cannot group by unnamed DataArrays.") @@ -203,11 +204,11 @@ def xarray_reduce( keep_attrs = True if isinstance(isbin, bool): - isbin = (isbin,) * len(by) + isbin = (isbin,) * nby if expected_groups is None: - expected_groups = (None,) * len(by) + expected_groups = (None,) * nby if isinstance(expected_groups, (np.ndarray, list)): # TODO: test for list - if len(by) == 1: + if nby == 1: expected_groups = (expected_groups,) else: raise ValueError("Needs better message.") @@ -239,6 +240,8 @@ def xarray_reduce( ds = ds.drop_vars([var for var in maybe_drop if var in ds.variables]) if dim is Ellipsis: + if nby > 1: + raise NotImplementedError("Multiple by are not allowed when dim is Ellipsis.") dim = tuple(obj.dims) if by[0].name in ds.dims and not isbin[0]: dim = tuple(d for d in dim if d != by[0].name) @@ -351,7 +354,7 @@ def wrapper(array, *by, func, skipna, **kwargs): missing_dim[k] = v input_core_dims = _get_input_core_dims(group_names, dim, ds, grouper_dims) - input_core_dims += [input_core_dims[-1]] * (len(by) - 1) + input_core_dims += [input_core_dims[-1]] * (nby - 1) actual = xr.apply_ufunc( wrapper, @@ -409,7 +412,7 @@ def wrapper(array, *by, func, skipna, **kwargs): if unindexed_dims: actual = actual.drop_vars(unindexed_dims) - if len(by) == 1: + if nby == 1: for var in actual: if isinstance(obj, xr.DataArray): template = obj diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 90a2d50c4..6669830b5 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -159,6 +159,9 @@ def test_xarray_reduce_multiple_groupers_2(pass_expected_groups, chunk, engine): actual = xarray_reduce(da, "labels", "labels2", **kwargs) xr.testing.assert_identical(expected, actual) + with pytest.raises(NotImplementedError): + xarray_reduce(da, "labels", "labels2", dim=..., **kwargs) + @requires_dask def test_dask_groupers_error(): From 2b54c5e1353493ffc6e9922cde73b201bff836de Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Fri, 23 Sep 2022 17:22:06 +0200 Subject: [PATCH 10/17] Fix mypy errors in xarray.py, xrutils.py, cache.py (#144) * update dim typing * Fix mypy errors in xarray.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * start mypy ci * Use T_DataArray and T_Dataset * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add mypy ignores * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * correct typing a bit * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * test newer flake8 if ellipsis passes there * Allow ellipsis in flake8 * Update core.py * Update xarray.py * Update setup.cfg * Update xarray.py * Update xarray.py * Update xarray.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Update xarray.py * Update pyproject.toml * Update xarray.py * Update xarray.py * hopefully no more pytest errors. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * make sure expected_groups doesn't have None * Update flox/xarray.py Co-authored-by: Deepak Cherian * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * ds_broad and longer comment * Use same for loop for similar things. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix xrutils.py * fix errors in cache.py * Turn off mypy check * Update flox/xarray.py Co-authored-by: Deepak Cherian * Update flox/xarray.py Co-authored-by: Deepak Cherian * Use if else format to avoid tuple creation * Update xarray.py Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Deepak Cherian --- flox/cache.py | 2 +- flox/core.py | 15 ++++- flox/xarray.py | 151 +++++++++++++++++++++++++++--------------------- flox/xrutils.py | 2 +- setup.cfg | 2 + 5 files changed, 102 insertions(+), 70 deletions(-) diff --git a/flox/cache.py b/flox/cache.py index eaac3f360..4f8de8b59 100644 --- a/flox/cache.py +++ b/flox/cache.py @@ -8,4 +8,4 @@ cache = cachey.Cache(1e6) memoize = partial(cache.memoize, key=dask.base.tokenize) except ImportError: - memoize = lambda x: x + memoize = lambda x: x # type: ignore diff --git a/flox/core.py b/flox/core.py index 943fd029e..58b89bf17 100644 --- a/flox/core.py +++ b/flox/core.py @@ -5,7 +5,16 @@ import operator from collections import namedtuple from functools import partial, reduce -from typing import TYPE_CHECKING, Any, Callable, Dict, Mapping, Sequence, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + Iterable, + Mapping, + Sequence, + Union, +) import numpy as np import numpy_groupies as npg @@ -1282,8 +1291,8 @@ def _assert_by_is_aligned(shape, by): def _convert_expected_groups_to_index( - expected_groups: tuple, isbin: bool, sort: bool -) -> pd.Index | None: + expected_groups: Iterable, isbin: Sequence[bool], sort: bool +) -> tuple[pd.Index | None]: out = [] for ex, isbin_ in zip(expected_groups, isbin): if isinstance(ex, pd.IntervalIndex) or (isinstance(ex, pd.Index) and not isbin): diff --git a/flox/xarray.py b/flox/xarray.py index 29b023a0e..c02959485 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Hashable, Iterable, Sequence +from typing import TYPE_CHECKING, Any, Hashable, Iterable, Sequence, Union import numpy as np import pandas as pd @@ -19,7 +19,10 @@ from .xrutils import _contains_cftime_datetimes, _to_pytimedelta, datetime_to_numeric if TYPE_CHECKING: - from xarray import DataArray, Dataset, Resample + from xarray.core.resample import Resample + from xarray.core.types import T_DataArray, T_Dataset + + Dims = Union[str, Iterable[Hashable], None] def _get_input_core_dims(group_names, dim, ds, grouper_dims): @@ -51,13 +54,13 @@ def lookup_order(dimension): def xarray_reduce( - obj: Dataset | DataArray, - *by: DataArray | Iterable[str] | Iterable[DataArray], + obj: T_Dataset | T_DataArray, + *by: T_DataArray | Hashable, func: str | Aggregation, expected_groups=None, isbin: bool | Sequence[bool] = False, sort: bool = True, - dim: Hashable = None, + dim: Dims | ellipsis = None, split_out: int = 1, fill_value=None, method: str = "map-reduce", @@ -203,8 +206,11 @@ def xarray_reduce( if keep_attrs is None: keep_attrs = True - if isinstance(isbin, bool): - isbin = (isbin,) * nby + if isinstance(isbin, Sequence): + isbins = isbin + else: + isbins = (isbin,) * nby + if expected_groups is None: expected_groups = (None,) * nby if isinstance(expected_groups, (np.ndarray, list)): # TODO: test for list @@ -217,78 +223,86 @@ def xarray_reduce( raise NotImplementedError # eventually drop the variables we are grouping by - maybe_drop = [b for b in by if isinstance(b, str)] + maybe_drop = [b for b in by if isinstance(b, Hashable)] unindexed_dims = tuple( b - for b, isbin_ in zip(by, isbin) - if isinstance(b, str) and not isbin_ and b in obj.dims and b not in obj.indexes + for b, isbin_ in zip(by, isbins) + if isinstance(b, Hashable) and not isbin_ and b in obj.dims and b not in obj.indexes ) - by: tuple[DataArray] = tuple(obj[g] if isinstance(g, str) else g for g in by) # type: ignore + by_da = tuple(obj[g] if isinstance(g, Hashable) else g for g in by) grouper_dims = [] - for g in by: + for g in by_da: for d in g.dims: if d not in grouper_dims: grouper_dims.append(d) - if isinstance(obj, xr.DataArray): - ds = obj._to_temp_dataset() - else: + if isinstance(obj, xr.Dataset): ds = obj + else: + ds = obj._to_temp_dataset() ds = ds.drop_vars([var for var in maybe_drop if var in ds.variables]) if dim is Ellipsis: if nby > 1: raise NotImplementedError("Multiple by are not allowed when dim is Ellipsis.") - dim = tuple(obj.dims) - if by[0].name in ds.dims and not isbin[0]: - dim = tuple(d for d in dim if d != by[0].name) + name_ = by_da[0].name + if name_ in ds.dims and not isbins[0]: + dim_tuple = tuple(d for d in obj.dims if d != name_) + else: + dim_tuple = tuple(obj.dims) elif dim is not None: - dim = _atleast_1d(dim) + dim_tuple = _atleast_1d(dim) else: - dim = tuple() + dim_tuple = tuple() # broadcast all variables against each other along all dimensions in `by` variables # don't exclude `dim` because it need not be a dimension in any of the `by` variables! # in the case where dim is Ellipsis, and by.ndim < obj.ndim # then we also broadcast `by` to all `obj.dims` # TODO: avoid this broadcasting - exclude_dims = tuple(d for d in ds.dims if d not in grouper_dims and d not in dim) - ds, *by = xr.broadcast(ds, *by, exclude=exclude_dims) + exclude_dims = tuple(d for d in ds.dims if d not in grouper_dims and d not in dim_tuple) + ds_broad, *by_broad = xr.broadcast(ds, *by_da, exclude=exclude_dims) - if not dim: - dim = tuple(by[0].dims) + # all members of by_broad have the same dimensions + # so we just pull by_broad[0].dims if dim is None + if not dim_tuple: + dim_tuple = tuple(by_broad[0].dims) - if any(d not in grouper_dims and d not in obj.dims for d in dim): + if any(d not in grouper_dims and d not in obj.dims for d in dim_tuple): raise ValueError(f"Cannot reduce over absent dimensions {dim}.") - dims_not_in_groupers = tuple(d for d in dim if d not in grouper_dims) - if dims_not_in_groupers == tuple(dim) and not any(isbin): + dims_not_in_groupers = tuple(d for d in dim_tuple if d not in grouper_dims) + if dims_not_in_groupers == tuple(dim_tuple) and not any(isbins): # reducing along a dimension along which groups do not vary # This is really just a normal reduction. # This is not right when binning so we exclude. - if skipna and isinstance(func, str): - dsfunc = func[3:] + if isinstance(func, str): + dsfunc = func[3:] if skipna else func else: - dsfunc = func + raise NotImplementedError( + "func must be a string when reducing along a dimension not present in `by`" + ) # TODO: skipna needs test - result = getattr(ds, dsfunc)(dim=dim, skipna=skipna) + result = getattr(ds_broad, dsfunc)(dim=dim_tuple, skipna=skipna) if isinstance(obj, xr.DataArray): return obj._from_temp_dataset(result) else: return result - axis = tuple(range(-len(dim), 0)) - group_names = tuple(g.name if not binned else f"{g.name}_bins" for g, binned in zip(by, isbin)) - - group_shape = [None] * len(by) - expected_groups = list(expected_groups) + axis = tuple(range(-len(dim_tuple), 0)) # Set expected_groups and convert to index since we need coords, sizes # for output xarray objects - for idx, (b, expect, isbin_) in enumerate(zip(by, expected_groups, isbin)): + expected_groups = list(expected_groups) + group_names: tuple[Any, ...] = () + group_sizes: dict[Any, int] = {} + for idx, (b_, expect, isbin_) in enumerate(zip(by_broad, expected_groups, isbins)): + group_name = b_.name if not isbin_ else f"{b_.name}_bins" + group_names += (group_name,) + if isbin_ and isinstance(expect, int): raise NotImplementedError( "flox does not support binning into an integer number of bins yet." @@ -297,13 +311,21 @@ def xarray_reduce( if isbin_: raise ValueError( f"Please provided bin edges for group variable {idx} " - f"named {group_names[idx]} in expected_groups." + f"named {group_name} in expected_groups." ) - expected_groups[idx] = _get_expected_groups(b.data, sort=sort, raise_if_dask=True) - - expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort=sort) - group_shape = tuple(len(e) for e in expected_groups) - group_sizes = dict(zip(group_names, group_shape)) + expect_ = _get_expected_groups(b_.data, sort=sort, raise_if_dask=True) + else: + expect_ = expect + expect_index = _convert_expected_groups_to_index((expect_,), (isbin_,), sort=sort)[0] + + # The if-check is for type hinting mainly, it narrows down the return + # type of _convert_expected_groups_to_index to pure pd.Index: + if expect_index is not None: + expected_groups[idx] = expect_index + group_sizes[group_name] = len(expect_index) + else: + # This will never be reached + raise ValueError("expect_index cannot be None") def wrapper(array, *by, func, skipna, **kwargs): # Handle skipna here because I need to know dtype to make a good default choice. @@ -349,20 +371,20 @@ def wrapper(array, *by, func, skipna, **kwargs): if isinstance(obj, xr.Dataset): # broadcasting means the group dim gets added to ds, so we check the original obj for k, v in obj.data_vars.items(): - is_missing_dim = not (any(d in v.dims for d in dim)) + is_missing_dim = not (any(d in v.dims for d in dim_tuple)) if is_missing_dim: missing_dim[k] = v - input_core_dims = _get_input_core_dims(group_names, dim, ds, grouper_dims) + input_core_dims = _get_input_core_dims(group_names, dim_tuple, ds_broad, grouper_dims) input_core_dims += [input_core_dims[-1]] * (nby - 1) actual = xr.apply_ufunc( wrapper, - ds.drop_vars(tuple(missing_dim)).transpose(..., *grouper_dims), - *by, + ds_broad.drop_vars(tuple(missing_dim)).transpose(..., *grouper_dims), + *by_broad, input_core_dims=input_core_dims, # for xarray's test_groupby_duplicate_coordinate_labels - exclude_dims=set(dim), + exclude_dims=set(dim_tuple), output_core_dims=[group_names], dask="allowed", dask_gufunc_kwargs=dict(output_sizes=group_sizes), @@ -379,18 +401,18 @@ def wrapper(array, *by, func, skipna, **kwargs): "engine": engine, "reindex": reindex, "expected_groups": tuple(expected_groups), - "isbin": isbin, + "isbin": isbins, "finalize_kwargs": finalize_kwargs, }, ) # restore non-dim coord variables without the core dimension # TODO: shouldn't apply_ufunc handle this? - for var in set(ds.variables) - set(ds.dims): - if all(d not in ds[var].dims for d in dim): - actual[var] = ds[var] + for var in set(ds_broad.variables) - set(ds_broad.dims): + if all(d not in ds_broad[var].dims for d in dim_tuple): + actual[var] = ds_broad[var] - for name, expect, by_ in zip(group_names, expected_groups, by): + for name, expect, by_ in zip(group_names, expected_groups, by_broad): # Can't remove this till xarray handles IntervalIndex if isinstance(expect, pd.IntervalIndex): expect = expect.to_numpy() @@ -398,8 +420,8 @@ def wrapper(array, *by, func, skipna, **kwargs): actual = actual.drop_vars(name) # When grouping by MultiIndex, expect is an pd.Index wrapping # an object array of tuples - if name in ds.indexes and isinstance(ds.indexes[name], pd.MultiIndex): - levelnames = ds.indexes[name].names + if name in ds_broad.indexes and isinstance(ds_broad.indexes[name], pd.MultiIndex): + levelnames = ds_broad.indexes[name].names expect = pd.MultiIndex.from_tuples(expect.values, names=levelnames) actual[name] = expect if Version(xr.__version__) > Version("2022.03.0"): @@ -414,18 +436,17 @@ def wrapper(array, *by, func, skipna, **kwargs): if nby == 1: for var in actual: - if isinstance(obj, xr.DataArray): - template = obj - else: + if isinstance(obj, xr.Dataset): template = obj[var] + else: + template = obj + if actual[var].ndim > 1: - actual[var] = _restore_dim_order(actual[var], template, by[0]) + actual[var] = _restore_dim_order(actual[var], template, by_broad[0]) if missing_dim: for k, v in missing_dim.items(): - missing_group_dims = { - dim: size for dim, size in group_sizes.items() if dim not in v.dims - } + missing_group_dims = {d: size for d, size in group_sizes.items() if d not in v.dims} # The expand_dims is for backward compat with xarray's questionable behaviour if missing_group_dims: actual[k] = v.expand_dims(missing_group_dims).variable @@ -439,9 +460,9 @@ def wrapper(array, *by, func, skipna, **kwargs): def rechunk_for_cohorts( - obj: DataArray | Dataset, + obj: T_DataArray | T_Dataset, dim: str, - labels: DataArray, + labels: T_DataArray, force_new_chunk_at, chunksize: int | None = None, ignore_old_chunks: bool = False, @@ -486,7 +507,7 @@ def rechunk_for_cohorts( ) -def rechunk_for_blockwise(obj: DataArray | Dataset, dim: str, labels: DataArray): +def rechunk_for_blockwise(obj: T_DataArray | T_Dataset, dim: str, labels: T_DataArray): """ Rechunks array so that group boundaries line up with chunk boundaries, allowing embarassingly parallel group reductions. diff --git a/flox/xrutils.py b/flox/xrutils.py index 17ad2d71d..3e6edd89e 100644 --- a/flox/xrutils.py +++ b/flox/xrutils.py @@ -19,7 +19,7 @@ dask_array_type = dask.array.Array except ImportError: - dask_array_type = () + dask_array_type = () # type: ignore def asarray(data, xp=np): diff --git a/setup.cfg b/setup.cfg index f254a2f19..3645e5bc7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -57,3 +57,5 @@ per-file-ignores = exclude= .eggs doc +builtins = + ellipsis From c62ad4488e64796bdfd307cf4cfa37d241355132 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Tue, 27 Sep 2022 18:15:31 +0200 Subject: [PATCH 11/17] Remove None output from _get_expected_groups (#152) * Remove raise_if_dask * remove argument in in code. * Update core.py * Update core.py * Update core.py * Update core.py * Update core.py * Missed a if ! * Update core.py * Update core.py --- flox/core.py | 11 ++++++----- flox/xarray.py | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/flox/core.py b/flox/core.py index 58b89bf17..0ddf608ed 100644 --- a/flox/core.py +++ b/flox/core.py @@ -54,11 +54,9 @@ def _is_arg_reduction(func: str | Aggregation) -> bool: return False -def _get_expected_groups(by, sort, *, raise_if_dask=True) -> pd.Index | None: +def _get_expected_groups(by, sort: bool) -> pd.Index: if is_duck_dask_array(by): - if raise_if_dask: - raise ValueError("Please provide expected_groups if not grouping by a numpy array.") - return None + raise ValueError("Please provide expected_groups if not grouping by a numpy array.") flatby = by.reshape(-1) expected = pd.unique(flatby[~isnull(flatby)]) return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0] @@ -1152,7 +1150,10 @@ def dask_groupby_agg( else: intermediate = applied if expected_groups is None: - expected_groups = _get_expected_groups(by_input, sort=sort, raise_if_dask=False) + if is_duck_dask_array(by_input): + expected_groups = None + else: + expected_groups = _get_expected_groups(by_input, sort=sort) group_chunks = ((len(expected_groups),) if expected_groups is not None else (np.nan,),) if method == "map-reduce": diff --git a/flox/xarray.py b/flox/xarray.py index c02959485..5f87bafe6 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -313,7 +313,7 @@ def xarray_reduce( f"Please provided bin edges for group variable {idx} " f"named {group_name} in expected_groups." ) - expect_ = _get_expected_groups(b_.data, sort=sort, raise_if_dask=True) + expect_ = _get_expected_groups(b_.data, sort=sort) else: expect_ = expect expect_index = _convert_expected_groups_to_index((expect_,), (isbin_,), sort=sort)[0] From 491ff581b9a71f6e8c765fe24467de7130d6f0f4 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Wed, 28 Sep 2022 23:03:54 +0200 Subject: [PATCH 12/17] Use math.prod instead of np.prod (#157) --- flox/core.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/flox/core.py b/flox/core.py index 0ddf608ed..fe6bbe475 100644 --- a/flox/core.py +++ b/flox/core.py @@ -2,6 +2,7 @@ import copy import itertools +import math import operator from collections import namedtuple from functools import partial, reduce @@ -85,7 +86,7 @@ def _move_reduce_dims_to_end(arr: np.ndarray, axis: Sequence) -> np.ndarray: def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray: """Reshape so that the last `naxis` axes are collapsed to one axis.""" - newshape = arr.shape[:-naxis] + (np.prod(arr.shape[-naxis:]),) + newshape = arr.shape[:-naxis] + (math.prod(arr.shape[-naxis:]),) return arr.reshape(newshape) @@ -165,7 +166,7 @@ def find_group_cohorts(labels, chunks, merge=True, method="cohorts"): # Iterate over each block and create a new block of same shape with "chunk number" shape = tuple(array.blocks.shape[ax] for ax in axis) - blocks = np.empty(np.prod(shape), dtype=object) + blocks = np.empty(math.prod(shape), dtype=object) for idx, block in enumerate(array.blocks.ravel()): blocks[idx] = np.full(tuple(block.shape[ax] for ax in axis), idx) which_chunk = np.block(blocks.reshape(shape).tolist()).reshape(-1) @@ -382,11 +383,11 @@ def offset_labels(labels: np.ndarray, ngroups: int) -> tuple[np.ndarray, int]: """ assert labels.ndim > 1 offset: np.ndarray = ( - labels + np.arange(np.prod(labels.shape[:-1])).reshape((*labels.shape[:-1], -1)) * ngroups + labels + np.arange(math.prod(labels.shape[:-1])).reshape((*labels.shape[:-1], -1)) * ngroups ) # -1 indicates NaNs. preserve these otherwise we aggregate in the wrong groups! offset[labels == -1] = -1 - size: int = np.prod(labels.shape[:-1]) * ngroups # type: ignore + size: int = math.prod(labels.shape[:-1]) * ngroups # type: ignore return offset, size @@ -455,7 +456,7 @@ def factorize_( factorized.append(idx) grp_shape = tuple(len(grp) for grp in found_groups) - ngroups = np.prod(grp_shape) + ngroups = math.prod(grp_shape) if len(by) > 1: group_idx = np.ravel_multi_index(factorized, grp_shape, mode="wrap") # NaNs; as well as values outside the bins are coded by -1 @@ -630,7 +631,7 @@ def chunk_reduce( groups = groups[0] # always reshape to 1D along group dimensions - newshape = array.shape[: array.ndim - by.ndim] + (np.prod(array.shape[-by.ndim :]),) + newshape = array.shape[: array.ndim - by.ndim] + (math.prod(array.shape[-by.ndim :]),) array = array.reshape(newshape) assert group_idx.ndim == 1 @@ -1506,7 +1507,7 @@ def groupby_reduce( by, final_groups, grp_shape = _factorize_multiple( by, expected_groups, by_is_dask=by_is_dask, reindex=reindex ) - expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) + expected_groups = (pd.RangeIndex(math.prod(grp_shape)),) assert len(by) == 1 by = by[0] @@ -1601,7 +1602,7 @@ def groupby_reduce( array_subset = array for ax, idxr in zip(range(-by.ndim, 0), indexer): array_subset = np.take(array_subset, idxr, axis=ax) - numblocks = np.prod([len(array_subset.chunks[ax]) for ax in axis]) + numblocks = math.prod([len(array_subset.chunks[ax]) for ax in axis]) # get final result for these groups r, *g = partial_agg( From 45f22b8109c7da485b551f7f9a06660f9e05655b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 1 Oct 2022 08:39:11 -0600 Subject: [PATCH 13/17] Bump codecov/codecov-action from 3.1.0 to 3.1.1 (#159) Bumps [codecov/codecov-action](https://github.com/codecov/codecov-action) from 3.1.0 to 3.1.1. - [Release notes](https://github.com/codecov/codecov-action/releases) - [Changelog](https://github.com/codecov/codecov-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/codecov/codecov-action/compare/v3.1.0...v3.1.1) --- updated-dependencies: - dependency-name: codecov/codecov-action dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 68859e89e..7133527c0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -49,7 +49,7 @@ jobs: run: | pytest -n auto --cov=./ --cov-report=xml - name: Upload code coverage to Codecov - uses: codecov/codecov-action@v3.1.0 + uses: codecov/codecov-action@v3.1.1 with: file: ./coverage.xml flags: unittests From 6218e8836950a707d6d6d1df4acef4bda836e67f Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Mon, 3 Oct 2022 15:32:45 +0200 Subject: [PATCH 14/17] Add link to numpy_groupies (#160) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7771ab9be..d99afaa9c 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ It was motivated by 1. Dask Dataframe GroupBy [blogpost](https://blog.dask.org/2019/10/08/df-groupby) -2. numpy_groupies in Xarray +2. [numpy_groupies](https://github.com/ml31415/numpy-groupies) in Xarray [issue](https://github.com/pydata/xarray/issues/4473) (See a From 91b6e1937cca220346d33a0bb67dfb5cadaa1d92 Mon Sep 17 00:00:00 2001 From: Illviljan <14371165+Illviljan@users.noreply.github.com> Date: Thu, 6 Oct 2022 19:57:31 +0200 Subject: [PATCH 15/17] Fix mypy errors in core.py (#150) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: dcherian --- .github/workflows/ci-additional.yaml | 80 +++---- flox/aggregations.py | 2 +- flox/core.py | 313 +++++++++++++++------------ 3 files changed, 219 insertions(+), 176 deletions(-) diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index 65b790d96..c326429bb 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -73,46 +73,46 @@ jobs: run: | python -m pytest --doctest-modules flox --ignore flox/tests - # mypy: - # name: Mypy - # runs-on: "ubuntu-latest" - # needs: detect-ci-trigger - # if: needs.detect-ci-trigger.outputs.triggered == 'false' - # defaults: - # run: - # shell: bash -l {0} - # env: - # CONDA_ENV_FILE: ci/environment.yml - # PYTHON_VERSION: "3.10" + mypy: + name: Mypy + runs-on: "ubuntu-latest" + needs: detect-ci-trigger + if: needs.detect-ci-trigger.outputs.triggered == 'false' + defaults: + run: + shell: bash -l {0} + env: + CONDA_ENV_FILE: ci/environment.yml + PYTHON_VERSION: "3.10" - # steps: - # - uses: actions/checkout@v3 - # with: - # fetch-depth: 0 # Fetch all history for all branches and tags. + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Fetch all history for all branches and tags. - # - name: set environment variables - # run: | - # echo "TODAY=$(date +'%Y-%m-%d')" >> $GITHUB_ENV - # - name: Setup micromamba - # uses: mamba-org/provision-with-micromamba@34071ca7df4983ccd272ed0d3625818b27b70dcc - # with: - # environment-file: ${{env.CONDA_ENV_FILE}} - # environment-name: xarray-tests - # extra-specs: | - # python=${{env.PYTHON_VERSION}} - # cache-env: true - # cache-env-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}" - # - name: Install xarray - # run: | - # python -m pip install --no-deps -e . - # - name: Version info - # run: | - # conda info -a - # conda list - # - name: Install mypy - # run: | - # python -m pip install mypy + - name: set environment variables + run: | + echo "TODAY=$(date +'%Y-%m-%d')" >> $GITHUB_ENV + - name: Setup micromamba + uses: mamba-org/provision-with-micromamba@34071ca7df4983ccd272ed0d3625818b27b70dcc + with: + environment-file: ${{env.CONDA_ENV_FILE}} + environment-name: xarray-tests + extra-specs: | + python=${{env.PYTHON_VERSION}} + cache-env: true + cache-env-key: "${{runner.os}}-${{runner.arch}}-py${{env.PYTHON_VERSION}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}" + - name: Install xarray + run: | + python -m pip install --no-deps -e . + - name: Version info + run: | + conda info -a + conda list + - name: Install mypy + run: | + python -m pip install mypy - # - name: Run mypy - # run: | - # python -m mypy --install-types --non-interactive + - name: Run mypy + run: | + python -m mypy --install-types --non-interactive diff --git a/flox/aggregations.py b/flox/aggregations.py index fad92a975..2a1d68d6d 100644 --- a/flox/aggregations.py +++ b/flox/aggregations.py @@ -467,7 +467,7 @@ def _initialize_aggregation( func: str | Aggregation, array_dtype, fill_value, - min_count: int, + min_count: int | None, finalize_kwargs, ) -> Aggregation: if not isinstance(func, Aggregation): diff --git a/flox/core.py b/flox/core.py index fe6bbe475..a73a97398 100644 --- a/flox/core.py +++ b/flox/core.py @@ -12,6 +12,7 @@ Callable, Dict, Iterable, + Literal, Mapping, Sequence, Union, @@ -36,6 +37,18 @@ if TYPE_CHECKING: import dask.array.Array as DaskArray + T_Func = Union[str, Callable] + T_Funcs = Union[T_Func, Sequence[T_Func]] + T_Axis = int + T_Axes = tuple[T_Axis, ...] + T_AxesOpt = Union[T_Axis, T_Axes, None] + T_Dtypes = Union[np.typing.DTypeLike, Sequence[np.typing.DTypeLike], None] + T_FillValues = Union[np.typing.ArrayLike, Sequence[np.typing.ArrayLike], None] + T_Engine = Literal["flox", "numpy", "numba"] + T_MethodCohorts = Literal["cohorts", "split-reduce"] + T_Method = Literal["map-reduce", "blockwise", T_MethodCohorts] + T_IsBins = Union[bool | Sequence[bool]] + IntermediateDict = Dict[Union[str, Callable], Any] FinalResultsDict = Dict[str, Union["DaskArray", np.ndarray]] @@ -72,11 +85,11 @@ def _get_chunk_reduction(reduction_type: str) -> Callable: raise ValueError(f"Unknown reduction type: {reduction_type}") -def is_nanlen(reduction: str | Callable) -> bool: +def is_nanlen(reduction: T_Func) -> bool: return isinstance(reduction, str) and reduction == "nanlen" -def _move_reduce_dims_to_end(arr: np.ndarray, axis: Sequence) -> np.ndarray: +def _move_reduce_dims_to_end(arr: np.ndarray, axis: T_Axes) -> np.ndarray: """Transpose `arr` by moving `axis` to the end.""" axis = tuple(axis) order = tuple(ax for ax in np.arange(arr.ndim) if ax not in axis) + axis @@ -125,7 +138,7 @@ def _get_optimal_chunks_for_groups(chunks, labels): @memoize -def find_group_cohorts(labels, chunks, merge=True, method="cohorts"): +def find_group_cohorts(labels, chunks, merge=True, method: T_MethodCohorts = "cohorts"): """ Finds groups labels that occur together aka "cohorts" @@ -212,7 +225,13 @@ def find_group_cohorts(labels, chunks, merge=True, method="cohorts"): def rechunk_for_cohorts( - array, axis, labels, force_new_chunk_at, chunksize=None, ignore_old_chunks=False, debug=False + array, + axis: T_Axis, + labels, + force_new_chunk_at, + chunksize=None, + ignore_old_chunks=False, + debug=False, ): """ Rechunks array so that each new chunk contains groups that always occur together. @@ -297,7 +316,7 @@ def rechunk_for_cohorts( return array.rechunk({axis: newchunks}) -def rechunk_for_blockwise(array, axis, labels): +def rechunk_for_blockwise(array, axis: T_Axis, labels): """ Rechunks array so that group boundaries line up with chunk boundaries, allowing embarassingly parallel group reductions. @@ -330,7 +349,7 @@ def rechunk_for_blockwise(array, axis, labels): def reindex_( - array: np.ndarray, from_, to, fill_value=None, axis: int = -1, promote: bool = False + array: np.ndarray, from_, to, fill_value=None, axis: T_Axis = -1, promote: bool = False ) -> np.ndarray: if not isinstance(to, pd.Index): @@ -393,9 +412,9 @@ def offset_labels(labels: np.ndarray, ngroups: int) -> tuple[np.ndarray, int]: def factorize_( by: tuple, - axis, + axis: T_AxesOpt, expected_groups: tuple[pd.Index, ...] = None, - reindex=False, + reindex: bool = False, sort=True, fastpath=False, ): @@ -499,13 +518,13 @@ def factorize_( def chunk_argreduce( array_plus_idx: tuple[np.ndarray, ...], by: np.ndarray, - func: Sequence[str], + func: T_Funcs, expected_groups: pd.Index | None, - axis: int | Sequence[int], - fill_value: Mapping[str | Callable, Any], - dtype=None, + axis: T_AxesOpt, + fill_value: T_FillValues, + dtype: T_Dtypes = None, reindex: bool = False, - engine: str = "numpy", + engine: T_Engine = "numpy", sort: bool = True, ) -> IntermediateDict: """ @@ -551,15 +570,15 @@ def chunk_argreduce( def chunk_reduce( array: np.ndarray, by: np.ndarray, - func: str | Callable | Sequence[str] | Sequence[Callable], + func: T_Funcs, expected_groups: pd.Index | None, - axis: int | Sequence[int] = None, - fill_value: Mapping[str | Callable, Any] = None, - dtype=None, + axis: T_AxesOpt = None, + fill_value: T_FillValues = None, + dtype: T_Dtypes = None, reindex: bool = False, - engine: str = "numpy", - kwargs=None, - sort=True, + engine: T_Engine = "numpy", + kwargs: Sequence[dict] | None = None, + sort: bool = True, ) -> IntermediateDict: """ Wrapper for numpy_groupies aggregate that supports nD ``array`` and @@ -590,28 +609,39 @@ def chunk_reduce( dict """ - if dtype is not None: - assert isinstance(dtype, Sequence) - if fill_value is not None: - assert isinstance(fill_value, Sequence) - - if isinstance(func, str) or callable(func): - func = (func,) # type: ignore + if not (isinstance(func, str) or callable(func)): + funcs = func + else: + funcs = (func,) + nfuncs = len(funcs) - func: Sequence[str] | Sequence[Callable] + if isinstance(dtype, Sequence): + dtypes = dtype + else: + dtypes = (dtype,) * nfuncs + assert len(dtypes) >= nfuncs - nax = len(axis) if isinstance(axis, Sequence) else by.ndim - final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) - final_groups_shape = (1,) * (nax - 1) + if isinstance(fill_value, Sequence): + fill_values = fill_value + else: + fill_values = (fill_value,) * nfuncs + assert len(fill_values) >= nfuncs - if isinstance(axis, Sequence) and len(axis) == 1: - axis = next(iter(axis)) + if isinstance(kwargs, Sequence): + kwargss = kwargs + else: + kwargss = ({},) * nfuncs + assert len(kwargss) >= nfuncs - if not isinstance(fill_value, Sequence): - fill_value = (fill_value,) + if isinstance(axis, Sequence): + nax = len(axis) + if nax == 1: + axis = axis[0] + else: + nax = by.ndim - if kwargs is None: - kwargs = ({},) * len(func) + final_array_shape = array.shape[:-nax] + (1,) * (nax - 1) + final_groups_shape = (1,) * (nax - 1) # when axis is a tuple # collapse and move reduction dimensions to the end @@ -661,10 +691,8 @@ def chunk_reduce( # we commonly have func=(..., "nanlen", "nanlen") when # counts are needed for the final result as well as for masking # optimize that out. - previous_reduction = None - for param in (fill_value, kwargs, dtype): - assert len(param) >= len(func) - for reduction, fv, kw, dt in zip(func, fill_value, kwargs, dtype): + previous_reduction: T_Func = "" + for reduction, fv, kw, dt in zip(funcs, fill_values, kwargss, dtypes): if empty: result = np.full(shape=final_array_shape, fill_value=fv) else: @@ -672,16 +700,16 @@ def chunk_reduce( result = results["intermediates"][-1] # fill_value here is necessary when reducing with "offset" groups - kwargs = dict(size=size, dtype=dt, fill_value=fv) - kwargs.update(kw) + kw_func = dict(size=size, dtype=dt, fill_value=fv) + kw_func.update(kw) if callable(reduction): # passing a custom reduction for npg to apply per-group is really slow! # So this `reduction` has to do the groupby-aggregation - result = reduction(group_idx, array, **kwargs) + result = reduction(group_idx, array, **kw_func) else: result = generic_aggregate( - group_idx, array, axis=-1, engine=engine, func=reduction, **kwargs + group_idx, array, axis=-1, engine=engine, func=reduction, **kw_func ).astype(dt, copy=False) if np.any(props.nanmask): # remove NaN group label which should be last @@ -693,7 +721,7 @@ def chunk_reduce( return results -def _squeeze_results(results: IntermediateDict, axis: Sequence) -> IntermediateDict: +def _squeeze_results(results: IntermediateDict, axis: T_Axes) -> IntermediateDict: # at the end we squeeze out extra dims groups = results["groups"] newresults: IntermediateDict = {"groups": [], "intermediates": []} @@ -716,11 +744,11 @@ def _split_groups(array, j, slicer): def _finalize_results( results: IntermediateDict, agg: Aggregation, - axis: Sequence[int], + axis: T_Axes, expected_groups: pd.Index | None, fill_value: Any, reindex: bool, -): +) -> FinalResultsDict: """Finalize results by 1. Squeezing out dummy dimensions 2. Calling agg.finalize with intermediate results @@ -734,7 +762,7 @@ def _finalize_results( squeezed["intermediates"] = squeezed["intermediates"][:-1] # finalize step - finalized: dict[str, DaskArray | np.ndarray] = {} + finalized: FinalResultsDict = {} if agg.finalize is None: finalized[agg.name] = squeezed["intermediates"][0] else: @@ -770,7 +798,7 @@ def _aggregate( combine: Callable, agg: Aggregation, expected_groups: pd.Index | None, - axis: Sequence, + axis: T_Axes, keepdims, fill_value: Any, reindex: bool, @@ -788,7 +816,7 @@ def _expand_dims(results: IntermediateDict) -> IntermediateDict: def _simple_combine( - x_chunk, agg: Aggregation, axis: Sequence, keepdims: bool, is_aggregate: bool = False + x_chunk, agg: Aggregation, axis: T_Axes, keepdims: bool, is_aggregate: bool = False ) -> IntermediateDict: """ 'Simple' combination of blockwise results. @@ -802,12 +830,13 @@ def _simple_combine( """ from dask.array.core import deepfirst - results = {"groups": deepfirst(x_chunk)["groups"]} + results: IntermediateDict = {"groups": deepfirst(x_chunk)["groups"]} results["intermediates"] = [] + axis_ = axis[:-1] + (DUMMY_AXIS,) for idx, combine in enumerate(agg.combine): - array = _conc2(x_chunk, key1="intermediates", key2=idx, axis=axis[:-1] + (DUMMY_AXIS,)) + array = _conc2(x_chunk, key1="intermediates", key2=idx, axis=axis_) assert array.ndim >= 2 - result = getattr(np, combine)(array, axis=axis[:-1] + (DUMMY_AXIS,), keepdims=True) + result = getattr(np, combine)(array, axis=axis_, keepdims=True) if is_aggregate: # squeeze out DUMMY_AXIS if this is the last step i.e. called from _aggregate result = result.squeeze(axis=DUMMY_AXIS) @@ -815,7 +844,7 @@ def _simple_combine( return results -def _conc2(x_chunk, key1, key2=slice(None), axis=None) -> np.ndarray: +def _conc2(x_chunk, key1, key2=slice(None), axis: T_Axes = None) -> np.ndarray: """copied from dask.array.reductions.mean_combine""" from dask.array.core import _concatenate2 from dask.utils import deepmap @@ -849,10 +878,10 @@ def listify_groups(x): def _grouped_combine( x_chunk, agg: Aggregation, - axis: Sequence, + axis: T_Axes, keepdims: bool, - neg_axis: Sequence, - engine: str, + neg_axis: T_Axes, + engine: T_Engine, is_aggregate: bool = False, sort: bool = True, ) -> IntermediateDict: @@ -896,7 +925,7 @@ def _grouped_combine( # This happens when we are reducing along an axis with a single chunk. avoid_reduction = array_idx[0].shape[axis[0]] == 1 if avoid_reduction: - results = {"groups": groups, "intermediates": list(array_idx)} + results: IntermediateDict = {"groups": groups, "intermediates": list(array_idx)} else: results = chunk_argreduce( array_idx, @@ -995,7 +1024,9 @@ def split_blocks(applied, split_out, expected_groups, split_name): return intermediate, group_chunks -def _reduce_blockwise(array, by, agg, *, axis, expected_groups, fill_value, engine, sort, reindex): +def _reduce_blockwise( + array, by, agg, *, axis: T_Axes, expected_groups, fill_value, engine: T_Engine, sort, reindex +) -> FinalResultsDict: """ Blockwise groupby reduction that produces the final result. This code path is also used for non-dask array aggregations. @@ -1026,7 +1057,7 @@ def _reduce_blockwise(array, by, agg, *, axis, expected_groups, fill_value, engi engine=engine, sort=sort, reindex=reindex, - ) # type: ignore + ) if _is_arg_reduction(agg): results["intermediates"][0] = np.unravel_index(results["intermediates"][0], array.shape)[-1] @@ -1042,14 +1073,14 @@ def dask_groupby_agg( by: DaskArray | np.ndarray, agg: Aggregation, expected_groups: pd.Index | None, - axis: Sequence = None, + axis: T_Axes = (), split_out: int = 1, fill_value: Any = None, - method: str = "map-reduce", + method: T_Method = "map-reduce", reindex: bool = False, - engine: str = "numpy", + engine: T_Engine = "numpy", sort: bool = True, -) -> tuple[DaskArray, np.ndarray | DaskArray]: +) -> tuple[DaskArray, tuple[np.ndarray | DaskArray]]: import dask.array from dask.array.core import slices_from_chunks @@ -1161,11 +1192,11 @@ def dask_groupby_agg( # these are negative axis indices useful for concatenating the intermediates neg_axis = tuple(range(-len(axis), 0)) - combine = ( - _simple_combine - if do_simple_combine - else partial(_grouped_combine, engine=engine, neg_axis=neg_axis, sort=sort) - ) + combine: Callable[..., IntermediateDict] + if do_simple_combine: + combine = _simple_combine + else: + combine = partial(_grouped_combine, engine=engine, neg_axis=neg_axis, sort=sort) # reduced is really a dict mapping reduction name to array # and "groups" to an array of group labels @@ -1204,13 +1235,12 @@ def dask_groupby_agg( groups_in_block = tuple( np.intersect1d(by_input[slc], expected_groups) for slc in slices ) - ngroups_per_block = tuple(len(groups) for groups in groups_in_block) + ngroups_per_block = tuple(len(grp) for grp in groups_in_block) output_chunks = reduced.chunks[: -(len(axis))] + (ngroups_per_block,) else: raise ValueError(f"Unknown method={method}.") # extract results from the dict - result: dict = {} layer: dict[tuple, tuple] = {} ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks) if is_duck_dask_array(by_input) and expected_groups is None: @@ -1222,7 +1252,7 @@ def dask_groupby_agg( (reduced.name, *first_block), "groups", ) - groups = ( + groups: tuple[np.ndarray | DaskArray] = ( dask.array.Array( HighLevelGraph.from_collections(groups_name, layer, dependencies=[reduced]), groups_name, @@ -1233,12 +1263,14 @@ def dask_groupby_agg( else: if method == "map-reduce": if expected_groups is None: - expected_groups = _get_expected_groups(by_input, sort=sort) - groups = (expected_groups.to_numpy(),) + expected_groups_ = _get_expected_groups(by_input, sort=sort) + else: + expected_groups_ = expected_groups + groups = (expected_groups_.to_numpy(),) else: groups = (np.concatenate(groups_in_block),) - layer: dict[tuple, tuple] = {} # type: ignore + layer2: dict[tuple, tuple] = {} agg_name = f"{name}-{token}" for ochunk in itertools.product(*ochunks): if method == "blockwise": @@ -1249,24 +1281,24 @@ def dask_groupby_agg( inchunk = ochunk[:-1] + np.unravel_index(ochunk[-1], nblocks) else: inchunk = ochunk[:-1] + (0,) * len(axis) + (ochunk[-1],) * int(split_out > 1) - layer[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name) + layer2[(agg_name, *ochunk)] = (operator.getitem, (reduced.name, *inchunk), agg.name) result = dask.array.Array( - HighLevelGraph.from_collections(agg_name, layer, dependencies=[reduced]), + HighLevelGraph.from_collections(agg_name, layer2, dependencies=[reduced]), agg_name, chunks=output_chunks, dtype=agg.dtype[agg.name], ) - return (result, *groups) - + return (result, groups) -def _validate_reindex(reindex: bool, func, method, expected_groups) -> bool: - if reindex is True and _is_arg_reduction(func): - raise NotImplementedError - if method == "blockwise" and reindex is True: - raise NotImplementedError +def _validate_reindex(reindex: bool | None, func, method: T_Method, expected_groups) -> bool | None: + if reindex is True: + if _is_arg_reduction(func): + raise NotImplementedError + if method == "blockwise": + raise NotImplementedError if method == "blockwise" or _is_arg_reduction(func): reindex = False @@ -1277,6 +1309,8 @@ def _validate_reindex(reindex: bool, func, method, expected_groups) -> bool: if method in ["split-reduce", "cohorts"] and reindex is False: raise NotImplementedError + # TODO: Should reindex be a bool-only at this point? Would've been nice but + # None's are relied on after this function as well. return reindex @@ -1294,8 +1328,8 @@ def _assert_by_is_aligned(shape, by): def _convert_expected_groups_to_index( expected_groups: Iterable, isbin: Sequence[bool], sort: bool -) -> tuple[pd.Index | None]: - out = [] +) -> tuple[pd.Index | None, ...]: + out: list[pd.Index | None] = [] for ex, isbin_ in zip(expected_groups, isbin): if isinstance(ex, pd.IntervalIndex) or (isinstance(ex, pd.Index) and not isbin): if sort: @@ -1358,13 +1392,13 @@ def groupby_reduce( func: str | Aggregation, expected_groups: Sequence | np.ndarray | None = None, sort: bool = True, - isbin: bool = False, - axis=None, + isbin: T_IsBins = False, + axis: T_AxesOpt = None, fill_value=None, min_count: int | None = None, split_out: int = 1, - method: str = "map-reduce", - engine: str = "numpy", + method: T_Method = "map-reduce", + engine: T_Engine = "numpy", reindex: bool | None = None, finalize_kwargs: Mapping | None = None, ) -> tuple[DaskArray, np.ndarray | DaskArray]: @@ -1469,9 +1503,9 @@ def groupby_reduce( ) reindex = _validate_reindex(reindex, func, method, expected_groups) - by: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) - nby = len(by) - by_is_dask = any(is_duck_dask_array(b) for b in by) + bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + nby = len(bys) + by_is_dask = any(is_duck_dask_array(b) for b in bys) if method in ["split-reduce", "cohorts"] and by_is_dask: raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.") @@ -1480,54 +1514,58 @@ def groupby_reduce( array = np.asarray(array) array = array.astype(int) if np.issubdtype(array.dtype, bool) else array - if isinstance(isbin, bool): - isbin = (isbin,) * len(by) + if isinstance(isbin, Sequence): + isbins = isbin + else: + isbins = (isbin,) * nby if expected_groups is None: - expected_groups = (None,) * len(by) + expected_groups = (None,) * nby - _assert_by_is_aligned(array.shape, by) + _assert_by_is_aligned(array.shape, bys) - if len(by) == 1 and not isinstance(expected_groups, tuple): + if nby == 1 and not isinstance(expected_groups, tuple): expected_groups = (np.asarray(expected_groups),) - elif len(expected_groups) != len(by): + elif len(expected_groups) != nby: raise ValueError( f"Must have same number of `expected_groups` (received {len(expected_groups)}) " - f" and variables to group by (received {len(by)})." + f" and variables to group by (received {nby})." ) # We convert to pd.Index since that lets us know if we are binning or not # (pd.IntervalIndex or not) - expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort) + expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, sort) # TODO: could restrict this to dask-only factorize_early = (nby > 1) or ( - any(isbin) and method in ["split-reduce", "cohorts"] and is_duck_dask_array(array) + any(isbins) and method in ["split-reduce", "cohorts"] and is_duck_dask_array(array) ) if factorize_early: - by, final_groups, grp_shape = _factorize_multiple( - by, expected_groups, by_is_dask=by_is_dask, reindex=reindex + bys, final_groups, grp_shape = _factorize_multiple( + bys, expected_groups, by_is_dask=by_is_dask, reindex=reindex ) expected_groups = (pd.RangeIndex(math.prod(grp_shape)),) - assert len(by) == 1 - by = by[0] + assert len(bys) == 1 + by_ = bys[0] expected_groups = expected_groups[0] if axis is None: - axis = tuple(array.ndim + np.arange(-by.ndim, 0)) + axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0)) else: - axis = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + # TODO: How come this function doesn't exist according to mypy? + axis_ = np.core.numeric.normalize_axis_tuple(axis, array.ndim) # type: ignore + nax = len(axis_) - if method in ["blockwise", "cohorts", "split-reduce"] and len(axis) != by.ndim: + if method in ["blockwise", "cohorts", "split-reduce"] and nax != by_.ndim: raise NotImplementedError( "Must reduce along all dimensions of `by` when method != 'map-reduce'." f"Received method={method!r}" ) # TODO: make sure expected_groups is unique - if len(axis) == 1 and by.ndim > 1 and expected_groups is None: + if nax == 1 and by_.ndim > 1 and expected_groups is None: if not by_is_dask: - expected_groups = _get_expected_groups(by, sort) + expected_groups = _get_expected_groups(by_, sort) else: # When we reduce along all axes, we are guaranteed to see all # groups in the final combine stage, so everything works. @@ -1540,13 +1578,14 @@ def groupby_reduce( "Please provide ``expected_groups`` when not reducing along all axes." ) - assert len(axis) <= by.ndim - if len(axis) < by.ndim: - by = _move_reduce_dims_to_end(by, -array.ndim + np.array(axis) + by.ndim) - array = _move_reduce_dims_to_end(array, axis) - axis = tuple(array.ndim + np.arange(-len(axis), 0)) + assert nax <= by_.ndim + if nax < by_.ndim: + by_ = _move_reduce_dims_to_end(by_, tuple(-array.ndim + ax + by_.ndim for ax in axis_)) + array = _move_reduce_dims_to_end(array, axis_) + axis_ = tuple(array.ndim + np.arange(-nax, 0)) + nax = len(axis_) - has_dask = is_duck_dask_array(array) or is_duck_dask_array(by) + has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) # When axis is a subset of possible values; then npg will # apply it to groups that don't exist along a particular axis (for e.g.) @@ -1555,7 +1594,7 @@ def groupby_reduce( # The only way to do this consistently is mask out using min_count # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0 if min_count is None: - if len(axis) < by.ndim or fill_value is not None: + if nax < by_.ndim or fill_value is not None: min_count = 1 # TODO: set in xarray? @@ -1564,20 +1603,24 @@ def groupby_reduce( # overwrite than when min_count is set fill_value = np.nan - kwargs = dict(axis=axis, fill_value=fill_value, engine=engine) + kwargs = dict(axis=axis_, fill_value=fill_value, engine=engine) agg = _initialize_aggregation(func, array.dtype, fill_value, min_count, finalize_kwargs) if not has_dask: results = _reduce_blockwise( - array, by, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs + array, by_, agg, expected_groups=expected_groups, reindex=reindex, sort=sort, **kwargs ) groups = (results["groups"],) result = results[agg.name] else: + if TYPE_CHECKING: + # TODO: How else to narrow that array.chunks is there? + assert isinstance(array, DaskArray) + if agg.chunk[0] is None and method != "blockwise": raise NotImplementedError( - f"Aggregation {func.name!r} is only implemented for dask arrays when method='blockwise'." + f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'." f"\n\n Received: {func}" ) @@ -1589,25 +1632,25 @@ def groupby_reduce( if method in ["split-reduce", "cohorts"]: cohorts = find_group_cohorts( - by, [array.chunks[ax] for ax in axis], merge=True, method=method + by_, [array.chunks[ax] for ax in axis_], merge=True, method=method ) - results = [] + results_ = [] groups_ = [] for cohort in cohorts: cohort = sorted(cohort) # equivalent of xarray.DataArray.where(mask, drop=True) - mask = np.isin(by, cohort) + mask = np.isin(by_, cohort) indexer = [np.unique(v) for v in np.nonzero(mask)] array_subset = array - for ax, idxr in zip(range(-by.ndim, 0), indexer): + for ax, idxr in zip(range(-by_.ndim, 0), indexer): array_subset = np.take(array_subset, idxr, axis=ax) - numblocks = math.prod([len(array_subset.chunks[ax]) for ax in axis]) + numblocks = math.prod([len(array_subset.chunks[ax]) for ax in axis_]) # get final result for these groups r, *g = partial_agg( array_subset, - by[np.ix_(*indexer)], + by_[np.ix_(*indexer)], expected_groups=pd.Index(cohort), # First deep copy becasue we might be doping blockwise, # which sets agg.finalize=None, then map-reduce (GH102) @@ -1620,22 +1663,22 @@ def groupby_reduce( sort=False, # if only a single block along axis, we can just work blockwise # inspired by https://github.com/dask/dask/issues/8361 - method="blockwise" if numblocks == 1 and len(axis) == by.ndim else "map-reduce", + method="blockwise" if numblocks == 1 and nax == by_.ndim else "map-reduce", ) - results.append(r) + results_.append(r) groups_.append(cohort) # concatenate results together, # sort to make sure we match expected output groups = (np.hstack(groups_),) - result = np.concatenate(results, axis=-1) + result = np.concatenate(results_, axis=-1) else: - if method == "blockwise" and by.ndim == 1: - array = rechunk_for_blockwise(array, axis=-1, labels=by) + if method == "blockwise" and by_.ndim == 1: + array = rechunk_for_blockwise(array, axis=-1, labels=by_) - result, *groups = partial_agg( + result, groups = partial_agg( array, - by, + by_, expected_groups=None if method == "blockwise" else expected_groups, agg=agg, reindex=reindex, From 8fc197741a3eb4d33d2126c8cee39cb8c4fa2746 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 7 Oct 2022 13:47:12 -0600 Subject: [PATCH 16/17] Refactor before redoing cohorts (#164) --- asv_bench/benchmarks/combine.py | 2 +- flox/core.py | 81 ++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/asv_bench/benchmarks/combine.py b/asv_bench/benchmarks/combine.py index 8d86f56db..2da0b1392 100644 --- a/asv_bench/benchmarks/combine.py +++ b/asv_bench/benchmarks/combine.py @@ -58,4 +58,4 @@ def construct_member(groups): ] self.x_chunk_cohorts = [construct_member(groups) for groups in [np.array((1, 2, 3, 4))] * 4] - self.kwargs = {"agg": flox.aggregations.mean, "axis": (3,), "neg_axis": (-1,)} + self.kwargs = {"agg": flox.aggregations.mean, "axis": (3,)} diff --git a/flox/core.py b/flox/core.py index a73a97398..10a9197a9 100644 --- a/flox/core.py +++ b/flox/core.py @@ -880,7 +880,6 @@ def _grouped_combine( agg: Aggregation, axis: T_Axes, keepdims: bool, - neg_axis: T_Axes, engine: T_Engine, is_aggregate: bool = False, sort: bool = True, @@ -906,6 +905,9 @@ def _grouped_combine( partial(reindex_intermediates, agg=agg, unique_groups=unique_groups), x_chunk ) + # these are negative axis indices useful for concatenating the intermediates + neg_axis = tuple(range(-len(axis), 0)) + groups = _conc2(x_chunk, "groups", axis=neg_axis) if agg.reduction_type == "argreduce": @@ -1068,6 +1070,30 @@ def _reduce_blockwise( return result +def _extract_unknown_groups(reduced, group_chunks, dtype) -> tuple[DaskArray]: + import dask.array + from dask.highlevelgraph import HighLevelGraph + + layer: dict[tuple, tuple] = {} + groups_token = f"group-{reduced.name}" + first_block = reduced.ndim * (0,) + layer[(groups_token, *first_block)] = ( + operator.getitem, + (reduced.name, *first_block), + "groups", + ) + groups: tuple[DaskArray] = ( + dask.array.Array( + HighLevelGraph.from_collections(groups_token, layer, dependencies=[reduced]), + groups_token, + chunks=group_chunks, + meta=np.array([], dtype=dtype), + ), + ) + + return groups + + def dask_groupby_agg( array: DaskArray, by: DaskArray | np.ndarray, @@ -1189,14 +1215,11 @@ def dask_groupby_agg( group_chunks = ((len(expected_groups),) if expected_groups is not None else (np.nan,),) if method == "map-reduce": - # these are negative axis indices useful for concatenating the intermediates - neg_axis = tuple(range(-len(axis), 0)) - combine: Callable[..., IntermediateDict] if do_simple_combine: combine = _simple_combine else: - combine = partial(_grouped_combine, engine=engine, neg_axis=neg_axis, sort=sort) + combine = partial(_grouped_combine, engine=engine, sort=sort) # reduced is really a dict mapping reduction name to array # and "groups" to an array of group labels @@ -1219,10 +1242,19 @@ def dask_groupby_agg( keepdims=True, concatenate=False, ) - output_chunks = reduced.chunks[: -(len(axis) + int(split_out > 1))] + group_chunks + + if is_duck_dask_array(by_input) and expected_groups is None: + groups = _extract_unknown_groups(reduced, group_chunks=group_chunks, dtype=by.dtype) + else: + if expected_groups is None: + expected_groups_ = _get_expected_groups(by_input, sort=sort) + else: + expected_groups_ = expected_groups + groups = (expected_groups_.to_numpy(),) + elif method == "blockwise": reduced = intermediate - # Here one input chunk → one output chunka + # Here one input chunk → one output chunks # find number of groups in each chunk, this is needed for output chunks # along the reduced axis slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis)) @@ -1235,41 +1267,17 @@ def dask_groupby_agg( groups_in_block = tuple( np.intersect1d(by_input[slc], expected_groups) for slc in slices ) + groups = (np.concatenate(groups_in_block),) + ngroups_per_block = tuple(len(grp) for grp in groups_in_block) - output_chunks = reduced.chunks[: -(len(axis))] + (ngroups_per_block,) + group_chunks = (ngroups_per_block,) + else: raise ValueError(f"Unknown method={method}.") # extract results from the dict - layer: dict[tuple, tuple] = {} + output_chunks = reduced.chunks[: -(len(axis) + int(split_out > 1))] + group_chunks ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks) - if is_duck_dask_array(by_input) and expected_groups is None: - groups_name = f"groups-{name}-{token}" - # we've used keepdims=True, so _tree_reduce preserves some dummy dimensions - first_block = len(ochunks) * (0,) - layer[(groups_name, *first_block)] = ( - operator.getitem, - (reduced.name, *first_block), - "groups", - ) - groups: tuple[np.ndarray | DaskArray] = ( - dask.array.Array( - HighLevelGraph.from_collections(groups_name, layer, dependencies=[reduced]), - groups_name, - chunks=group_chunks, - dtype=by.dtype, - ), - ) - else: - if method == "map-reduce": - if expected_groups is None: - expected_groups_ = _get_expected_groups(by_input, sort=sort) - else: - expected_groups_ = expected_groups - groups = (expected_groups_.to_numpy(),) - else: - groups = (np.concatenate(groups_in_block),) - layer2: dict[tuple, tuple] = {} agg_name = f"{name}-{token}" for ochunk in itertools.product(*ochunks): @@ -1624,6 +1632,7 @@ def groupby_reduce( f"\n\n Received: {func}" ) + # TODO: just do this in dask_groupby_agg # we always need some fill_value (see above) so choose the default if needed if kwargs["fill_value"] is None: kwargs["fill_value"] = agg.fill_value[agg.name] From a1fb701dcb7315f8b5ab9952b0c00b9efceba845 Mon Sep 17 00:00:00 2001 From: Deepak Cherian Date: Fri, 7 Oct 2022 16:45:58 -0600 Subject: [PATCH 17/17] Update ci-additional.yaml (#167) --- .github/workflows/ci-additional.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-additional.yaml b/.github/workflows/ci-additional.yaml index c326429bb..605e20cf0 100644 --- a/.github/workflows/ci-additional.yaml +++ b/.github/workflows/ci-additional.yaml @@ -2,7 +2,7 @@ name: CI Additional on: push: branches: - - "*" + - "main" pull_request: branches: - "*"