Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
rgw_sal_motr, motr_gc: [CORTX-33148] Add MotrGC::GCWorker class
Browse files Browse the repository at this point in the history
For supporting concurrent IOs MotrGC need to have multiple GCWorker threads.
Introducing GCWorker class and changing the MotrGC interfaces to manage
all the worker threads.

Signed-off-by: Sumedh A. Kulkarni <sumedh.a.kulkarni@seagate.com>
  • Loading branch information
sumedhak27 committed Jul 21, 2022
1 parent 0a50508 commit 543a44d
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 34 deletions.
45 changes: 36 additions & 9 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,44 @@

#include "gc.h"

void *MotrGC::entry() {
std::unique_lock<std::mutex> lk(mtx);
ldpp_dout(dpp, 10) << __func__ << ": Motr GC started" << dendl;
void *MotrGC::GCWorker::entry() {
// std::unique_lock<std::mutex> lk(lock);
// ldpp_dout(dpp, 10) << __func__ << ": Motr GC started" << dendl;

do {
ldpp_dout(dpp, 10) << __func__ << ": In a Motr GC loop." << dendl;
cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10));
} while (! stop_signalled);
// do {
// ldpp_dout(dpp, 10) << __func__ << ": In a Motr GC loop." << dendl;
// cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10));
// } while (! stop_signalled);

ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called.#"
<< stop_signalled << dendl;
// ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called.#"
// << stop_signalled << dendl;
return nullptr;
}

void MotrGC::initialize(CephContext *_cct, rgw::sal::Store* _store) {
cct = _cct;
store = _store;
// fetch num_max_queue from config
// create all gc queues in motr index store
}

void MotrGC::start_processor() {
// fetch max_concurrent_io i.e. max_threads to create from config.
// start all the gc_worker threads
}

void MotrGC::stop_processor() {
// in case of stop signal,
// gracefully shutdown all the gc threads.
down_flag = true;
for (auto& worker : workers) {
worker->stop();
worker->join();
}
workers.clear();
}

void MotrGC::GCWorker::stop() {
std::lock_guard l{lock};
cv.notify_all();
}
49 changes: 34 additions & 15 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,47 @@
#include "common/Thread.h"
#include <mutex>
#include <condition_variable>
#include <atomic>

class MotrGC : public Thread {
private:
const DoutPrefixProvider *dpp;
class MotrGC : public DoutPrefixProvider {
private:
CephContext *cct;
rgw::sal::Store *store;
std::mutex mtx;
std::condition_variable cv;
bool stop_signalled = false;
uint32_t gc_interval = 60*60; // default: 24*60*60 sec
uint32_t gc_obj_min_wait = 60*60; // 60*60sec default
int max_indices = 0;
std::vector<std::string> index_names;
std::atomic<bool> down_flag = false;

public:
MotrGC(const DoutPrefixProvider *_dpp, rgw::sal::Store* _store) :
dpp(_dpp), store(_store) {}
class GCWorker : public Thread {
private:
const DoutPrefixProvider *dpp;
CephContext *cct;
MotrGC *motr_gc;
int worker_id;
uint32_t gc_interval = 60*60; // default: 24*60*60 sec
std::mutex lock;
std::condition_variable cv;
public:
GCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
MotrGC *_motr_gc, int _worker_id)
: dpp(_dpp), cct(_cct), motr_gc(_motr_gc), worker_id(_worker_id) {};

void *entry() override;
void *entry() override;
void stop();
};
std::vector<std::unique_ptr<MotrGC::GCWorker>> workers;

void signal_stop() {
std::lock_guard<std::mutex> lk_guard(mtx);
stop_signalled = true;
cv.notify_all();
MotrGC() : cct(nullptr), store(nullptr) {}
~MotrGC() {
stop_processor();
finalize();
}

void initialize(CephContext *_cct, rgw::sal::Store* _store);
void finalize();

void start_processor();
void stop_processor();
};

#endif
13 changes: 6 additions & 7 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1485,24 +1485,23 @@ int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) {
if (use_gc_thread) {
int rc = create_gc(dpp);
if (rc != 0)
ldpp_dout(dpp, 0) << __func__ << ": Metadata cache init failed " <<
ldpp_dout(dpp, 0) << __func__ << ": Failed to Create MotrGC " <<
"with rc = " << rc << dendl;
}
return rc;
}

int MotrStore::create_gc(const DoutPrefixProvider *dpp) {
int ret = 0;
// [TODO] Create multiple GC threads as per config
gc_worker = std::make_unique<MotrGC>(dpp, this);
gc_worker->create("motr_gc");
motr_gc->initialize(cctx, this);
motr_gc->start_processor();
return ret;
}

void MotrStore::stop_gc() {
if (gc_worker) {
gc_worker->signal_stop();
gc_worker->join();
if (motr_gc) {
motr_gc->stop_processor();
motr_gc->finalize();
}
}

Expand Down
4 changes: 1 addition & 3 deletions src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -985,9 +985,7 @@ class MotrStore : public Store {
MotrMetaCache* user_cache;
MotrMetaCache* bucket_inst_cache;

// [TODO] Create vector of gc_workers
// size is input from `rgw_gc_max_concurrent_io`
std::unique_ptr<MotrGC> gc_worker;
std::unique_ptr<MotrGC> motr_gc;
bool use_gc_thread;
bool use_cache;

Expand Down

0 comments on commit 543a44d

Please sign in to comment.