From 543a44d38b6956c4986ced1494e5e6321db83192 Mon Sep 17 00:00:00 2001 From: "Sumedh A. Kulkarni" Date: Thu, 21 Jul 2022 02:51:24 -0600 Subject: [PATCH] rgw_sal_motr, motr_gc: [CORTX-33148] Add MotrGC::GCWorker class For supporting concurrent IOs MotrGC need to have multiple GCWorker threads. Introducing GCWorker class and changing the MotrGC interfaces to manage all the worker threads. Signed-off-by: Sumedh A. Kulkarni --- src/rgw/motr/gc/gc.cc | 45 +++++++++++++++++++++++++++++-------- src/rgw/motr/gc/gc.h | 49 ++++++++++++++++++++++++++++------------- src/rgw/rgw_sal_motr.cc | 13 +++++------ src/rgw/rgw_sal_motr.h | 4 +--- 4 files changed, 77 insertions(+), 34 deletions(-) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index 997aff0c14494..95d570b1bf732 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -15,17 +15,44 @@ #include "gc.h" -void *MotrGC::entry() { - std::unique_lock lk(mtx); - ldpp_dout(dpp, 10) << __func__ << ": Motr GC started" << dendl; +void *MotrGC::GCWorker::entry() { + // std::unique_lock lk(lock); + // ldpp_dout(dpp, 10) << __func__ << ": Motr GC started" << dendl; - do { - ldpp_dout(dpp, 10) << __func__ << ": In a Motr GC loop." << dendl; - cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); - } while (! stop_signalled); + // do { + // ldpp_dout(dpp, 10) << __func__ << ": In a Motr GC loop." << dendl; + // cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); + // } while (! stop_signalled); - ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called.#" - << stop_signalled << dendl; + // ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called.#" + // << stop_signalled << dendl; return nullptr; } +void MotrGC::initialize(CephContext *_cct, rgw::sal::Store* _store) { + cct = _cct; + store = _store; + // fetch num_max_queue from config + // create all gc queues in motr index store +} + +void MotrGC::start_processor() { + // fetch max_concurrent_io i.e. max_threads to create from config. + // start all the gc_worker threads +} + +void MotrGC::stop_processor() { + // in case of stop signal, + // gracefully shutdown all the gc threads. + down_flag = true; + for (auto& worker : workers) { + worker->stop(); + worker->join(); + } + workers.clear(); +} + +void MotrGC::GCWorker::stop() { + std::lock_guard l{lock}; + cv.notify_all(); +} diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h index ae02863f31e09..86935c89d889b 100644 --- a/src/rgw/motr/gc/gc.h +++ b/src/rgw/motr/gc/gc.h @@ -20,28 +20,47 @@ #include "common/Thread.h" #include #include +#include -class MotrGC : public Thread { - private: - const DoutPrefixProvider *dpp; +class MotrGC : public DoutPrefixProvider { + private: + CephContext *cct; rgw::sal::Store *store; - std::mutex mtx; - std::condition_variable cv; - bool stop_signalled = false; - uint32_t gc_interval = 60*60; // default: 24*60*60 sec - uint32_t gc_obj_min_wait = 60*60; // 60*60sec default + int max_indices = 0; + std::vector index_names; + std::atomic down_flag = false; public: - MotrGC(const DoutPrefixProvider *_dpp, rgw::sal::Store* _store) : - dpp(_dpp), store(_store) {} + class GCWorker : public Thread { + private: + const DoutPrefixProvider *dpp; + CephContext *cct; + MotrGC *motr_gc; + int worker_id; + uint32_t gc_interval = 60*60; // default: 24*60*60 sec + std::mutex lock; + std::condition_variable cv; + public: + GCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, + MotrGC *_motr_gc, int _worker_id) + : dpp(_dpp), cct(_cct), motr_gc(_motr_gc), worker_id(_worker_id) {}; - void *entry() override; + void *entry() override; + void stop(); + }; + std::vector> workers; - void signal_stop() { - std::lock_guard lk_guard(mtx); - stop_signalled = true; - cv.notify_all(); + MotrGC() : cct(nullptr), store(nullptr) {} + ~MotrGC() { + stop_processor(); + finalize(); } + + void initialize(CephContext *_cct, rgw::sal::Store* _store); + void finalize(); + + void start_processor(); + void stop_processor(); }; #endif diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index c9dc62c6dea3e..f2c0bc282be5b 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -1485,7 +1485,7 @@ int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { if (use_gc_thread) { int rc = create_gc(dpp); if (rc != 0) - ldpp_dout(dpp, 0) << __func__ << ": Metadata cache init failed " << + ldpp_dout(dpp, 0) << __func__ << ": Failed to Create MotrGC " << "with rc = " << rc << dendl; } return rc; @@ -1493,16 +1493,15 @@ int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { int MotrStore::create_gc(const DoutPrefixProvider *dpp) { int ret = 0; - // [TODO] Create multiple GC threads as per config - gc_worker = std::make_unique(dpp, this); - gc_worker->create("motr_gc"); + motr_gc->initialize(cctx, this); + motr_gc->start_processor(); return ret; } void MotrStore::stop_gc() { - if (gc_worker) { - gc_worker->signal_stop(); - gc_worker->join(); + if (motr_gc) { + motr_gc->stop_processor(); + motr_gc->finalize(); } } diff --git a/src/rgw/rgw_sal_motr.h b/src/rgw/rgw_sal_motr.h index 42f82d2e7d028..e0e2f3f801a3a 100644 --- a/src/rgw/rgw_sal_motr.h +++ b/src/rgw/rgw_sal_motr.h @@ -985,9 +985,7 @@ class MotrStore : public Store { MotrMetaCache* user_cache; MotrMetaCache* bucket_inst_cache; - // [TODO] Create vector of gc_workers - // size is input from `rgw_gc_max_concurrent_io` - std::unique_ptr gc_worker; + std::unique_ptr motr_gc; bool use_gc_thread; bool use_cache;