From 97dbc52258d2b311e5c51ae06c3aba57e539a64c Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 7 Nov 2024 14:14:03 -0500 Subject: [PATCH] ingest `_{{test_,}eager_iter,fast_csr}.py` from SOMA core --- apis/python/src/tiledbsoma/_eager_iter.py | 51 ++++ apis/python/src/tiledbsoma/_fast_csr.py | 276 ++++++++++++++++++++++ apis/python/src/tiledbsoma/_query.py | 20 +- apis/python/src/tiledbsoma/_read_iters.py | 2 +- apis/python/src/tiledbsoma/_types.py | 2 + apis/python/tests/test_eager_iter.py | 64 +++++ 6 files changed, 405 insertions(+), 10 deletions(-) create mode 100644 apis/python/src/tiledbsoma/_eager_iter.py create mode 100644 apis/python/src/tiledbsoma/_fast_csr.py create mode 100644 apis/python/tests/test_eager_iter.py diff --git a/apis/python/src/tiledbsoma/_eager_iter.py b/apis/python/src/tiledbsoma/_eager_iter.py new file mode 100644 index 0000000000..c42e52c1ab --- /dev/null +++ b/apis/python/src/tiledbsoma/_eager_iter.py @@ -0,0 +1,51 @@ +from concurrent import futures +from typing import Iterator, Optional, TypeVar + +_T = TypeVar("_T") + + +class EagerIterator(Iterator[_T]): + def __init__( + self, + iterator: Iterator[_T], + pool: Optional[futures.Executor] = None, + ): + super().__init__() + self.iterator = iterator + self._pool = pool or futures.ThreadPoolExecutor() + self._own_pool = pool is None + self._preload_future = self._pool.submit(self.iterator.__next__) + + def __next__(self) -> _T: + stopped = False + try: + if self._preload_future.cancel(): + # If `.cancel` returns True, cancellation was successful. + # The self.iterator.__next__ call has not yet been started, + # and will never be started, so we can compute next ourselves. + # This prevents deadlocks if the thread pool is too small + # and we can never create a preload thread. + return next(self.iterator) + # `.cancel` returned false, so the preload is already running. + # Just wait for it. + return self._preload_future.result() + except StopIteration: + self._cleanup() + stopped = True + raise + finally: + if not stopped: + # If we have more to do, go for the next thing. + self._preload_future = self._pool.submit(self.iterator.__next__) + + def _cleanup(self) -> None: + if self._own_pool: + self._pool.shutdown() + + def __del__(self) -> None: + # Ensure the threadpool is cleaned up in the case where the + # iterator is not exhausted. 
For more information on __del__: + # https://docs.python.org/3/reference/datamodel.html#object.__del__ + self._cleanup() + super_del = getattr(super(), "__del__", lambda: None) + super_del() diff --git a/apis/python/src/tiledbsoma/_fast_csr.py b/apis/python/src/tiledbsoma/_fast_csr.py new file mode 100644 index 0000000000..316b597a8c --- /dev/null +++ b/apis/python/src/tiledbsoma/_fast_csr.py @@ -0,0 +1,276 @@ +import os +from concurrent.futures import Executor, ThreadPoolExecutor, wait +from typing import List, NamedTuple, Tuple, Type, Union, cast + +import numba +import numba.typed +import numpy as np +import numpy.typing as npt +import pyarrow as pa +import scipy.sparse as sp +from somacore.query.types import IndexFactory, IndexLike + +from ._eager_iter import EagerIterator +from ._sparse_nd_array import SparseNDArray +from ._types import NPIntArray, NPNDArray + + +def read_csr( + matrix: SparseNDArray, + obs_joinids: pa.IntegerArray, + var_joinids: pa.IntegerArray, + index_factory: IndexFactory, +) -> "AccumulatedCSR": + if not isinstance(matrix, SparseNDArray) or matrix.ndim != 2: + raise TypeError("Can only read from a 2D SparseNDArray") + + max_workers = (os.cpu_count() or 4) + 2 + with ThreadPoolExecutor(max_workers=max_workers) as pool: + acc = _CSRAccumulator( + obs_joinids=obs_joinids, + var_joinids=var_joinids, + pool=pool, + index_factory=index_factory, + ) + for tbl in EagerIterator( + matrix.read((obs_joinids, var_joinids)).tables(), + pool=pool, + ): + acc.append(tbl["soma_dim_0"], tbl["soma_dim_1"], tbl["soma_data"]) + + return acc.finalize() + + +class AccumulatedCSR(NamedTuple): + """ + Private. + + Return type for the _CSRAccumulator.finalize method. + Contains a sparse CSR's constituent elements. + """ + + data: NPNDArray + indptr: NPIntArray + indices: NPIntArray + shape: Tuple[int, int] + + def to_scipy(self) -> sp.csr_matrix: + """Create a Scipy ``sparse.csr_matrix`` from component elements. + + Conceptually, this is identical to:: + + sparse.csr_matrix((data, indices, indptr), shape=shape) + + This ugliness is to bypass the O(N) scan that + :meth:`sparse._cs_matrix.__init__` + does when a new compressed matrix is created. + + See `SciPy bug 11496 ` + for details. + """ + matrix = sp.csr_matrix.__new__(sp.csr_matrix) + matrix.data = self.data + matrix.indptr = self.indptr + matrix.indices = self.indices + matrix._shape = self.shape + return matrix + + +class _CSRAccumulator: + """ + Fast accumulator of a CSR, based upon COO input. 
+ """ + + def __init__( + self, + obs_joinids: pa.IntegerArray, + var_joinids: pa.IntegerArray, + pool: Executor, + index_factory: IndexFactory, + ): + self.obs_joinids = obs_joinids + self.var_joinids = var_joinids + self.pool = pool + + self.shape: Tuple[int, int] = (len(self.obs_joinids), len(self.var_joinids)) + self.obs_indexer = index_factory(self.obs_joinids) + self.var_indexer = index_factory(self.var_joinids) + self.row_length: NPIntArray = np.zeros( + (self.shape[0],), dtype=_select_dtype(self.shape[1]) + ) + + # COO accumulated chunks, stored as list of triples (row_ind, col_ind, data) + self.coo_chunks: List[ + Tuple[ + NPIntArray, # row_ind + NPIntArray, # col_ind + NPNDArray, # data + ] + ] = [] + + def append( + self, + row_joinids: pa.Array, + col_joinids: pa.Array, + data: pa.Array, + ) -> None: + """ + At accumulation time, do several things: + + * re-index to positional indices, and if possible, cast to smaller dtype + to minimize memory footprint (at cost of some amount of time) + * accumulate column counts by row, i.e., build the basis of the indptr + * cache the tuple of data, row, col + """ + rows_future = self.pool.submit( + _reindex_and_cast, + self.obs_indexer, + row_joinids.to_numpy(), + _select_dtype(self.shape[0]), + ) + cols_future = self.pool.submit( + _reindex_and_cast, + self.var_indexer, + col_joinids.to_numpy(), + _select_dtype(self.shape[1]), + ) + row_ind = rows_future.result() + col_ind = cols_future.result() + self.coo_chunks.append((row_ind, col_ind, data.to_numpy())) # type: ignore[arg-type] + _accum_row_length(self.row_length, row_ind) + + def finalize(self) -> AccumulatedCSR: + nnz = sum(len(chunk[2]) for chunk in self.coo_chunks) + index_dtype = _select_dtype(nnz) + if nnz == 0: + # There is no way to infer matrix dtype, so use a default and return + # an empty matrix. Float32 is used as a default type, as it is most + # compatible with AnnData expectations. + empty = sp.csr_matrix((0, 0), dtype=np.float32) + return AccumulatedCSR( + data=empty.data, + indptr=empty.indptr, + indices=empty.indices, + shape=self.shape, + ) + + # cumsum row lengths to get indptr + indptr: NPIntArray = np.empty((self.shape[0] + 1,), dtype=index_dtype) + indptr[0:1] = 0 + np.cumsum(self.row_length, out=indptr[1:]) + + # Parallel copy of data and column indices + indices: NPIntArray = np.empty((nnz,), dtype=index_dtype) + data: NPNDArray = np.empty((nnz,), dtype=self.coo_chunks[0][2].dtype) + + # Empirically determined value. Needs to be large enough for reasonable + # concurrency, without excessive write cache conflict. Controls the + # number of rows that are processed in a single thread, and therefore + # is the primary tuning parameter related to concurrency. 
+ row_rng_mask_bits = 18 + + n_jobs = (self.shape[0] >> row_rng_mask_bits) + 1 + chunk_list = numba.typed.List(self.coo_chunks) + wait( + [ + self.pool.submit( + _copy_chunklist_range, + chunk_list, + data, + indices, + indptr, + row_rng_mask_bits, + job, + ) + for job in range(n_jobs) + ] + ) + _finalize_indptr(indptr) + return AccumulatedCSR( + data=data, indptr=indptr, indices=indices, shape=self.shape + ) + + +@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined,misc] +def _accum_row_length( + row_length: npt.NDArray[np.int64], row_ind: npt.NDArray[np.int64] +) -> None: + for rind in row_ind: + row_length[rind] += 1 + + +@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined,misc] +def _copy_chunk_range( + row_ind_chunk: npt.NDArray[np.signedinteger[npt.NBitBase]], + col_ind_chunk: npt.NDArray[np.signedinteger[npt.NBitBase]], + data_chunk: NPNDArray, + data: NPNDArray, + indices: npt.NDArray[np.signedinteger[npt.NBitBase]], + indptr: npt.NDArray[np.signedinteger[npt.NBitBase]], + row_rng_mask: int, + row_rng_val: int, +) -> None: + for n in range(len(data_chunk)): + row = row_ind_chunk[n] + if (row & row_rng_mask) != row_rng_val: + continue + ptr = indptr[row] + indices[ptr] = col_ind_chunk[n] + data[ptr] = data_chunk[n] + indptr[row] += 1 + + +@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined,misc] +def _copy_chunklist_range( + chunk_list: numba.typed.List, + data: NPNDArray, + indices: npt.NDArray[np.signedinteger[npt.NBitBase]], + indptr: npt.NDArray[np.signedinteger[npt.NBitBase]], + row_rng_mask_bits: int, + job: int, +) -> None: + assert row_rng_mask_bits >= 1 and row_rng_mask_bits < 64 + row_rng_mask = (2**64 - 1) >> row_rng_mask_bits << row_rng_mask_bits + row_rng_val = job << row_rng_mask_bits + for row_ind_chunk, col_ind_chunk, data_chunk in chunk_list: + _copy_chunk_range( + row_ind_chunk, + col_ind_chunk, + data_chunk, + data, + indices, + indptr, + row_rng_mask, + row_rng_val, + ) + + +@numba.jit(nopython=True, nogil=True) # type: ignore[attr-defined,misc] +def _finalize_indptr(indptr: npt.NDArray[np.signedinteger[npt.NBitBase]]) -> None: + prev = 0 + for r in range(len(indptr)): + t = indptr[r] + indptr[r] = prev + prev = t + + +def _select_dtype( + maxval: int, +) -> Union[Type[np.int32], Type[np.int64]]: + """ + Ascertain the "best" dtype for a zero-based index. Given our + goal of minimizing memory use, "best" is currently defined as + smallest. + """ + if maxval > np.iinfo(np.int32).max: + return np.int64 + else: + return np.int32 + + +def _reindex_and_cast( + index: IndexLike, ids: npt.NDArray[np.int64], target_dtype: npt.DTypeLike +) -> npt.NDArray[np.int64]: + return cast( + npt.NDArray[np.int64], index.get_indexer(ids).astype(target_dtype, copy=False) + ) diff --git a/apis/python/src/tiledbsoma/_query.py b/apis/python/src/tiledbsoma/_query.py index 25750b9182..33fbefabec 100644 --- a/apis/python/src/tiledbsoma/_query.py +++ b/apis/python/src/tiledbsoma/_query.py @@ -6,7 +6,9 @@ """Implementation of a SOMA Experiment. 
""" import enum +from concurrent.futures import ThreadPoolExecutor from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -22,7 +24,6 @@ ) import attrs -import futures import numpy as np import numpy.typing as npt import pandas as pd @@ -45,7 +46,6 @@ ResultOrder, ResultOrderStr, ) -from somacore.query import _fast_csr from somacore.query.query import ( AxisColumnNames, Numpyable, @@ -55,7 +55,9 @@ from tiledbsoma.pytiledbsoma import SOMACollection -from ._experiment import Experiment +if TYPE_CHECKING: + from ._experiment import Experiment +from ._fast_csr import read_csr from ._measurement import Measurement from ._sparse_nd_array import SparseNDArray @@ -219,7 +221,7 @@ class ExperimentAxisQuery: def __init__( self, - experiment: Experiment, + experiment: "Experiment", measurement_name: str, *, obs_query: AxisQuery = AxisQuery(), @@ -243,7 +245,7 @@ def __init__( index_factory=index_factory, ) self._index_factory = index_factory - self._threadpool_: Optional[futures.ThreadPoolExecutor] = None + self._threadpool_: Optional[ThreadPoolExecutor] = None def obs( self, @@ -579,7 +581,7 @@ def _read_axis_mappings( x_matrices = { _xname: ( - _fast_csr.read_csr( + read_csr( layer, obs_joinids, var_joinids, @@ -769,7 +771,7 @@ def _var_df(self) -> DataFrame: return self._ms.var @property - def _threadpool(self) -> futures.ThreadPoolExecutor: + def _threadpool(self) -> ThreadPoolExecutor: """ Returns the threadpool provided by the experiment's context. If not available, creates a thread pool just in time.""" @@ -778,7 +780,7 @@ def _threadpool(self) -> futures.ThreadPoolExecutor: return context.threadpool if self._threadpool_ is None: - self._threadpool_ = futures.ThreadPoolExecutor() + self._threadpool_ = ThreadPoolExecutor() return self._threadpool_ @@ -803,7 +805,7 @@ def _is_cached(self, axis: Axis) -> bool: field = "_cached_" + axis.value return getattr(self, field) is not None - def preload(self, pool: futures.ThreadPoolExecutor) -> None: + def preload(self, pool: ThreadPoolExecutor) -> None: if self._cached_obs is not None and self._cached_var is not None: return obs_ft = pool.submit(lambda: self.obs) diff --git a/apis/python/src/tiledbsoma/_read_iters.py b/apis/python/src/tiledbsoma/_read_iters.py index 06faca45c0..660bda6469 100644 --- a/apis/python/src/tiledbsoma/_read_iters.py +++ b/apis/python/src/tiledbsoma/_read_iters.py @@ -29,12 +29,12 @@ import somacore from scipy import sparse from somacore import options -from somacore.query._eager_iter import EagerIterator # This package's pybind11 code import tiledbsoma.pytiledbsoma as clib from . import _util +from ._eager_iter import EagerIterator from ._exception import SOMAError from ._indexer import IntIndexer from ._types import NTuple diff --git a/apis/python/src/tiledbsoma/_types.py b/apis/python/src/tiledbsoma/_types.py index 4033c0d6fd..c298c40508 100644 --- a/apis/python/src/tiledbsoma/_types.py +++ b/apis/python/src/tiledbsoma/_types.py @@ -27,6 +27,7 @@ NPInteger = np.integer[npt.NBitBase] NPFloating = np.floating[npt.NBitBase] NPNDArray = npt.NDArray[np.number[npt.NBitBase]] + NPIntArray = npt.NDArray[np.integer[npt.NBitBase]] else: # When not-type-checking, but running with `pandas>=2`, the "missing" type-params don't affect anything. 
PDSeries = pd.Series @@ -38,6 +39,7 @@ NPInteger = np.integer NPFloating = np.floating NPNDArray = np.ndarray + NPIntArray = np.ndarray Path = Union[str, pathlib.Path] diff --git a/apis/python/tests/test_eager_iter.py b/apis/python/tests/test_eager_iter.py new file mode 100644 index 0000000000..d5ebb925c3 --- /dev/null +++ b/apis/python/tests/test_eager_iter.py @@ -0,0 +1,64 @@ +import threading +import unittest +from concurrent import futures +from unittest import mock + +from tiledbsoma._eager_iter import EagerIterator + + +class EagerIterTest(unittest.TestCase): + def setUp(self): + super().setUp() + self.kiddie_pool = futures.ThreadPoolExecutor(1) + """Tiny thread pool for testing.""" + self.verify_pool = futures.ThreadPoolExecutor(1) + """Separate thread pool so verification is not blocked.""" + + def tearDown(self): + self.verify_pool.shutdown(wait=False) + self.kiddie_pool.shutdown(wait=False) + super().tearDown() + + def test_thread_starvation(self): + sem = threading.Semaphore() + try: + # Monopolize the threadpool. + sem.acquire() + self.kiddie_pool.submit(sem.acquire) + eager = EagerIterator(iter("abc"), pool=self.kiddie_pool) + got_a = self.verify_pool.submit(lambda: next(eager)) + self.assertEqual("a", got_a.result(0.1)) + got_b = self.verify_pool.submit(lambda: next(eager)) + self.assertEqual("b", got_b.result(0.1)) + got_c = self.verify_pool.submit(lambda: next(eager)) + self.assertEqual("c", got_c.result(0.1)) + with self.assertRaises(StopIteration): + self.verify_pool.submit(lambda: next(eager)).result(0.1) + finally: + sem.release() + + def test_nesting(self): + inner = EagerIterator(iter("abc"), pool=self.kiddie_pool) + outer = EagerIterator(inner, pool=self.kiddie_pool) + self.assertEqual( + "a, b, c", self.verify_pool.submit(", ".join, outer).result(0.1) + ) + + def test_exceptions(self): + flaky = mock.MagicMock() + flaky.__next__.side_effect = [1, 2, ValueError(), 3, 4] + + eager_flaky = EagerIterator(flaky, pool=self.kiddie_pool) + got_1 = self.verify_pool.submit(lambda: next(eager_flaky)) + self.assertEqual(1, got_1.result(0.1)) + got_2 = self.verify_pool.submit(lambda: next(eager_flaky)) + self.assertEqual(2, got_2.result(0.1)) + with self.assertRaises(ValueError): + self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1) + got_3 = self.verify_pool.submit(lambda: next(eager_flaky)) + self.assertEqual(3, got_3.result(0.1)) + got_4 = self.verify_pool.submit(lambda: next(eager_flaky)) + self.assertEqual(4, got_4.result(0.1)) + for _ in range(5): + with self.assertRaises(StopIteration): + self.verify_pool.submit(lambda: next(eager_flaky)).result(0.1)
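
Notes below are illustrative sketches only; they are not part of the diff.

The new EagerIterator wraps any iterator so that the producer's __next__ runs on a pool thread while the caller consumes the current item. A minimal usage sketch, assuming the module from this patch is importable; slow_chunks is a hypothetical stand-in for an expensive producer such as the table iterator returned by SparseNDArray.read(...).tables():

    from concurrent import futures
    from tiledbsoma._eager_iter import EagerIterator

    def slow_chunks():
        # Hypothetical stand-in for an expensive producer (e.g. a TileDB read).
        for i in range(3):
            yield list(range(i * 4, (i + 1) * 4))

    with futures.ThreadPoolExecutor() as pool:
        for chunk in EagerIterator(slow_chunks(), pool=pool):
            # While this chunk is being consumed, EagerIterator has already
            # resubmitted the producer's __next__ to the pool, so the next
            # chunk is computed in the background.
            print(chunk)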
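
_CSRAccumulator builds the CSR arrays in two passes: per-row counts are cumulatively summed into indptr, each indptr slot is then reused as a per-row write cursor while data and column indices are scattered into place, and finally the cursors are shifted back by one slot to restore a valid indptr. A pure-NumPy sketch of the same steps on a made-up 3x4 COO input (the patch runs the scatter step in parallel numba kernels over row ranges):

    import numpy as np
    from scipy import sparse

    # Toy COO input for a 3x4 matrix (values are made up for the demo).
    rows = np.array([0, 2, 0, 1, 2])
    cols = np.array([1, 3, 0, 2, 0])
    vals = np.array([10.0, 20.0, 30.0, 40.0, 50.0])

    # 1. Per-row counts (what _accum_row_length accumulates per chunk).
    row_length = np.zeros(3, dtype=np.int64)
    for r in rows:
        row_length[r] += 1

    # 2. Cumulative sum -> indptr; indptr[r] doubles as row r's write cursor.
    indptr = np.empty(4, dtype=np.int64)
    indptr[0] = 0
    np.cumsum(row_length, out=indptr[1:])

    # 3. Scatter-copy data and column indices, advancing the cursors
    #    (what _copy_chunk_range does inside each job's row range).
    data = np.empty(5)
    indices = np.empty(5, dtype=np.int64)
    for r, c, v in zip(rows, cols, vals):
        p = indptr[r]
        indices[p] = c
        data[p] = v
        indptr[r] += 1

    # 4. Each cursor now sits at the end of its row, i.e. at the start of the
    #    next row; shift right by one slot (what _finalize_indptr does).
    prev = 0
    for i in range(len(indptr)):
        indptr[i], prev = prev, indptr[i]

    csr = sparse.csr_matrix((data, indices, indptr), shape=(3, 4))

AccumulatedCSR.to_scipy in the patch hands the equivalent three arrays to scipy directly, bypassing the validation scan that csr_matrix.__init__ would otherwise perform.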
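
The parallel scatter in _copy_chunklist_range partitions rows by masking off the low row_rng_mask_bits bits of the row index: job j owns rows in [j << bits, (j + 1) << bits). The same arithmetic with a made-up block size of 2**3 rows instead of the patch's 2**18:

    bits = 3                        # the patch uses 18; 3 keeps the demo tiny
    mask = (2**64 - 1) >> bits << bits
    n_rows = 20
    n_jobs = (n_rows >> bits) + 1   # 3 jobs, each owning a block of 8 rows

    for job in range(n_jobs):
        val = job << bits
        owned = [r for r in range(n_rows) if (r & mask) == val]
        print(job, owned)
    # job 0 -> rows 0..7, job 1 -> rows 8..15, job 2 -> rows 16..19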