From 885c704bd860406c623f993df9e88570acc0a37a Mon Sep 17 00:00:00 2001
From: arunjose696
Date: Wed, 11 Oct 2023 11:45:31 +0000
Subject: [PATCH] changes

---
 .github/workflows/ci.yml                      |  28 ++---
 .../test/internals/test_benchmark_mode.py     | 117 +++++++++---------
 requirements/env_unidist_linux.yml            |   7 +-
 3 files changed, 77 insertions(+), 75 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index e121396a0fa..b70bde26821 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -97,7 +97,7 @@ jobs:
         run: |
           MODIN_ENGINE=dask python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
           MODIN_ENGINE=ray python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
-          MODIN_ENGINE=unidist UNIDIST_BACKEND=mpi mpiexec -n 1 python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
+          MODIN_ENGINE=unidist UNIDIST_BACKEND=mpi mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -c "import modin.pandas as pd; print(pd.DataFrame([1,2,3]))"
 
   test-internals:
     needs: [lint-flake8, lint-black-isort]
@@ -360,26 +360,26 @@ jobs:
        run: |
          sudo docker pull postgres
          sudo docker run --name some-postgres -e POSTGRES_USER=sa -e POSTGRES_PASSWORD=Strong.Pwd-123 -e POSTGRES_DB=postgres -d -p 2345:5432 postgres
-      - run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_benchmark_mode.py
-      - run: mpiexec -n 1 python -m pytest modin/pandas/test/internals/test_repartition.py
-      - run: mpiexec -n 1 python -m pytest modin/test/test_partition_api.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/pandas/test/internals/test_benchmark_mode.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/pandas/test/internals/test_repartition.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/test/test_partition_api.py
       - uses: ./.github/actions/run-core-tests
         with:
-          runner: mpiexec -n 1 python -m pytest
+          runner: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest
           parallel: ""
-      - run: mpiexec -n 1 python -m pytest modin/numpy/test
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/numpy/test
       - run: chmod +x ./.github/workflows/sql_server/set_up_sql_server.sh
       - run: ./.github/workflows/sql_server/set_up_sql_server.sh
-      # need an extra argument "genv" to set environment variables for mpiexec. We need
+      # need an extra argument "-x" to set environment variables for mpiexec. We need
       # these variables to test writing to the mock s3 filesystem.
-      - run: mpiexec -n 1 -genv AWS_ACCESS_KEY_ID foobar_key -genv AWS_SECRET_ACCESS_KEY foobar_secret python -m pytest modin/pandas/test/test_io.py --verbose
-      - run: mpiexec -n 1 python -m pytest modin/experimental/pandas/test/test_io_exp.py
-      - run: mpiexec -n 1 python -m pytest modin/experimental/sql/test/test_sql.py
-      - run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/test_general.py
-      - run: mpiexec -n 1 python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe -x AWS_ACCESS_KEY_ID=foobar_key -x AWS_SECRET_ACCESS_KEY=foobar_secret python -m pytest modin/pandas/test/test_io.py --verbose
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/experimental/pandas/test/test_io_exp.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/experimental/sql/test/test_sql.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/test/interchange/dataframe_protocol/test_general.py
+      - run: mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/test/interchange/dataframe_protocol/pandas/test_protocol.py
       - run: |
           python -m pip install lazy_import
-          mpiexec -n 1 python -m pytest modin/pandas/test/integrations/
+          mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe python -m pytest modin/pandas/test/integrations/
       - uses: ./.github/actions/upload-coverage
 
   test-all:
@@ -533,7 +533,7 @@ jobs:
             shell-ex: "python -m pytest"
             if: needs.execution-filter.dask != 'true'
           - name: unidist
-            shell-ex: "mpiexec -n 1 -genv AWS_ACCESS_KEY_ID foobar_key -genv AWS_SECRET_ACCESS_KEY foobar_secret python -m pytest"
+            shell-ex: "mpiexec --prefix /usr/share/miniconda3/envs/modin_on_unidist -x UNIDIST_MPI_SHARED_OBJECT_STORE=True -n 1 --oversubscribe -x AWS_ACCESS_KEY_ID=foobar_key -x AWS_SECRET_ACCESS_KEY=foobar_secret python -m pytest"
             if: needs.execution-filter.unidist != 'true'
     runs-on: ${{ matrix.os }}-latest
     defaults:
diff --git a/modin/pandas/test/internals/test_benchmark_mode.py b/modin/pandas/test/internals/test_benchmark_mode.py
index 40e341aeb61..28cf915456c 100644
--- a/modin/pandas/test/internals/test_benchmark_mode.py
+++ b/modin/pandas/test/internals/test_benchmark_mode.py
@@ -1,62 +1,61 @@
-# Licensed to Modin Development Team under one or more contributor license agreements.
-# See the NOTICE file distributed with this work for additional information regarding
-# copyright ownership. The Modin Development Team licenses this file to you under the
-# Apache License, Version 2.0 (the "License"); you may not use this file except in
-# compliance with the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software distributed under
-# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
-# ANY KIND, either express or implied. See the License for the specific language
-# governing permissions and limitations under the License.
-
-import unittest.mock as mock
-
-import pytest
-
-import modin.pandas as pd
-from modin.config import Engine
-
-engine = Engine.get()
-
-# We have to explicitly mock subclass implementations of wait_partitions.
-if engine == "Ray":
-    wait_method = (
-        "modin.core.execution.ray.implementations."
-        + "pandas_on_ray.partitioning."
-        + "PandasOnRayDataframePartitionManager.wait_partitions"
-    )
-elif engine == "Dask":
-    wait_method = (
-        "modin.core.execution.dask.implementations."
-        + "pandas_on_dask.partitioning."
-        + "PandasOnDaskDataframePartitionManager.wait_partitions"
-    )
-elif engine == "Unidist":
-    wait_method = (
-        "modin.core.execution.unidist.implementations."
-        + "pandas_on_unidist.partitioning."
-        + "PandasOnUnidistDataframePartitionManager.wait_partitions"
-    )
-else:
-    wait_method = (
-        "modin.core.dataframe.pandas.partitioning."
-        + "partition_manager.PandasDataframePartitionManager.wait_partitions"
+import sys
+import mpi4py
+
+mpi4py.rc(recv_mprobe=False, initialize=False)
+from mpi4py import MPI  # noqa: E402
+
+########################
+# Threads initializing #
+########################
+
+MPI.Init_thread()
+
+comm = MPI.COMM_WORLD
+rank = comm.Get_rank()
+size = comm.Get_size()
+
+parent_comm = MPI.Comm.Get_parent()
+
+if rank == 0 and parent_comm == MPI.COMM_NULL and size == 1:
+    nprocs_to_spawn = 4
+    args = ["reproducer.py"]
+    info = MPI.Info.Create()
+    intercomm = MPI.COMM_SELF.Spawn(
+        sys.executable,
+        args,
+        maxprocs=nprocs_to_spawn,
+        info=info,
+        root=rank,
     )
 
+#######################
+# Merge communicators #
+#######################
 
-@pytest.mark.parametrize("set_benchmark_mode", [False], indirect=True)
-def test_turn_off(set_benchmark_mode):
-    df = pd.DataFrame([0])
-    with mock.patch(wait_method) as wait:
-        df.dropna()
-    wait.assert_not_called()
-
-
-@pytest.mark.parametrize("set_benchmark_mode", [True], indirect=True)
-def test_turn_on(set_benchmark_mode):
-    df = pd.DataFrame([0])
-    with mock.patch(wait_method) as wait:
-        df.dropna()
-    wait.assert_called()
+# Comment out the following block if you want to run without merging the communicators.
+if parent_comm != MPI.COMM_NULL:
+    comm = parent_comm.Merge(high=True)
+else:
+    comm = intercomm.Merge(high=False)
+
+# rank = comm.Get_rank()
+# size = comm.Get_size()
+
+# if rank == 0:
+#     print(f"size = {size}")
+#     data = {"a": 1}
+#     for r in range(2, size):
+#         comm.send(data, dest=r, tag=11)
+#         comm.send(data, dest=r, tag=12)
+#         data2 = comm.recv(source=r, tag=11)
+#         data2 = comm.recv(source=r, tag=12)
+# elif rank == 1:
+#     pass
+# else:
+#     data = comm.recv(source=0, tag=11)
+#     data = comm.recv(source=0, tag=12)
+#     comm.send(data, dest=0, tag=11)
+#     comm.send(data, dest=0, tag=12)
+
+if not MPI.Is_finalized():
+    MPI.Finalize()
diff --git a/requirements/env_unidist_linux.yml b/requirements/env_unidist_linux.yml
index 7589560c2b9..50dac857bd7 100644
--- a/requirements/env_unidist_linux.yml
+++ b/requirements/env_unidist_linux.yml
@@ -7,8 +7,10 @@ dependencies:
   # required dependencies
   - pandas>=2.1,<2.2
   - numpy>=1.22.4
-  - unidist-mpi>=0.2.1
-  - mpich
+  - cython
+  - openmpi
+  - mpi4py
+  - msgpack-python
   - fsspec>=2022.05.0
   - packaging>=21.0
   - psutil>=5.8.0
@@ -59,3 +59,4 @@ dependencies:
     - connectorx>=0.2.6a4
     # The `numpydoc` version should match the version installed in the `lint-pydocstyle` job of the CI.
     - numpydoc==1.1.0
+    - git+https://github.com/YarShev/unidist.git@dev/yigoshev-thr
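
For reference, the following is a minimal, self-contained sketch of the spawn-and-merge pattern used by the reproducer patched in above: a singleton parent spawns workers, both sides merge the intercommunicator into one intracommunicator, and they exchange messages point-to-point (the exchange the commented-out block hints at). The file name, worker count, and tag are illustrative assumptions and not part of the patch; it is not Modin's or unidist's implementation.

# spawn_merge_sketch.py -- illustrative sketch only.
# Run with e.g.:  mpiexec -n 1 python spawn_merge_sketch.py
# (flags such as --oversubscribe or --prefix may be needed depending on the MPI setup)
import sys

import mpi4py

mpi4py.rc(recv_mprobe=False, initialize=False)
from mpi4py import MPI  # noqa: E402

MPI.Init_thread()

parent_comm = MPI.Comm.Get_parent()

if parent_comm == MPI.COMM_NULL:
    # Parent: spawn two copies of this script and merge the intercommunicator,
    # keeping the parent in the low rank group.
    intercomm = MPI.COMM_SELF.Spawn(sys.executable, args=[__file__], maxprocs=2)
    comm = intercomm.Merge(high=False)
else:
    # Spawned child: merge with the parent so all processes share one communicator.
    comm = parent_comm.Merge(high=True)

rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    # Parent sends a payload to every worker and waits for the echo.
    for dest in range(1, size):
        comm.send({"payload": dest}, dest=dest, tag=11)
        print("rank 0 received:", comm.recv(source=dest, tag=11))
else:
    # Workers echo whatever the parent sent.
    data = comm.recv(source=0, tag=11)
    comm.send(data, dest=0, tag=11)

comm.Free()
if not MPI.Is_finalized():
    MPI.Finalize()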