diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 1947479b..fc470ddc 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -111,11 +111,11 @@ jobs:
         with:
           python-version: ${{ matrix.python-version }}
       - name: Run gtest
-        timeout-minutes: 30
+        timeout-minutes: 15
         run: bazel test --test_output=all --spawn_strategy=standalone --test_timeout=900 //tests/cpp/...
       - name: Run SEALAPI tests
-        timeout-minutes: 30
+        timeout-minutes: 15
         run: bazel test --test_output=all --spawn_strategy=standalone --test_timeout=900 //tests/python/sealapi/...
      - name: Run TenSEAL tests
-        timeout-minutes: 30
-        run: bazel test --test_output=all --spawn_strategy=standalone --test_timeout=900 //tests/python/tenseal/...
+        timeout-minutes: 15
+        run: bazel test --test_output=all --spawn_strategy=standalone --test_output=streamed --local_sigkill_grace_seconds=30 --test_timeout=900 //tests/python/tenseal/...
diff --git a/README.md b/README.md
index 845478e0..9844c019 100644
--- a/README.md
+++ b/README.md
@@ -134,6 +134,12 @@ load("@org_openmined_tenseal//tenseal:deps.bzl", "tenseal_deps")
 tenseal_deps()
 ```
 
+## Benchmarks
+
+You can benchmark the implementation at any point by running
+```bash
+$ bazel run -c opt --spawn_strategy=standalone //tests/cpp/benchmarks:benchmark
+```
+
 ## Support
 
 For support in using this library, please join the **#lib_tenseal** Slack channel. If you’d like to follow along with any code changes to the library, please join the **#code_tenseal** Slack channel. [Click here to join our Slack community!](https://slack.openmined.org)
diff --git a/tenseal/__init__.py b/tenseal/__init__.py
index e8fc6f6f..59ce42e3 100644
--- a/tenseal/__init__.py
+++ b/tenseal/__init__.py
@@ -17,7 +17,9 @@
 GaloisKeys = _ts_cpp.GaloisKeys
 
 
-def context(scheme, poly_modulus_degree, plain_modulus=None, coeff_mod_bit_sizes=None):
+def context(
+    scheme, poly_modulus_degree, plain_modulus=None, coeff_mod_bit_sizes=None, n_threads=None
+):
     """Construct a context that holds keys and parameters needed for operating
     encrypted tensors using either BFV or CKKS scheme.
@@ -47,12 +49,17 @@ def context(scheme, poly_modulus_degree, plain_modulus=None, coeff_mod_bit_sizes
         raise ValueError("Invalid scheme type, use either SCHEME_TYPE.BFV or SCHEME_TYPE.CKKS")
 
     # We can't pass None here, everything should be set prior to this call
+    if isinstance(n_threads, int) and n_threads > 0:
+        return _ts_cpp.TenSEALContext.new(
+            scheme, poly_modulus_degree, plain_modulus, coeff_mod_bit_sizes, n_threads
+        )
+
     return _ts_cpp.TenSEALContext.new(
         scheme, poly_modulus_degree, plain_modulus, coeff_mod_bit_sizes
     )
 
 
-def context_from(buff):
+def context_from(buff, n_threads=None):
     """Construct a context from a serialized buffer.
 
     Args:
@@ -61,6 +68,8 @@ def context_from(buff):
 
     Returns:
         A TenSEALContext object.
     """
+    if n_threads:
+        return _ts_cpp.TenSEALContext.deserialize(buff, n_threads)
     return _ts_cpp.TenSEALContext.deserialize(buff)
""" + if n_threads: + return _ts_cpp.TenSEALContext.deserialize(buff, n_threads) return _ts_cpp.TenSEALContext.deserialize(buff) diff --git a/tenseal/binding.cpp b/tenseal/binding.cpp index ef7b5a37..27004719 100644 --- a/tenseal/binding.cpp +++ b/tenseal/binding.cpp @@ -135,13 +135,13 @@ PYBIND11_MODULE(_tenseal_cpp, m) { .def("sum", &CKKSVector::sum) .def("sum_", &CKKSVector::sum_inplace) .def("matmul", &CKKSVector::matmul_plain, py::arg("matrix"), - py::arg("n_threads") = 0) + py::arg("n_jobs") = 0) .def("matmul_", &CKKSVector::matmul_plain_inplace, py::arg("matrix"), - py::arg("n_threads") = 0) + py::arg("n_jobs") = 0) .def("mm", &CKKSVector::matmul_plain, py::arg("matrix"), - py::arg("n_threads") = 0) + py::arg("n_jobs") = 0) .def("mm_", &CKKSVector::matmul_plain_inplace, py::arg("matrix"), - py::arg("n_threads") = 0) + py::arg("n_jobs") = 0) // python arithmetic .def("__neg__", &CKKSVector::negate) .def("__pow__", &CKKSVector::power) @@ -199,9 +199,9 @@ PYBIND11_MODULE(_tenseal_cpp, m) { .def("__imul__", py::overload_cast &>( &CKKSVector::mul_plain_inplace)) .def("__matmul__", &CKKSVector::matmul_plain, py::arg("matrix"), - py::arg("n_threads") = 0) + py::arg("n_jobs") = 0) .def("__imatmul__", &CKKSVector::matmul_plain_inplace, - py::arg("matrix"), py::arg("n_threads") = 0) + py::arg("matrix"), py::arg("n_jobs") = 0) .def("context", [](const CKKSVector &obj) { return obj.tenseal_context(); }) .def("serialize", @@ -227,18 +227,20 @@ PYBIND11_MODULE(_tenseal_cpp, m) { py::overload_cast<>(&TenSEALContext::auto_mod_switch), py::overload_cast(&TenSEALContext::auto_mod_switch)) .def("new", - py::overload_cast>( - &TenSEALContext::Create), + py::overload_cast, + optional>(&TenSEALContext::Create), R"(Create a new TenSEALContext object to hold keys and parameters. Args: scheme : define the scheme to be used, either SCHEME_TYPE.BFV or SCHEME_TYPE.CKKS. poly_modulus_degree : The degree of the polynomial modulus, must be a power of two. plain_modulus : The plaintext modulus. Is not used if scheme is CKKS. coeff_mod_bit_sizes : List of bit size for each coeffecient modulus. + n_threads : Optional: number of threads to use for multiplications. Can be an empty list for BFV, a default value will be given. 
)", py::arg("poly_modulus_degree"), py::arg("plain_modulus"), - py::arg("coeff_mod_bit_sizes") = vector()) + py::arg("coeff_mod_bit_sizes") = vector(), + py::arg("n_threads") = get_concurrency()) .def("public_key", &TenSEALContext::public_key) .def("secret_key", &TenSEALContext::secret_key) .def("relin_keys", &TenSEALContext::relin_keys) @@ -266,8 +268,10 @@ PYBIND11_MODULE(_tenseal_cpp, m) { "Generate Relinearization keys using the secret key") .def("serialize", [](const TenSEALContext &obj) { return py::bytes(obj.save()); }) - .def_static("deserialize", py::overload_cast( - &TenSEALContext::Create)) + .def_static("deserialize", + py::overload_cast>( + &TenSEALContext::Create), + py::arg("buffer"), py::arg("n_threads") = get_concurrency()) .def("copy", &TenSEALContext::copy) .def("__copy__", [](const std::shared_ptr &self) { diff --git a/tenseal/cpp/context/sealcontext.cpp b/tenseal/cpp/context/sealcontext.cpp index 5374d464..dc10e0f3 100644 --- a/tenseal/cpp/context/sealcontext.cpp +++ b/tenseal/cpp/context/sealcontext.cpp @@ -1,7 +1,6 @@ #include "tenseal/cpp/context/sealcontext.h" #include -#include #include "seal/seal.h" diff --git a/tenseal/cpp/context/tensealcontext.cpp b/tenseal/cpp/context/tensealcontext.cpp index 49afd764..6a2ba252 100644 --- a/tenseal/cpp/context/tensealcontext.cpp +++ b/tenseal/cpp/context/tensealcontext.cpp @@ -10,17 +10,35 @@ namespace tenseal { using namespace seal; using namespace std; -TenSEALContext::TenSEALContext(EncryptionParameters parms) { +TenSEALContext::TenSEALContext(EncryptionParameters parms, + optional n_threads) { + this->dispatcher_setup(n_threads); this->base_setup(parms); this->keys_setup(); } -TenSEALContext::TenSEALContext(istream& stream) { this->load(stream); } -TenSEALContext::TenSEALContext(const std::string& input) { this->load(input); } -TenSEALContext::TenSEALContext(const TenSEALContextProto& input) { +TenSEALContext::TenSEALContext(istream& stream, optional n_threads) { + this->dispatcher_setup(n_threads); + this->load(stream); +} +TenSEALContext::TenSEALContext(const std::string& input, + optional n_threads) { + this->dispatcher_setup(n_threads); + this->load(input); +} +TenSEALContext::TenSEALContext(const TenSEALContextProto& input, + optional n_threads) { + this->dispatcher_setup(n_threads); this->load_proto(input); } +void TenSEALContext::dispatcher_setup(optional n_threads) { + this->_threads = n_threads.value_or(get_concurrency()); + if (this->_threads == 0) this->_threads = get_concurrency(); + + this->_dispatcher = make_shared(this->_threads); +} + void TenSEALContext::base_setup(EncryptionParameters parms) { this->_parms = parms; this->_context = SEALContext::Create(this->_parms); @@ -65,7 +83,7 @@ void TenSEALContext::keys_setup(optional public_key, shared_ptr TenSEALContext::Create( scheme_type scheme, size_t poly_modulus_degree, uint64_t plain_modulus, - vector coeff_mod_bit_sizes) { + vector coeff_mod_bit_sizes, optional n_threads) { EncryptionParameters parms; switch (scheme) { case scheme_type::BFV: @@ -82,20 +100,22 @@ shared_ptr TenSEALContext::Create( throw invalid_argument("invalid scheme_type"); } - return shared_ptr(new TenSEALContext(parms)); + return shared_ptr(new TenSEALContext(parms, n_threads)); } -shared_ptr TenSEALContext::Create(istream& stream) { - return shared_ptr(new TenSEALContext(stream)); +shared_ptr TenSEALContext::Create(istream& stream, + optional n_threads) { + return shared_ptr(new TenSEALContext(stream, n_threads)); } -shared_ptr TenSEALContext::Create(const std::string& input) { - 
diff --git a/tenseal/cpp/context/tensealcontext.h b/tenseal/cpp/context/tensealcontext.h
index 9bd8f21d..e2db4de3 100644
--- a/tenseal/cpp/context/tensealcontext.h
+++ b/tenseal/cpp/context/tensealcontext.h
@@ -4,6 +4,7 @@
 #include "seal/seal.h"
 #include "tenseal/cpp/context/sealcontext.h"
 #include "tenseal/cpp/context/tensealencoder.h"
+#include "tenseal/cpp/utils/threadpool.h"
 #include "tenseal/proto/tensealcontext.pb.h"
 
 namespace tenseal {
@@ -38,32 +39,43 @@ class TenSEALContext {
      * @param[in] scheme: BFV or CKKS.
      * @param[in] poly_modulus_degree: The polynomial modulus degree.
      * @param[in] plain_modulus: The plaintext modulus.
      * @param[in] coeff_mod_bit_sizes: The bit-lengths of the primes to be
      *generated.
+     * @param[in] n_threads: Optional parameter for the size of the threadpool
+     *dispatcher.
      * @returns shared_ptr to a new TenSEALContext object.
     **/
     static shared_ptr<TenSEALContext> Create(scheme_type scheme,
                                              size_t poly_modulus_degree,
                                              uint64_t plain_modulus,
-                                             vector<int> coeff_mod_bit_sizes);
+                                             vector<int> coeff_mod_bit_sizes,
+                                             optional<size_t> n_threads = {});
     /**
     * Create a context from an input stream.
     * @param[in] stream
+    * @param[in] n_threads: Optional parameter for the size of the threadpool
+    *dispatcher.
     * @returns shared_ptr to a new TenSEALContext object.
     **/
-    static shared_ptr<TenSEALContext> Create(istream& stream);
+    static shared_ptr<TenSEALContext> Create(istream& stream,
+                                             optional<size_t> n_threads = {});
     /**
     * Create a context from a serialized protobuffer.
     * @param[in] input: Serialized protobuffer.
+    * @param[in] n_threads: Optional parameter for the size of the threadpool
+    *dispatcher.
     * @returns shared_ptr to a new TenSEALContext object.
     **/
-    static shared_ptr<TenSEALContext> Create(const std::string& input);
+    static shared_ptr<TenSEALContext> Create(const std::string& input,
+                                             optional<size_t> n_threads = {});
     /**
     * Create a context from a protobuffer.
     * @param[in] input: The protobuffer.
+    * @param[in] n_threads: Optional parameter for the size of the threadpool
+    *dispatcher.
     * @returns shared_ptr to a new TenSEALContext object.
     **/
-    static shared_ptr<TenSEALContext> Create(const TenSEALContextProto& input);
+    static shared_ptr<TenSEALContext> Create(const TenSEALContextProto& input,
+                                             optional<size_t> n_threads = {});
     /**
     * @returns a pointer to the public key.
     **/
@@ -222,6 +234,11 @@ class TenSEALContext {
     * @returns true if the contexts are identical.
    **/
    bool equals(const std::shared_ptr<TenSEALContext>& other);
+   /**
+    * @returns a pointer to the threadpool dispatcher
+    **/
+   shared_ptr<sync::ThreadPool> dispatcher() { return _dispatcher; }
+   size_t dispatcher_size() { return _threads; }
 
   private:
    EncryptionParameters _parms;
@@ -231,6 +248,10 @@ class TenSEALContext {
    shared_ptr<RelinKeys> _relin_keys;
    shared_ptr<GaloisKeys> _galois_keys;
    shared_ptr<TenSEALEncoder> encoder_factory;
+
+   shared_ptr<sync::ThreadPool> _dispatcher;
+   uint _threads;
+
    /**
     * Switches for automatic relinearization, rescaling, and modulus switching
    **/
@@ -242,12 +263,13 @@ class TenSEALContext {
    uint8_t _auto_flags =
        flag_auto_relin | flag_auto_rescale | flag_auto_mod_switch;
 
-   TenSEALContext(EncryptionParameters parms);
-   TenSEALContext(istream& stream);
-   TenSEALContext(const std::string& stream);
-   TenSEALContext(const TenSEALContextProto& proto);
+   TenSEALContext(EncryptionParameters parms, optional<size_t> n_threads);
+   TenSEALContext(istream& stream, optional<size_t> n_threads);
+   TenSEALContext(const std::string& stream, optional<size_t> n_threads);
+   TenSEALContext(const TenSEALContextProto& proto,
+                  optional<size_t> n_threads);
 
    void base_setup(EncryptionParameters parms);
+   void dispatcher_setup(optional<size_t> n_threads);
    void keys_setup(optional<PublicKey> public_key = {},
                    optional<SecretKey> secret_key = {},
                    bool generate_relin_keys = true,
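The `dispatcher()` accessor exposes the pool so tensor operations can enqueue work themselves, as `matrix_ops.h` does below. A hypothetical direct use of the accessor (the lambda and its argument are arbitrary, introduced only for this sketch):

```cpp
#include <cassert>

#include "tenseal/cpp/context/tensealcontext.h"

using namespace tenseal;

int main() {
    auto ctx = TenSEALContext::Create(seal::scheme_type::CKKS, 8192, -1,
                                      {60, 40, 40, 60}, 2);
    // Round-robin dispatch to one of the two workers; the returned
    // std::future carries the lambda's result, and get() blocks until a
    // worker has run the task.
    auto fut = ctx->dispatcher()->enqueue_task([](int x) { return 2 * x; }, 21);
    assert(fut.get() == 42);
    return 0;
}
```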
diff --git a/tenseal/cpp/tensors/ckksvector.cpp b/tenseal/cpp/tensors/ckksvector.cpp
index 02495c47..b6f5733c 100644
--- a/tenseal/cpp/tensors/ckksvector.cpp
+++ b/tenseal/cpp/tensors/ckksvector.cpp
@@ -463,22 +463,16 @@ CKKSVector& CKKSVector::sum_inplace() {
 }
 
 CKKSVector CKKSVector::matmul_plain(const vector<vector<double>>& matrix,
-                                    uint n_threads) {
+                                    size_t n_jobs) {
     CKKSVector new_vector = *this;
-    return new_vector.matmul_plain_inplace(matrix, n_threads);
+    return new_vector.matmul_plain_inplace(matrix, n_jobs);
 }
 
 CKKSVector& CKKSVector::matmul_plain_inplace(
-    const vector<vector<double>>& matrix, uint n_threads) {
-    if (n_threads != 1) {
-        this->ciphertext = diagonal_ct_vector_matmul_parallel<double>(
-            this->tenseal_context(), this->ciphertext, this->size(), matrix,
-            n_threads);
-    } else {
-        this->ciphertext = diagonal_ct_vector_matmul<double>(
-            this->tenseal_context(), this->ciphertext, this->size(), matrix);
-    }
+    const vector<vector<double>>& matrix, size_t n_jobs) {
+    this->ciphertext = diagonal_ct_vector_matmul<double>(
+        this->tenseal_context(), this->ciphertext, this->size(), matrix,
+        n_jobs);
 
     this->_size = matrix[0].size();
diff --git a/tenseal/cpp/tensors/ckksvector.h b/tenseal/cpp/tensors/ckksvector.h
index 0b592f92..92868adf 100644
--- a/tenseal/cpp/tensors/ckksvector.h
+++ b/tenseal/cpp/tensors/ckksvector.h
@@ -110,9 +110,9 @@ class CKKSVector {
     * Matrix multiplication operations.
    **/
    CKKSVector matmul_plain(const vector<vector<double>>& matrix,
-                           uint n_threads = 0);
+                           size_t n_jobs = 0);
    CKKSVector& matmul_plain_inplace(const vector<vector<double>>& matrix,
-                                    uint n_threads = 0);
+                                    size_t n_jobs = 0);
 
    /**
     * Polynomial evaluation with `this` as variable.
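With the rename, `n_jobs` counts batched tasks rather than raw threads: `0` means "use `dispatcher_size()` jobs" and `1` runs the whole multiplication inline on the calling thread (see `matrix_ops.h` below). A sketch of the call sites, assuming the same 3x3 setup as the gtest in this PR:

```cpp
#include <cmath>
#include <vector>

#include "tenseal/cpp/tensors/ckksvector.h"

using namespace tenseal;

int main() {
    auto ctx = TenSEALContext::Create(seal::scheme_type::CKKS, 8192, -1,
                                      {60, 40, 40, 60});
    ctx->generate_galois_keys();  // rotations below require Galois keys
    ctx->global_scale(std::pow(2, 40));

    auto vec = CKKSVector(ctx, std::vector<double>{1, 2, 3});
    std::vector<std::vector<double>> matrix = {{1, 2, 3}, {1, 2, 3}, {1, 2, 3}};

    auto r0 = vec.matmul_plain(matrix);     // n_jobs = 0: dispatcher_size() jobs
    auto r1 = vec.matmul_plain(matrix, 1);  // runs inline, nothing enqueued
    return 0;
}
```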
diff --git a/tenseal/cpp/tensors/utils/matrix_ops.h b/tenseal/cpp/tensors/utils/matrix_ops.h
index 184cc0a2..ed1b90d9 100644
--- a/tenseal/cpp/tensors/utils/matrix_ops.h
+++ b/tenseal/cpp/tensors/utils/matrix_ops.h
@@ -56,78 +56,32 @@ Cryptology Conference (pp. 554-571). Springer, Berlin, Heidelberg.
 */
 template <typename T>
 Ciphertext diagonal_ct_vector_matmul(shared_ptr<TenSEALContext> tenseal_context,
-                                     Ciphertext& vec, size_t vector_size,
-                                     const vector<vector<T>>& matrix) {
+                                     Ciphertext& vec, const size_t vector_size,
+                                     const vector<vector<T>>& matrix,
+                                     size_t n_jobs) {
     // matrix is organized by rows
     // _check_matrix(matrix, this->size())
-    size_t n_rows = matrix.size();
 
     if (vector_size != matrix.size()) {
         throw invalid_argument("matrix shape doesn't match with vector size");
     }
 
-    Ciphertext result;
-    // result should have the same scale and modulus as vec * pt_diag (ct)
-    tenseal_context->encryptor->encrypt_zero(vec.parms_id(), result);
-    result.scale() = vec.scale() * tenseal_context->global_scale();
-
-    for (size_t i = 0; i < n_rows; i++) {
-        Ciphertext ct;
-        Plaintext pt_diag;
-        vector<T> diag;
-
-        diag = get_diagonal(matrix, -i, tenseal_context->slot_count());
-        replicate_vector(diag, tenseal_context->slot_count());
-
-        rotate(diag.begin(), diag.begin() + diag.size() - i, diag.end());
-
-        tenseal_context->encode(diag, pt_diag);
-
-        if (vec.parms_id() != pt_diag.parms_id()) {
-            set_to_same_mod(tenseal_context, vec, pt_diag);
-        }
-        tenseal_context->evaluator->multiply_plain(vec, pt_diag, ct);
-
-        tenseal_context->evaluator->rotate_vector_inplace(
-            ct, i, *tenseal_context->galois_keys());
-
-        // accumulate results
-        tenseal_context->evaluator->add_inplace(result, ct);
+    if (!tenseal_context->dispatcher() || !tenseal_context->dispatcher_size()) {
+        throw invalid_argument("invalid dispatcher");
     }
 
-    return result;
-}
-
-template <typename T>
-Ciphertext diagonal_ct_vector_matmul_parallel(
-    shared_ptr<TenSEALContext> tenseal_context, Ciphertext& vec,
-    const size_t vector_size, const vector<vector<T>>& matrix,
-    uint n_threads = 0) {
-    // matrix is organized by rows
-    // _check_matrix(matrix, this->size())
-    const size_t n_rows = matrix.size();
-
-    if (vector_size != matrix.size()) {
-        throw invalid_argument("matrix shape doesn't match with vector size");
-    }
-
-    mutex result_mutex;
     Ciphertext result;
     // result should have the same scale and modulus as vec * pt_diag (ct)
     tenseal_context->encryptor->encrypt_zero(vec.parms_id(), result);
     result.scale() = vec.scale() * tenseal_context->global_scale();
 
-    atomic<size_t> i = 0;
-    auto thread_func = [&tenseal_context, &vec, &matrix, &result, &result_mutex,
-                        &i, n_rows]() {
-        while (true) {
-            // take next i
-            size_t local_i;
-            local_i = i.fetch_add(1);
-            if (local_i >= n_rows) {
-                break;
-            }
+    auto worker_func = [&tenseal_context, &vec, &matrix](
+                           size_t start, size_t end) -> Ciphertext {
+        Ciphertext thread_result;
+        tenseal_context->encryptor->encrypt_zero(vec.parms_id(), thread_result);
+        thread_result.scale() = vec.scale() * tenseal_context->global_scale();
+        for (size_t local_i = start; local_i < end; ++local_i) {
             Ciphertext ct;
             Plaintext pt_diag;
             vector<T> diag;
@@ -149,21 +103,29 @@ Ciphertext diagonal_ct_vector_matmul_parallel(
             tenseal_context->evaluator->rotate_vector_inplace(
                 ct, local_i, *tenseal_context->galois_keys());
 
-            // accumulate results
-            result_mutex.lock();
-            tenseal_context->evaluator->add_inplace(result, ct);
-            result_mutex.unlock();
+            // accumulate thread results
+            tenseal_context->evaluator->add_inplace(thread_result, ct);
         }
+        return thread_result;
     };
 
-    // if not specified (n_threads set to 0), detect automatically
-    if (n_threads == 0) n_threads = get_concurrency();
-    vector<thread> threads;
-    threads.reserve(n_threads);
-    // start the threads
-    for (uint i = 0; i < n_threads; i++) threads.push_back(thread(thread_func));
-    // wait for the threads
-    for (uint i = 0; i < n_threads; i++) threads[i].join();
+    if (n_jobs == 0) n_jobs = tenseal_context->dispatcher_size();
+
+    if (n_jobs == 1) return worker_func(0, vector_size);
+
+    std::vector<std::future<Ciphertext>> future_results;
+    size_t batch_size = (vector_size + n_jobs - 1) / n_jobs;
+
+    for (size_t i = 0; i < n_jobs; i++) {
+        future_results.push_back(tenseal_context->dispatcher()->enqueue_task(
+            worker_func, i * batch_size,
+            std::min((i + 1) * batch_size, vector_size)));
+    }
+
+    for (size_t i = 0; i < n_jobs; i++) {
+        tenseal_context->evaluator->add_inplace(result,
+                                                future_results[i].get());
+    }
 
     return result;
 }
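The row range is split with a ceiling division and the last batch is clamped to `vector_size`; when `n_jobs` doesn't divide the row count evenly, trailing jobs simply get a short or empty range and their `for` loop does less (or no) work. A standalone illustration of just that arithmetic (plain C++, no TenSEAL dependency):

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
    size_t vector_size = 10, n_jobs = 4;
    // Ceiling division, as in diagonal_ct_vector_matmul above.
    size_t batch_size = (vector_size + n_jobs - 1) / n_jobs;  // = 3
    for (size_t i = 0; i < n_jobs; i++) {
        size_t start = i * batch_size;
        size_t end = std::min((i + 1) * batch_size, vector_size);
        // Prints [0,3) [3,6) [6,9) [9,10): the tail batch is clamped.
        std::printf("job %zu -> rows [%zu, %zu)\n", i, start, end);
    }
    return 0;
}
```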
diff --git a/tenseal/cpp/tensors/utils/utils.cpp b/tenseal/cpp/tensors/utils/utils.cpp
index fa0b905f..8143c14e 100644
--- a/tenseal/cpp/tensors/utils/utils.cpp
+++ b/tenseal/cpp/tensors/utils/utils.cpp
@@ -49,12 +49,4 @@ Ciphertext& sum_vector(shared_ptr<TenSEALContext> tenseal_context,
     return vector;
 }
 
-uint get_concurrency() {
-    uint concurrency = thread::hardware_concurrency();
-    if (concurrency != 0) return concurrency;
-    // TODO: need to find it another way
-    else
-        return 1;
-}
-
 }  // namespace tenseal
diff --git a/tenseal/cpp/tensors/utils/utils.h b/tenseal/cpp/tensors/utils/utils.h
index 6d6bffb6..89724e28 100644
--- a/tenseal/cpp/tensors/utils/utils.h
+++ b/tenseal/cpp/tensors/utils/utils.h
@@ -92,9 +92,6 @@ T compute_polynomial_term(int degree, double coeff,
     return x;
 }
 
-/* Compute how many threads can run in parallel */
-uint get_concurrency();
-
 }  // namespace tenseal
 
 #endif
diff --git a/tenseal/cpp/utils/BUILD b/tenseal/cpp/utils/BUILD
index 378a1afc..ce694ce4 100644
--- a/tenseal/cpp/utils/BUILD
+++ b/tenseal/cpp/utils/BUILD
@@ -8,8 +8,10 @@ cc_library(
     name = "tenseal_utils_cc",
     hdrs = [
         "proto.h",
+        "queue.h",
         "scope.h",
         "serialization.h",
+        "threadpool.h",
     ],
     copts = TENSEAL_DEFAULT_COPTS,
     includes = TENSEAL_DEFAULT_INCLUDES,
diff --git a/tenseal/cpp/utils/queue.h b/tenseal/cpp/utils/queue.h
new file mode 100644
index 00000000..c0dde21a
--- /dev/null
+++ b/tenseal/cpp/utils/queue.h
@@ -0,0 +1,83 @@
+#ifndef TENSEAL_UTILS_QUEUE_H
+#define TENSEAL_UTILS_QUEUE_H
+
+#include <condition_variable>
+#include <mutex>
+#include <queue>
+#include <utility>
+
+namespace tenseal {
+namespace sync {
+
+/**
+ * Thread-safe queue.
+ **/
+template <typename T>
+class blocking_queue {
+   public:
+    /**
+     * push() appends a new item to the queue and notifies the "pop" listener
+     *about the event.
+     **/
+    template <typename... Args>
+    void push(Args&&... args) {
+        {
+            std::scoped_lock lock{mutex_};
+            queue_.emplace(std::forward<Args>(args)...);
+        }
+        ready_.notify_one();
+    }
+    /**
+     * pop() waits until an item is available in the queue, pops it out and
+     *assigns it to the "out" parameter.
+     **/
+    [[nodiscard]] bool pop(T& out) {
+        std::unique_lock lock{mutex_};
+        ready_.wait(lock, [this] { return !queue_.empty() || done_; });
+        if (queue_.empty()) return false;
+
+        out = std::move(queue_.front());
+        queue_.pop();
+
+        return true;
+    }
+    /**
+     * done() notifies all listeners to shutdown.
+     **/
+    void done() noexcept {
+        {
+            std::scoped_lock lock{mutex_};
+            done_ = true;
+        }
+        ready_.notify_all();
+    }
+    /**
+     * empty() returns if the queue is empty or not.
+     **/
+    [[nodiscard]] bool empty() const noexcept {
+        std::scoped_lock lock{mutex_};
+        return queue_.empty();
+    }
+    /**
+     * size() returns the size of the queue.
+     **/
+    [[nodiscard]] unsigned int size() const noexcept {
+        std::scoped_lock lock{mutex_};
+        return queue_.size();
+    }
+
+   private:
+    std::queue<T> queue_;
+    std::condition_variable ready_;
+    mutable std::mutex mutex_;
+    bool done_{false};
+};
+
+}  // namespace sync
+}  // namespace tenseal
+
+#endif
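`blocking_queue` is the coordination primitive under the pool: `pop()` blocks until an item or a shutdown notification arrives, and returns `false` only once the queue is both drained and `done()`. A minimal producer/consumer sketch (assuming the header path introduced by this PR):

```cpp
#include <iostream>
#include <string>
#include <thread>

#include "tenseal/cpp/utils/queue.h"

int main() {
    tenseal::sync::blocking_queue<std::string> q;

    // Consumer: keeps popping until the queue is drained and shut down.
    std::thread consumer([&q] {
        std::string msg;
        while (q.pop(msg)) std::cout << msg << "\n";
    });

    q.push("hello");
    q.push("world");
    q.done();  // wakes the consumer so it can exit once the queue is empty

    consumer.join();
    return 0;
}
```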
diff --git a/tenseal/cpp/utils/threadpool.h b/tenseal/cpp/utils/threadpool.h
new file mode 100644
index 00000000..83650e5c
--- /dev/null
+++ b/tenseal/cpp/utils/threadpool.h
@@ -0,0 +1,92 @@
+#ifndef TENSEAL_UTILS_THREADPOOL_H
+#define TENSEAL_UTILS_THREADPOOL_H
+
+#include <atomic>
+#include <cassert>
+#include <functional>
+#include <future>
+#include <memory>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "queue.h"
+
+namespace tenseal {
+
+/* Compute how many threads can run in parallel */
+inline uint get_concurrency() {
+    uint concurrency = thread::hardware_concurrency();
+
+    if (concurrency != 0) return concurrency;
+
+    return 1;
+}
+
+namespace sync {
+
+/**
+ * A ThreadPool class for managing and dispatching tasks to a number of
+ * threads.
+ **/
+class ThreadPool {
+   public:
+    /**
+     * Create "n_threads" workers, each with a dedicated task queue, and
+     * execute the tasks as they arrive in the queues.
+     **/
+    ThreadPool(unsigned int n_threads = get_concurrency())
+        : m_queues(n_threads), m_count(n_threads) {
+        assert(n_threads != 0);
+        auto worker = [&](unsigned int i) {
+            while (true) {
+                Proc f;
+                if (!m_queues[i].pop(f)) break;
+                f();
+            }
+        };
+        for (unsigned int i = 0; i < n_threads; ++i)
+            m_workers.emplace_back(worker, i);
+    }
+
+    ~ThreadPool() noexcept {
+        for (auto& queue : m_queues) queue.done();
+        for (auto& worker : m_workers) worker.join();
+    }
+
+    /**
+     * enqueue_task() assigns tasks to worker queues using round robin
+     *scheduling.
+     * @returns a std::future object with the result of the task.
+     **/
+    template <typename F, typename... Args>
+    auto enqueue_task(F&& f, Args&&... args)
+        -> std::future<typename std::result_of<F(Args...)>::type> {
+        using return_type = typename std::result_of<F(Args...)>::type;
+
+        auto task = std::make_shared<std::packaged_task<return_type()>>(
+            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+        std::future<return_type> res = task->get_future();
+        auto work = [task]() { (*task)(); };
+
+        unsigned int i = m_index++;
+        m_queues[i % m_count].push(work);
+
+        return res;
+    }
+
+   private:
+    using Proc = std::function<void(void)>;
+
+    using Queues = std::vector<blocking_queue<Proc>>;
+    Queues m_queues;
+
+    using Threads = std::vector<std::thread>;
+    Threads m_workers;
+
+    const unsigned int m_count;
+    std::atomic_uint m_index = 0;
+};
+
+}  // namespace sync
+}  // namespace tenseal
+
+#endif
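Each worker owns one queue, and `enqueue_task` round-robins tasks over them through the atomic `m_index`, handing back one `std::future` per task. A small standalone usage sketch:

```cpp
#include <future>
#include <iostream>
#include <vector>

#include "tenseal/cpp/utils/threadpool.h"

int main() {
    tenseal::sync::ThreadPool pool(4);  // 4 workers, one queue each

    // Tasks are spread round-robin over the worker queues.
    std::vector<std::future<int>> results;
    for (int i = 0; i < 8; ++i)
        results.push_back(pool.enqueue_task([](int x) { return x * x; }, i));

    for (auto& fut : results) std::cout << fut.get() << " ";
    std::cout << "\n";
    return 0;  // ~ThreadPool() calls done() on every queue and joins workers
}
```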
diff --git a/tenseal/deps.bzl b/tenseal/deps.bzl
index 6180aded..1deace1f 100644
--- a/tenseal/deps.bzl
+++ b/tenseal/deps.bzl
@@ -15,6 +15,13 @@
         strip_prefix = "googletest-release-1.10.0",
         url = "https://github.com/google/googletest/archive/release-1.10.0.zip",
     )
+    if "com_google_benchmark" not in native.existing_rules():
+        http_archive(
+            name = "com_google_benchmark",
+            sha256 = "a9d41abe1bd45a707d39fdfd46c01b92e340923bc5972c0b54a48002a9a7cfa3",
+            strip_prefix = "benchmark-8cead007830bdbe94b7cc259e873179d0ef84da6",
+            url = "https://github.com/google/benchmark/archive/8cead007830bdbe94b7cc259e873179d0ef84da6.zip",
+        )
 
     if "com_microsoft_seal" not in native.existing_rules():
         http_archive(
diff --git a/tests/cpp/BUILD b/tests/cpp/BUILD
index 9fcbe3a7..05a2785b 100644
--- a/tests/cpp/BUILD
+++ b/tests/cpp/BUILD
@@ -16,6 +16,7 @@ cc_test(
     deps = [
         "//tenseal/cpp/context:tenseal_context_cc",
         "//tenseal/cpp/tensors:tenseal_tensors_cc",
+        "//tenseal/cpp/utils:tenseal_utils_cc",
         "@com_google_googletest//:gtest",
         "@com_google_googletest//:gtest_main",
     ],
diff --git a/tests/cpp/benchmarks/BUILD b/tests/cpp/benchmarks/BUILD
new file mode 100644
index 00000000..c2b25640
--- /dev/null
+++ b/tests/cpp/benchmarks/BUILD
@@ -0,0 +1,20 @@
+package(default_visibility = ["//visibility:public"])
+
+TENSEAL_DEFAULT_INCLUDES = ["."]
+
+TENSEAL_DEFAULT_COPTS = ["-std=c++17"]
+
+cc_binary(
+    name = "benchmark",
+    srcs = [
+        "benchmark.cpp",
+    ],
+    copts = TENSEAL_DEFAULT_COPTS,
+    includes = TENSEAL_DEFAULT_INCLUDES,
+    linkstatic = True,
+    deps = [
+        "//tenseal/cpp:tenseal_cc",
+        "@com_google_benchmark//:benchmark_main",
+        "@com_google_googletest//:gtest",
+    ],
+)
diff --git a/tests/cpp/benchmarks/benchmark.cpp b/tests/cpp/benchmarks/benchmark.cpp
new file mode 100644
index 00000000..b9db64c1
--- /dev/null
+++ b/tests/cpp/benchmarks/benchmark.cpp
@@ -0,0 +1,40 @@
+#include "benchmark/benchmark.h"
+#include "tenseal/cpp/tensors/ckksvector.h"
+
+namespace tenseal {
+namespace {
+
+void BM_matmul_plain(benchmark::State& state) {
+    int threads = state.range(0);
+
+    auto ctx = TenSEALContext::Create(scheme_type::CKKS, 8192, -1,
+                                      {60, 40, 40, 60}, threads);
+    ctx->generate_galois_keys();
+    ctx->global_scale(std::pow(2, 40));
+
+    std::vector<double> data;
+    size_t N = 1024;
+
+    for (size_t idx = 0; idx < N; ++idx) {
+        data.push_back(idx + 1);
+    }
+    vector<vector<double>> matrix;
+    for (size_t idx = 0; idx < N; ++idx) {
+        matrix.push_back(data);
+    }
+
+    auto vec = CKKSVector(ctx, data);
+
+    for (auto _ : state) {
+        auto res = vec.matmul_plain(matrix);
+        ::benchmark::DoNotOptimize(res);
+    }
+}
+// The benchmarked argument is the dispatcher size (number of threads) used
+// by the context.
+BENCHMARK(BM_matmul_plain)
+    ->RangeMultiplier(2)
+    ->Iterations(3)
+    ->Range(2, 8);
+
+}  // namespace
+}  // namespace tenseal
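A note on the registration (Google Benchmark semantics, not part of this diff): with `RangeMultiplier(2)` and `Range(2, 8)`, the generated arguments are 2, 4 and 8, so `BM_matmul_plain` is measured with dispatchers of 2, 4 and 8 threads. An equivalent explicit form would be:

```cpp
// Same sweep spelled out argument by argument (hypothetical alternative,
// assuming the BM_matmul_plain function defined above):
BENCHMARK(BM_matmul_plain)->Iterations(3)->Arg(2)->Arg(4)->Arg(8);
```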
diff --git a/tests/cpp/tensealcontext_test.cpp b/tests/cpp/tensealcontext_test.cpp
index 60a368c1..ae3b30c5 100644
--- a/tests/cpp/tensealcontext_test.cpp
+++ b/tests/cpp/tensealcontext_test.cpp
@@ -1,4 +1,5 @@
 #include "tenseal/cpp/context/tensealcontext.h"
+#include "tenseal/cpp/utils/threadpool.h"
 
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
@@ -63,6 +64,16 @@ TEST_F(TenSEALContextTest, TestSerialization) {
     ASSERT_EQ(orig_galoiskeys.size(), serial_galoiskeys.size());
 }
 
+TEST_F(TenSEALContextTest, TestDispatcher) {
+    auto ctx =
+        TenSEALContext::Create(scheme_type::CKKS, 8192, -1, {60, 40, 40, 60});
+    ASSERT_EQ(ctx->dispatcher_size(), get_concurrency());
+
+    ctx =
+        TenSEALContext::Create(scheme_type::CKKS, 8192, -1, {60, 40, 40, 60}, 8);
+    ASSERT_EQ(ctx->dispatcher_size(), 8);
+}
+
 TEST_P(TenSEALContextTest, TestCreateBFV) {
     bool should_serialize_first = GetParam();
 
diff --git a/tests/cpp/tensors/ckksvector_test.cpp b/tests/cpp/tensors/ckksvector_test.cpp
index c9389985..7a4c5f57 100644
--- a/tests/cpp/tensors/ckksvector_test.cpp
+++ b/tests/cpp/tensors/ckksvector_test.cpp
@@ -211,6 +211,32 @@ TEST_P(CKKSVectorTest, TestCKKSReplicateFirstSlot) {
     ASSERT_TRUE(are_close(result, {2, 2, 2, 2, 2, 2}));
 }
 
+TEST_P(CKKSVectorTest, TestCKKSPlainMatMul) {
+    bool should_serialize_first = GetParam();
+
+    auto ctx =
+        TenSEALContext::Create(scheme_type::CKKS, 8192, -1, {60, 40, 40, 60});
+    ASSERT_TRUE(ctx != nullptr);
+
+    ctx->generate_galois_keys();
+    ctx->global_scale(std::pow(2, 40));
+
+    auto vec = CKKSVector(ctx, std::vector<double>({1, 2, 3}));
+    auto matrix = vector<vector<double>>{{1, 2, 3}, {1, 2, 3}, {1, 2, 3}};
+    auto expected_result = vector<double>{6, 12, 18};
+
+    auto result = vec.matmul_plain(matrix, 2);
+
+    if (should_serialize_first) {
+        result = duplicate(result);
+    }
+
+    auto decrypted_result = result.decrypt();
+
+    ASSERT_EQ(decrypted_result.size(), 3);
+    ASSERT_TRUE(are_close(decrypted_result, expected_result));
+}
+
 INSTANTIATE_TEST_CASE_P(TestCKKSVector, CKKSVectorTest,
                         ::testing::Values(false, true));
diff --git a/tests/python/tenseal/tensors/test_ckks_vector.py b/tests/python/tenseal/tensors/test_ckks_vector.py
index 0fb59b6d..0cd0be38 100644
--- a/tests/python/tenseal/tensors/test_ckks_vector.py
+++ b/tests/python/tenseal/tensors/test_ckks_vector.py
@@ -23,6 +23,14 @@ def context():
     return context
 
 
+def parallel_context(n_threads):
+    context = ts.context(
+        ts.SCHEME_TYPE.CKKS, 8192, coeff_mod_bit_sizes=[60, 40, 40, 60], n_threads=n_threads
+    )
+    context.global_scale = pow(2, 40)
+    return context
+
+
 # default precision is 1, otherwise it can be specified in the test-case
 @pytest.fixture(scope="function")
 def precision():
@@ -949,10 +957,12 @@ def test_mul_without_global_scale(vec1, vec2, precision):
     ],
 )
 @pytest.mark.parametrize("n_threads", [0, 1, 2, 4])
-def test_vec_plain_matrix_mul(context, vec, matrix, n_threads, precision):
+@pytest.mark.parametrize("n_jobs", [0, 1, 2, 4])
+def test_vec_plain_matrix_mul(vec, matrix, n_threads, n_jobs, precision):
+    context = parallel_context(n_threads)
     context.generate_galois_keys()
     ct = ts.ckks_vector(context, vec)
-    result = ct.mm(matrix, n_threads=n_threads)
+    result = ct.mm(matrix, n_jobs)
     expected = (np.array(vec) @ np.array(matrix)).tolist()
     assert _almost_equal(
         result.decrypt(), expected, precision
@@ -972,10 +982,12 @@ def test_vec_plain_matrix_mul(vec, matrix, n_threads, n_jobs, precision):
     ],
 )
 @pytest.mark.parametrize("n_threads", [0, 1, 2, 4])
-def test_vec_plain_matrix_mul_inplace(context, vec, matrix, n_threads, precision):
+@pytest.mark.parametrize("n_jobs", [0, 1, 2, 4])
+def test_vec_plain_matrix_mul_inplace(vec, matrix, n_threads, n_jobs, precision):
+    context = parallel_context(n_threads)
     context.generate_galois_keys()
     ct = ts.ckks_vector(context, vec)
-    ct.mm_(matrix, n_threads=n_threads)
+    ct.mm_(matrix, n_jobs)
     expected = (np.array(vec) @ np.array(matrix)).tolist()
     assert _almost_equal(ct.decrypt(), expected, precision), "Matrix multiplication is incorrect."