diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index cabcba988f631..efc5e1ed62b8d 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -177,7 +177,7 @@ if(WITH_RADOSGW_DBSTORE) list(APPEND librgw_common_srcs rgw_sal_dbstore.cc) endif() if(WITH_RADOSGW_MOTR) - list(APPEND librgw_common_srcs rgw_sal_motr.cc) + list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc) endif() if(WITH_JAEGER) list(APPEND librgw_common_srcs rgw_tracer.cc) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc new file mode 100644 index 0000000000000..4fb3f9e8ae62f --- /dev/null +++ b/src/rgw/motr/gc/gc.cc @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * Garbage Collector implementation for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "motr/gc/gc.h" + +void *MotrGC::GCWorker::entry() { + std::unique_lock lk(lock); + ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix + << worker_id << " started." << dendl; + + do { + + ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix + << worker_id << " iteration" << dendl; + cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); + + } while (! motr_gc->going_down()); + + ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called for " + << gc_thread_prefix << worker_id << dendl; + return nullptr; +} + +void MotrGC::initialize() { + // fetch max gc indices from config + auto max_indices = std::min(cct->_conf->rgw_gc_max_objs, + GC_MAX_SHARDS_PRIME); + ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; + + index_names.reserve(max_indices); + for (int i = 0; i < max_indices; i++) { + // Append index name to the gc index list + index_names.push_back(gc_index_prefix + std::to_string(i)); + + // [To be Implemented] create index in motr dix + } + +} + +void MotrGC::finalize() { + // [To be Implemented] undo steps from initialize stage +} + +void MotrGC::start_processor() { + // fetch max_concurrent_io i.e. max_threads to create from config. + // start all the gc_worker threads + auto max_workers = cct->_conf->rgw_gc_max_concurrent_io; + ldpp_dout(this, 50) << __func__ << ": max_workers = " + << max_workers << dendl; + workers.reserve(max_workers); + for (int ix = 0; ix < max_workers; ++ix) { + auto worker = std::make_unique(this /* dpp */, + cct, this, ix); + worker->create((gc_thread_prefix + std::to_string(ix)).c_str()); + workers.push_back(std::move(worker)); + } +} + +void MotrGC::stop_processor() { + // gracefully shutdown all the gc threads. + down_flag = true; + for (auto& worker : workers) { + worker->stop(); + worker->join(); + } + workers.clear(); +} + +void MotrGC::GCWorker::stop() { + std::lock_guard l{lock}; + cv.notify_all(); +} + +bool MotrGC::going_down() { + return down_flag; +} + +unsigned MotrGC::get_subsys() const { + return dout_subsys; +} + +std::ostream& MotrGC::gen_prefix(std::ostream& out) const { + return out << "garbage_collector: "; +} diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h new file mode 100644 index 0000000000000..eecfe0a223c29 --- /dev/null +++ b/src/rgw/motr/gc/gc.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * Garbage Collector Classes for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MOTR_GC_H__ +#define __MOTR_GC_H__ + +#include "rgw_sal_motr.h" +#include "common/Thread.h" +#include +#include +#include + +const int64_t GC_MAX_SHARDS_PRIME = 65521; +static std::string gc_index_prefix = "gc."; +static std::string gc_thread_prefix = "gc_thread_"; + +class MotrGC : public DoutPrefixProvider { + private: + CephContext *cct; + rgw::sal::Store *store; + int max_indices = 0; + std::vector index_names; + std::atomic down_flag = false; + + public: + class GCWorker : public Thread { + private: + const DoutPrefixProvider *dpp; + CephContext *cct; + MotrGC *motr_gc; + int worker_id; + uint32_t gc_interval = 60*60; // default: 24*60*60 sec + std::mutex lock; + std::condition_variable cv; + public: + GCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, + MotrGC *_motr_gc, int _worker_id) + : dpp(_dpp), + cct(_cct), + motr_gc(_motr_gc), + worker_id(_worker_id) {}; + + void *entry() override; + void stop(); + }; + std::vector> workers; + + MotrGC(CephContext *_cct, rgw::sal::Store* _store) + : cct(_cct), store(_store) {} + + ~MotrGC() { + stop_processor(); + finalize(); + } + + void initialize(); + void finalize(); + + void start_processor(); + void stop_processor(); + + bool going_down(); + + // Set Up logging prefix for GC + CephContext *get_cct() const override { return cct; } + unsigned get_subsys() const; + std::ostream& gen_prefix(std::ostream& out) const; +}; + +#endif diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 88dae299a8cc8..3eb1e2e05eeb4 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -117,7 +117,13 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d ldpp_dout(dpp, 0) << "newMotrStore() failed!" << dendl; return store; } - ((rgw::sal::MotrStore *)store)->init_metadata_cache(dpp, cct, use_cache); + + if ((*(rgw::sal::MotrStore*)store).set_use_cache(use_cache) + .set_run_gc_thread(use_gc_thread) + .initialize(cct, dpp) < 0) { + delete store; + return nullptr; + } return store; } diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index 2c43fc144e342..584743773b561 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -1508,12 +1508,57 @@ int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct return 0; } -void MotrStore::finalize(void) -{ +void MotrStore::finalize(void) { + // stop gc worker threads + stop_gc(); // close connection with motr m0_client_fini(this->instance, true); } +MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) { + use_gc_thread = _use_gc_thread; + return *this; +} + +MotrStore& MotrStore::set_use_cache(bool _use_cache) { + use_cache = _use_cache; + return *this; +} + +int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { + // Create metadata objects and set enabled=use_cache value + int rc = init_metadata_cache(dpp, cct); + if (rc != 0) { + ldpp_dout(dpp, 0) << __func__ << ": Metadata cache init failed " << + "with rc = " << rc << dendl; + return rc; + } + + if (use_gc_thread) { + // Create MotrGC object and start GCWorker threads + int rc = create_gc(); + if (rc != 0) + ldpp_dout(dpp, 0) << __func__ << ": Failed to Create MotrGC " << + "with rc = " << rc << dendl; + } + return rc; +} + +int MotrStore::create_gc() { + int ret = 0; + motr_gc = std::make_unique(cctx, this); + motr_gc->initialize(); + motr_gc->start_processor(); + return ret; +} + +void MotrStore::stop_gc() { + if (motr_gc) { + motr_gc->stop_processor(); + motr_gc->finalize(); + } +} + uint64_t MotrStore::get_new_req_id() { uint64_t req_id = ceph::util::generate_random_number(); @@ -5442,7 +5487,7 @@ std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_y } int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp, - CephContext *cct, bool use_cache) + CephContext *cct) { this->obj_meta_cache = new MotrMetaCache(dpp, cct); this->get_obj_meta_cache()->set_enabled(use_cache); diff --git a/src/rgw/rgw_sal_motr.h b/src/rgw/rgw_sal_motr.h index 6bbe942572eec..f7bac0b737828 100644 --- a/src/rgw/rgw_sal_motr.h +++ b/src/rgw/rgw_sal_motr.h @@ -32,8 +32,11 @@ extern "C" { #include "rgw_role.h" #include "rgw_multi.h" #include "rgw_putobj_processor.h" +#include "motr/gc/gc.h" typedef void (*progress_cb)(off_t, void*); +class MotrGC; + namespace rgw::sal { class MotrStore; @@ -984,6 +987,10 @@ class MotrStore : public Store { MotrMetaCache* user_cache; MotrMetaCache* bucket_inst_cache; + std::unique_ptr motr_gc; + bool use_gc_thread; + bool use_cache; + public: CephContext *cctx; struct m0_client *instance; @@ -1099,7 +1106,12 @@ class MotrStore : public Store { uint64_t olh_epoch, const std::string& unique_tag) override; + virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp); virtual void finalize(void) override; + int create_gc(); + void stop_gc(); + MotrStore& set_run_gc_thread(bool _use_gc_thread); + MotrStore& set_use_cache(bool _use_cache); virtual CephContext *ctx(void) override { return cctx; @@ -1135,7 +1147,7 @@ class MotrStore : public Store { int delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key); int store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info); - int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct, bool use_cache); + int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct); MotrMetaCache* get_obj_meta_cache() {return obj_meta_cache;} MotrMetaCache* get_user_cache() {return user_cache;} MotrMetaCache* get_bucket_inst_cache() {return bucket_inst_cache;}