Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
rgw_sal_motr, motr_gc: [CORTX-33148] add MotrGC, MotrGC::GCWorker inf…
Browse files Browse the repository at this point in the history
…ra code (#356)

* rgw_sal_motr, motr_gc: [CORTX-33148] add MotrGC, MotrGC::GCWorker infra code

Behaviour
- With Garbage Collector enabled, MotrGC will have GC indexes & GC worker threads.
- GC worker threads will run for the configured max processing time and then 
   will wait for the configured time between two consecutive runs.  

Additions/Changes
- Add the Garbage Collector infrastructure to support the start & stop of worker threads.
  - MotrGC class with the interfaces to initialize(), start(), stop() and finalize().
  - GCWorker class with entry() and stop() methods.
- Refactor Initialization of MotrStore
  - add setter methods to configure MotrStore
 - add initialize() method to call initialization of all the other components
   eg. MetadataCache, GC, (in future LC, QuotaHandler), etc.

Signed-off-by: Sumedh A. Kulkarni <sumedh.a.kulkarni@seagate.com>
  • Loading branch information
sumedhak27 committed Jul 26, 2022
1 parent f543f1f commit cd34def
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 6 deletions.
2 changes: 1 addition & 1 deletion src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ if(WITH_RADOSGW_DBSTORE)
list(APPEND librgw_common_srcs rgw_sal_dbstore.cc)
endif()
if(WITH_RADOSGW_MOTR)
list(APPEND librgw_common_srcs rgw_sal_motr.cc)
list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc)
endif()
if(WITH_JAEGER)
list(APPEND librgw_common_srcs rgw_tracer.cc)
Expand Down
96 changes: 96 additions & 0 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* Garbage Collector implementation for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include "motr/gc/gc.h"

void *MotrGC::GCWorker::entry() {
std::unique_lock<std::mutex> lk(lock);
ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " started." << dendl;

do {

ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " iteration" << dendl;
cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10));

} while (! motr_gc->going_down());

ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called for "
<< gc_thread_prefix << worker_id << dendl;
return nullptr;
}

void MotrGC::initialize() {
// fetch max gc indices from config
auto max_indices = std::min(cct->_conf->rgw_gc_max_objs,
GC_MAX_SHARDS_PRIME);
ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl;

index_names.reserve(max_indices);
for (int i = 0; i < max_indices; i++) {
// Append index name to the gc index list
index_names.push_back(gc_index_prefix + std::to_string(i));

// [To be Implemented] create index in motr dix
}

}

void MotrGC::finalize() {
// [To be Implemented] undo steps from initialize stage
}

void MotrGC::start_processor() {
// fetch max_concurrent_io i.e. max_threads to create from config.
// start all the gc_worker threads
auto max_workers = cct->_conf->rgw_gc_max_concurrent_io;
ldpp_dout(this, 50) << __func__ << ": max_workers = "
<< max_workers << dendl;
workers.reserve(max_workers);
for (int ix = 0; ix < max_workers; ++ix) {
auto worker = std::make_unique<MotrGC::GCWorker>(this /* dpp */,
cct, this, ix);
worker->create((gc_thread_prefix + std::to_string(ix)).c_str());
workers.push_back(std::move(worker));
}
}

void MotrGC::stop_processor() {
// gracefully shutdown all the gc threads.
down_flag = true;
for (auto& worker : workers) {
worker->stop();
worker->join();
}
workers.clear();
}

void MotrGC::GCWorker::stop() {
std::lock_guard l{lock};
cv.notify_all();
}

bool MotrGC::going_down() {
return down_flag;
}

unsigned MotrGC::get_subsys() const {
return dout_subsys;
}

std::ostream& MotrGC::gen_prefix(std::ostream& out) const {
return out << "garbage_collector: ";
}
82 changes: 82 additions & 0 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* Garbage Collector Classes for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef __MOTR_GC_H__
#define __MOTR_GC_H__

#include "rgw_sal_motr.h"
#include "common/Thread.h"
#include <mutex>
#include <condition_variable>
#include <atomic>

const int64_t GC_MAX_SHARDS_PRIME = 65521;
static std::string gc_index_prefix = "gc.";
static std::string gc_thread_prefix = "gc_thread_";

class MotrGC : public DoutPrefixProvider {
private:
CephContext *cct;
rgw::sal::Store *store;
int max_indices = 0;
std::vector<std::string> index_names;
std::atomic<bool> down_flag = false;

public:
class GCWorker : public Thread {
private:
const DoutPrefixProvider *dpp;
CephContext *cct;
MotrGC *motr_gc;
int worker_id;
uint32_t gc_interval = 60*60; // default: 24*60*60 sec
std::mutex lock;
std::condition_variable cv;
public:
GCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
MotrGC *_motr_gc, int _worker_id)
: dpp(_dpp),
cct(_cct),
motr_gc(_motr_gc),
worker_id(_worker_id) {};

void *entry() override;
void stop();
};
std::vector<std::unique_ptr<MotrGC::GCWorker>> workers;

MotrGC(CephContext *_cct, rgw::sal::Store* _store)
: cct(_cct), store(_store) {}

~MotrGC() {
stop_processor();
finalize();
}

void initialize();
void finalize();

void start_processor();
void stop_processor();

bool going_down();

// Set Up logging prefix for GC
CephContext *get_cct() const override { return cct; }
unsigned get_subsys() const;
std::ostream& gen_prefix(std::ostream& out) const;
};

#endif
8 changes: 7 additions & 1 deletion src/rgw/rgw_sal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d
ldpp_dout(dpp, 0) << "newMotrStore() failed!" << dendl;
return store;
}
((rgw::sal::MotrStore *)store)->init_metadata_cache(dpp, cct, use_cache);

if ((*(rgw::sal::MotrStore*)store).set_use_cache(use_cache)
.set_run_gc_thread(use_gc_thread)
.initialize(cct, dpp) < 0) {
delete store;
return nullptr;
}

return store;
}
Expand Down
51 changes: 48 additions & 3 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1473,12 +1473,57 @@ int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct
return 0;
}

void MotrStore::finalize(void)
{
void MotrStore::finalize(void) {
// stop gc worker threads
stop_gc();
// close connection with motr
m0_client_fini(this->instance, true);
}

MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) {
use_gc_thread = _use_gc_thread;
return *this;
}

MotrStore& MotrStore::set_use_cache(bool _use_cache) {
use_cache = _use_cache;
return *this;
}

int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) {
// Create metadata objects and set enabled=use_cache value
int rc = init_metadata_cache(dpp, cct);
if (rc != 0) {
ldpp_dout(dpp, 0) << __func__ << ": Metadata cache init failed " <<
"with rc = " << rc << dendl;
return rc;
}

if (use_gc_thread) {
// Create MotrGC object and start GCWorker threads
int rc = create_gc();
if (rc != 0)
ldpp_dout(dpp, 0) << __func__ << ": Failed to Create MotrGC " <<
"with rc = " << rc << dendl;
}
return rc;
}

int MotrStore::create_gc() {
int ret = 0;
motr_gc = std::make_unique<MotrGC>(cctx, this);
motr_gc->initialize();
motr_gc->start_processor();
return ret;
}

void MotrStore::stop_gc() {
if (motr_gc) {
motr_gc->stop_processor();
motr_gc->finalize();
}
}

uint64_t MotrStore::get_new_req_id()
{
uint64_t req_id = ceph::util::generate_random_number<uint64_t>();
Expand Down Expand Up @@ -5394,7 +5439,7 @@ std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_y
}

int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp,
CephContext *cct, bool use_cache)
CephContext *cct)
{
this->obj_meta_cache = new MotrMetaCache(dpp, cct);
this->get_obj_meta_cache()->set_enabled(use_cache);
Expand Down
14 changes: 13 additions & 1 deletion src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ extern "C" {
#include "rgw_role.h"
#include "rgw_multi.h"
#include "rgw_putobj_processor.h"
#include "motr/gc/gc.h"
typedef void (*progress_cb)(off_t, void*);

class MotrGC;

namespace rgw::sal {

class MotrStore;
Expand Down Expand Up @@ -982,6 +985,10 @@ class MotrStore : public Store {
MotrMetaCache* user_cache;
MotrMetaCache* bucket_inst_cache;

std::unique_ptr<MotrGC> motr_gc;
bool use_gc_thread;
bool use_cache;

public:
CephContext *cctx;
struct m0_client *instance;
Expand Down Expand Up @@ -1097,7 +1104,12 @@ class MotrStore : public Store {
uint64_t olh_epoch,
const std::string& unique_tag) override;

virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp);
virtual void finalize(void) override;
int create_gc();
void stop_gc();
MotrStore& set_run_gc_thread(bool _use_gc_thread);
MotrStore& set_use_cache(bool _use_cache);

virtual CephContext *ctx(void) override {
return cctx;
Expand Down Expand Up @@ -1133,7 +1145,7 @@ class MotrStore : public Store {
int delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key);
int store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info);

int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct, bool use_cache);
int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct);
MotrMetaCache* get_obj_meta_cache() {return obj_meta_cache;}
MotrMetaCache* get_user_cache() {return user_cache;}
MotrMetaCache* get_bucket_inst_cache() {return bucket_inst_cache;}
Expand Down

0 comments on commit cd34def

Please sign in to comment.