From e51f5d8a924ae711c8b9d899165f82d606aa3918 Mon Sep 17 00:00:00 2001 From: Aaron Lun Date: Thu, 25 Jul 2024 16:53:59 -0700 Subject: [PATCH] Exposed utilities for extracting compressed sparse data. (#109) This makes it easier for applications to extract compressed sparse data straight into their own memory allocations (e.g., Rcpp::Vectors). --- .../sparse/convert_to_compressed_sparse.hpp | 460 +++++++++++------- .../sparse/convert_to_compressed_sparse.cpp | 99 ++++ 2 files changed, 395 insertions(+), 164 deletions(-) diff --git a/include/tatami/sparse/convert_to_compressed_sparse.hpp b/include/tatami/sparse/convert_to_compressed_sparse.hpp index 95ca71b0..9326bb78 100644 --- a/include/tatami/sparse/convert_to_compressed_sparse.hpp +++ b/include/tatami/sparse/convert_to_compressed_sparse.hpp @@ -17,6 +17,272 @@ namespace tatami { +/** + * @cond + */ +namespace convert_to_compressed_sparse_internal { + +template +void count_compressed_sparse_non_zeros_consistent(const tatami::Matrix* matrix, Index_ primary, Index_ secondary, bool row, Count_* output, int threads) { + if (matrix->is_sparse()) { + Options opt; + opt.sparse_extract_value = false; + opt.sparse_extract_index = false; + opt.sparse_ordered_index = false; + + parallelize([&](size_t, Index_ start, Index_ length) -> void { + auto wrk = consecutive_extractor(matrix, row, start, length, opt); + for (Index_ x = 0; x < length; ++x) { + auto range = wrk->fetch(NULL, NULL); + output[start + x] = range.number; + } + }, primary, threads); + + } else { + parallelize([&](size_t, Index_ start, Index_ length) -> void { + std::vector buffer_v(secondary); + auto wrk = consecutive_extractor(matrix, row, start, length); + for (Index_ p = start, pe = start + length; p < pe; ++p) { + auto ptr = wrk->fetch(buffer_v.data()); + size_t count = 0; + for (Index_ s = 0; s < secondary; ++s) { + count += (ptr[s] != 0); + } + output[p] = count; + } + }, primary, threads); + } +} + +template +void 
count_compressed_sparse_non_zeros_inconsistent(const tatami::Matrix* matrix, Index_ primary, Index_ secondary, bool row, Count_* output, int threads) { + std::vector > nz_counts(threads - 1); + for (auto& x : nz_counts) { + x.resize(primary); + } + + if (matrix->is_sparse()) { + Options opt; + opt.sparse_extract_value = false; + opt.sparse_ordered_index = false; + + parallelize([&](size_t t, Index_ start, Index_ length) -> void { + std::vector buffer_i(primary); + auto wrk = consecutive_extractor(matrix, !row, start, length, opt); + auto my_counts = (t > 0 ? nz_counts[t - 1].data() : output); + + for (Index_ x = 0; x < length; ++x) { + auto range = wrk->fetch(NULL, buffer_i.data()); + for (Index_ i = 0; i < range.number; ++i) { + ++my_counts[range.index[i]]; + } + } + }, secondary, threads); + + } else { + parallelize([&](size_t t, Index_ start, Index_ length) -> void { + auto wrk = consecutive_extractor(matrix, !row, start, length); + std::vector buffer_v(primary); + auto my_counts = (t > 0 ? 
nz_counts[t - 1].data() : output); + + for (Index_ x = 0; x < length; ++x) { + auto ptr = wrk->fetch(buffer_v.data()); + for (Index_ p = 0; p < primary; ++p) { + my_counts[p] += (ptr[p] != 0); + } + } + }, secondary, threads); + } + + for (auto& y : nz_counts) { + for (Index_ p = 0; p < primary; ++p) { + output[p] += y[p]; + } + } +} + +template +void fill_compressed_sparse_matrix_consistent( + const tatami::Matrix* matrix, + InputIndex_ primary, + InputIndex_ secondary, + bool row, + const Pointer_* pointers, + StoredValue_* output_value, + StoredIndex_* output_index, + int threads) +{ + if (matrix->is_sparse()) { + Options opt; + opt.sparse_ordered_index = false; + + parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + std::vector buffer_v(secondary); + std::vector buffer_i(secondary); + auto wrk = consecutive_extractor(matrix, row, start, length, opt); + + for (InputIndex_ p = start, pe = start + length; p < pe; ++p) { + // Resist the urge to `fetch()` straight into 'output_v' + // and 'output_i', as implementations may assume that they + // have the entire 'length' length to play with, and the + // output vectors only have whatever is allocated from the + // first pass (which might be nothing for an all-zero matrix). 
+ auto range = wrk->fetch(buffer_v.data(), buffer_i.data()); + auto offset = pointers[p]; + std::copy_n(range.value, range.number, output_value + offset); + std::copy_n(range.index, range.number, output_index + offset); + } + }, primary, threads); + + } else { + parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + std::vector buffer_v(secondary); + auto wrk = consecutive_extractor(matrix, row, start, length); + + for (InputIndex_ p = start, pe = start + length; p < pe; ++p) { + auto ptr = wrk->fetch(buffer_v.data()); + auto offset = pointers[p]; + for (InputIndex_ s = 0; s < secondary; ++s) { + auto val = ptr[s]; + if (val != 0) { + output_value[offset] = val; + output_index[offset] = s; + ++offset; + } + } + } + }, primary, threads); + } +} + +template +void fill_compressed_sparse_matrix_inconsistent( + const tatami::Matrix* matrix, + InputIndex_ primary, + InputIndex_ secondary, + bool row, + const Pointer_* pointers, + StoredValue_* output_value, + StoredIndex_* output_index, + int threads) +{ + if (matrix->is_sparse()) { + Options opt; + opt.sparse_ordered_index = false; + + parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + std::vector buffer_v(length); + std::vector buffer_i(length); + auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length, opt); + std::vector offset_copy(pointers + start, pointers + start + length); + + for (InputIndex_ x = 0; x < secondary; ++x) { + auto range = wrk->fetch(buffer_v.data(), buffer_i.data()); + for (InputIndex_ i = 0; i < range.number; ++i) { + auto& pos = offset_copy[range.index[i] - start]; + output_value[pos] = range.value[i]; + output_index[pos] = x; + ++pos; + } + } + }, primary, threads); + + } else { + parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { + std::vector buffer_v(length); + auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length); + std::vector offset_copy(pointers + start, 
pointers + start + length); + + for (InputIndex_ x = 0; x < secondary; ++x) { + auto ptr = wrk->fetch(buffer_v.data()); + for (InputIndex_ p = 0; p < length; ++p) { + auto val = ptr[p]; + if (val != 0) { + auto& pos = offset_copy[p]; + output_value[pos] = val; + output_index[pos] = x; + ++pos; + } + } + } + }, primary, threads); + } +} + +} +/** + * @endcond + */ + +/** + * @tparam Value_ Type of value in the matrix. + * @tparam Index_ Integer type of row/column index. + * @tparam Count_ Integer type for the non-zero count. + * + * @param matrix Pointer to a `tatami::Matrix`. + * @param row Whether to count structural non-zeros by row. + * @param[out] output Pointer to an array of length equal to the number of rows (if `row = true`) or columns (otherwise) of `matrix`. + * On output, this stores the number of structural non-zeros in each row (if `row = true`) or column (otherwise). + * @param threads Number of threads to use. + * + * For sparse `matrix`, all structural non-zero elements are reported, even if they have actual values of zero. + * In contrast, for dense `matrix`, only the non-zero values are counted; + * these are considered to be structural non-zeros upon conversion to a sparse matrix (e.g., in `fill_compressed_sparse_contents()`). + */ +template +void count_compressed_sparse_non_zeros(const tatami::Matrix* matrix, bool row, Count_* output, int threads) { + Index_ NR = matrix->nrow(); + Index_ NC = matrix->ncol(); + Index_ primary = (row ? NR : NC); + Index_ secondary = (row ? NC : NR); + std::fill_n(output, primary, 0); + + if (row == matrix->prefer_rows()) { + convert_to_compressed_sparse_internal::count_compressed_sparse_non_zeros_consistent(matrix, primary, secondary, row, output, threads); + } else { + convert_to_compressed_sparse_internal::count_compressed_sparse_non_zeros_inconsistent(matrix, primary, secondary, row, output, threads); + } +} + +/** + * @tparam StoredValue_ Type of data values to be stored in the output. 
+ * @tparam StoredIndex_ Integer type for storing the indices in the output. + * @tparam InputValue_ Type of data values in the input interface. + * @tparam InputIndex_ Integer type for indices in the input interface. + * + * @param matrix Pointer to a `tatami::Matrix`. + * @param row Whether to fill `output_value` and `output_index` by row, i.e., the output represents a compressed sparse row matrix. + * @param[in] pointers Pointer to an array of length greater than or equal to the number of rows (if `row = true`) or columns (otherwise) of `matrix`. + * Each entry contains the position of the start of each row/column in `output_value` and `output_index`. + * This argument is equivalent to the array of pointers for the compressed sparse format (e.g., `CompressedSparseContents::pointers`), + * and can be obtained by taking the cumulative sum of the per-row/column counts from `count_compressed_sparse_non_zeros()`. + * @param[out] output_value Pointer to an array of length equal to the total number of structural non-zero elements. + * On output, this is used to store the values of those elements in a compressed sparse format (e.g., `CompressedSparseContents::value`). + * @param[out] output_index Pointer to an array of length equal to the total number of structural non-zero elements. + * On output, this is used to store the row/column indices of those elements in a compressed sparse format (e.g., `CompressedSparseContents::index`). + * @param threads Number of threads to use. + */ +template +void fill_compressed_sparse_contents( + const tatami::Matrix* matrix, + bool row, + const Pointer_* pointers, + StoredValue_* output_value, + StoredIndex_* output_index, + int threads) +{ + InputIndex_ NR = matrix->nrow(); + InputIndex_ NC = matrix->ncol(); + InputIndex_ primary = (row ? NR : NC); + InputIndex_ secondary = (row ? 
NC : NR); + + if (row == matrix->prefer_rows()) { + convert_to_compressed_sparse_internal::fill_compressed_sparse_matrix_consistent(matrix, primary, secondary, row, pointers, output_value, output_index, threads); + } else { + convert_to_compressed_sparse_internal::fill_compressed_sparse_matrix_inconsistent(matrix, primary, secondary, row, pointers, output_value, output_index, threads); + } +} + /** * @brief Compressed sparse contents. * @@ -57,6 +323,9 @@ struct CompressedSparseContents { * @param threads Number of threads to use. * * @return Contents of the sparse matrix in compressed form, see `CompressedSparseContents`. + * + * The behavior of this function can be replicated by manually calling `count_compressed_sparse_non_zeros()` followed by `fill_compressed_sparse_contents()`. + * This may be desirable for users who want to put the compressed sparse contents into pre-existing memory allocations. */ template CompressedSparseContents retrieve_compressed_sparse_contents(const Matrix* matrix, bool row, bool two_pass, int threads = 1) { @@ -91,183 +360,46 @@ CompressedSparseContents retrieve_compressed_sparse_ } else if (row == matrix->prefer_rows()) { // First pass to figure out how many non-zeros there are. 
output_p.resize(static_cast(primary) + 1); - - if (matrix->is_sparse()) { - Options opt; - opt.sparse_extract_value = false; - opt.sparse_extract_index = false; - opt.sparse_ordered_index = false; - - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { - auto wrk = consecutive_extractor(matrix, row, start, length, opt); - for (InputIndex_ x = 0; x < length; ++x) { - auto range = wrk->fetch(NULL, NULL); - output_p[start + x + 1] = range.number; - } - }, primary, threads); - - } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { - std::vector buffer_v(secondary); - auto wrk = consecutive_extractor(matrix, row, start, length); - for (InputIndex_ p = start, pe = start + length; p < pe; ++p) { - auto ptr = wrk->fetch(buffer_v.data()); - size_t count = 0; - for (InputIndex_ s = 0; s < secondary; ++s, ++ptr) { - count += (*ptr != 0); - } - output_p[p + 1] = count; - } - }, primary, threads); - } - + convert_to_compressed_sparse_internal::count_compressed_sparse_non_zeros_consistent(matrix, primary, secondary, row, output_p.data() + 1, threads); for (InputIndex_ i = 1; i <= primary; ++i) { output_p[i] += output_p[i - 1]; } - output_v.resize(output_p.back()); - output_i.resize(output_p.back()); // Second pass to actually fill our vectors. - if (matrix->is_sparse()) { - Options opt; - opt.sparse_ordered_index = false; - - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { - std::vector buffer_v(secondary); - std::vector buffer_i(secondary); - auto wrk = consecutive_extractor(matrix, row, start, length, opt); - - for (InputIndex_ p = start, pe = start + length; p < pe; ++p) { - // Resist the urge to `fetch()` straight into 'output_p' - // and 'output_i', as implementations may assume that they - // have the entire 'length' length to play with, and the - // output vectors only have whatever is allocated from the - // first pass (which might be nothing for an all-zero matrix). 
- auto range = wrk->fetch(buffer_v.data(), buffer_i.data()); - auto offset = output_p[p]; - std::copy_n(range.value, range.number, output_v.data() + offset); - std::copy_n(range.index, range.number, output_i.data() + offset); - } - }, primary, threads); - - } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { - std::vector buffer_v(secondary); - auto wrk = consecutive_extractor(matrix, row, start, length); - - for (InputIndex_ p = start, pe = start + length; p < pe; ++p) { - auto ptr = wrk->fetch(buffer_v.data()); - auto offset = output_p[p]; - for (InputIndex_ s = 0; s < secondary; ++s, ++ptr) { - if (*ptr != 0) { - output_v[offset] = *ptr; - output_i[offset] = s; - ++offset; - } - } - } - }, primary, threads); - } + output_v.resize(output_p.back()); + output_i.resize(output_p.back()); + convert_to_compressed_sparse_internal::fill_compressed_sparse_matrix_consistent( + matrix, + primary, + secondary, + row, + output_p.data(), + output_v.data(), + output_i.data(), + threads + ); } else { // First pass to figure out how many non-zeros there are. 
- std::vector > nz_counts(threads); - for (auto& x : nz_counts) { - x.resize(static_cast(primary) + 1); - } - - if (matrix->is_sparse()) { - Options opt; - opt.sparse_extract_value = false; - opt.sparse_ordered_index = false; - - parallelize([&](size_t t, InputIndex_ start, InputIndex_ length) -> void { - std::vector buffer_i(primary); - auto wrk = consecutive_extractor(matrix, !row, start, length, opt); - auto& my_counts = nz_counts[t]; - - for (InputIndex_ x = 0; x < length; ++x) { - auto range = wrk->fetch(NULL, buffer_i.data()); - for (InputIndex_ i = 0; i < range.number; ++i, ++range.index) { - ++my_counts[*range.index + 1]; - } - } - }, secondary, threads); - - } else { - parallelize([&](size_t t, InputIndex_ start, InputIndex_ length) -> void { - auto wrk = consecutive_extractor(matrix, !row, start, length); - std::vector buffer_v(primary); - auto& my_counts = nz_counts[t]; - - for (InputIndex_ x = 0; x < length; ++x) { - auto ptr = wrk->fetch(buffer_v.data()); - for (InputIndex_ p = 0; p < primary; ++p, ++ptr) { - if (*ptr) { - ++my_counts[p + 1]; - } - } - } - }, secondary, threads); - } - - output_p.swap(nz_counts.front()); // There better be at least 1 thread! - for (int i = 1; i < threads; ++i) { - auto& y = nz_counts[i]; - for (InputIndex_ p = 0; p <= primary; ++p) { - output_p[p] += y[p]; - } - y.clear(); - y.shrink_to_fit(); - } - + output_p.resize(static_cast(primary) + 1); + convert_to_compressed_sparse_internal::count_compressed_sparse_non_zeros_inconsistent(matrix, primary, secondary, row, output_p.data() + 1, threads); for (InputIndex_ i = 1; i <= primary; ++i) { output_p[i] += output_p[i - 1]; } - output_v.resize(output_p.back()); - output_i.resize(output_p.back()); // Second pass to actually fill our vectors. 
- if (matrix->is_sparse()) { - Options opt; - opt.sparse_ordered_index = false; - - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { - std::vector buffer_v(length); - std::vector buffer_i(length); - auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length, opt); - std::vector offset_copy(output_p.begin() + start, output_p.begin() + start + length); - - for (InputIndex_ x = 0; x < secondary; ++x) { - auto range = wrk->fetch(buffer_v.data(), buffer_i.data()); - for (InputIndex_ i = 0; i < range.number; ++i, ++range.value, ++range.index) { - auto& pos = offset_copy[*(range.index) - start]; - output_v[pos] = *(range.value); - output_i[pos] = x; - ++pos; - } - } - }, primary, threads); - - } else { - parallelize([&](size_t, InputIndex_ start, InputIndex_ length) -> void { - std::vector buffer_v(length); - auto wrk = consecutive_extractor(matrix, !row, static_cast(0), secondary, start, length); - std::vector offset_copy(output_p.begin() + start, output_p.begin() + start + length); - - for (InputIndex_ x = 0; x < secondary; ++x) { - auto ptr = wrk->fetch(buffer_v.data()); - for (InputIndex_ p = 0; p < length; ++p, ++ptr) { - if (*ptr != 0) { - auto& pos = offset_copy[p]; - output_v[pos] = *ptr; - output_i[pos] = x; - ++pos; - } - } - } - }, primary, threads); - } + output_v.resize(output_p.back()); + output_i.resize(output_p.back()); + convert_to_compressed_sparse_internal::fill_compressed_sparse_matrix_inconsistent( + matrix, + primary, + secondary, + row, + output_p.data(), + output_v.data(), + output_i.data(), + threads + ); } return output; diff --git a/tests/src/sparse/convert_to_compressed_sparse.cpp b/tests/src/sparse/convert_to_compressed_sparse.cpp index 9a81f066..cb0473d4 100644 --- a/tests/src/sparse/convert_to_compressed_sparse.cpp +++ b/tests/src/sparse/convert_to_compressed_sparse.cpp @@ -88,3 +88,102 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Values(1, 3) // number of threads ) ); + +class 
ConvertToCompressedSparseManualTest : public ::testing::Test {
+protected:
+    inline static size_t NR = 120, NC = 50;
+    inline static std::shared_ptr<tatami::Matrix<double, int> > mat;
+
+    static void SetUpTestSuite() {
+        auto vec = tatami_test::simulate_sparse_vector<double>(NR * NC, 0.1);
+        mat.reset(new tatami::DenseRowMatrix<double, int>(NR, NC, std::move(vec)));
+    }
+};
+
+TEST_F(ConvertToCompressedSparseManualTest, Consistent) {
+    std::vector<size_t> pointers(NR + 1);
+    tatami::count_compressed_sparse_non_zeros(mat.get(), true, pointers.data() + 1, 1);
+
+    {
+        auto wrk = mat->dense_row();
+        for (size_t i = 0; i < NR; ++i) {
+            auto row = tatami_test::fetch(wrk.get(), static_cast<int>(i), NC);
+            size_t expected = 0;
+            for (auto x : row) {
+                expected += (x != 0);
+            }
+            EXPECT_EQ(expected, pointers[i + 1]);
+        }
+    }
+
+    for (size_t i = 1; i <= NR; ++i) {
+        pointers[i] += pointers[i - 1];
+    }
+    size_t nonzeros = pointers.back();
+    std::vector<double> values(nonzeros);
+    std::vector<int> indices(nonzeros);
+    tatami::fill_compressed_sparse_contents(mat.get(), true, pointers.data(), values.data(), indices.data(), 1);
+
+    tatami::CompressedSparseMatrix<double, int, std::vector<double>, std::vector<int>, std::vector<size_t> > spmat(
+        mat->nrow(),
+        mat->ncol(),
+        std::move(values),
+        std::move(indices),
+        std::move(pointers),
+        true
+    );
+
+    {
+        auto wrk = mat->dense_row();
+        auto spwrk = spmat.dense_row();
+        for (size_t i = 0; i < NR; ++i) {
+            auto expected = tatami_test::fetch(wrk.get(), static_cast<int>(i), NC);
+            auto observed = tatami_test::fetch(spwrk.get(), static_cast<int>(i), NC);
+            EXPECT_EQ(expected, observed);
+        }
+    }
+}
+
+TEST_F(ConvertToCompressedSparseManualTest, Inconsistent) {
+    std::vector<size_t> pointers(NC + 1);
+    tatami::count_compressed_sparse_non_zeros(mat.get(), false, pointers.data() + 1, 1);
+
+    {
+        auto wrk = mat->dense_column();
+        for (size_t i = 0; i < NC; ++i) {
+            auto column = tatami_test::fetch(wrk.get(), static_cast<int>(i), NR);
+            size_t expected = 0;
+            for (auto x : column) {
+                expected += (x != 0);
+            }
+            EXPECT_EQ(expected, pointers[i + 1]);
+        }
+    }
+
+    for (size_t i = 1; i <= NC; ++i) {
+        pointers[i] += pointers[i - 1];
+    }
+    size_t nonzeros = pointers.back();
+    std::vector<double> values(nonzeros);
+    std::vector<int> indices(nonzeros);
+    tatami::fill_compressed_sparse_contents(mat.get(), false, pointers.data(), values.data(), indices.data(), 1);
+
+    tatami::CompressedSparseMatrix<double, int, std::vector<double>, std::vector<int>, std::vector<size_t> > spmat(
+        mat->nrow(),
+        mat->ncol(),
+        std::move(values),
+        std::move(indices),
+        std::move(pointers),
+        false
+    );
+
+    {
+        auto wrk = mat->dense_column();
+        auto spwrk = spmat.dense_column();
+        for (size_t i = 0; i < NC; ++i) {
+            auto expected = tatami_test::fetch(wrk.get(), static_cast<int>(i), NR);
+            auto observed = tatami_test::fetch(spwrk.get(), static_cast<int>(i), NR);
+            EXPECT_EQ(expected, observed);
+        }
+    }
+}