From db4f67cc4e295be9c42651a87abb8dcf7402fb2c Mon Sep 17 00:00:00 2001 From: Sumedh Anantrao Kulkarni Date: Mon, 25 Jul 2022 18:24:09 +0530 Subject: [PATCH 1/7] rgw_sal_motr, motr_gc: [CORTX-33148] add MotrGC, MotrGC::GCWorker infra code (#356) * rgw_sal_motr, motr_gc: [CORTX-33148] add MotrGC, MotrGC::GCWorker infra code Behaviour - With Garbage Collector enabled, MotrGC will have GC indexes & GC worker threads. - GC worker threads will run for the configured max processing time and then will wait for the configured time between two consecutive runs. Additions/Changes - Add the Garbage Collector infrastructure to support the start & stop of worker threads. - MotrGC class with the interfaces to initialize(), start_processor(), stop_processor() and finalize(). - GCWorker class with entry() and stop() methods. - Refactor Initialization of MotrStore - add setter methods to configure MotrStore - add initialize() method to call initialization of all the other components e.g. MetadataCache, GC, (in future LC, QuotaHandler), etc. 
Signed-off-by: Sumedh Anantrao Kulkarni --- src/rgw/CMakeLists.txt | 2 +- src/rgw/motr/gc/gc.cc | 96 +++++++++++++++++++++++++++++++++++++++++ src/rgw/motr/gc/gc.h | 82 +++++++++++++++++++++++++++++++++++ src/rgw/rgw_sal.cc | 8 +++- src/rgw/rgw_sal_motr.cc | 51 ++++++++++++++++++++-- src/rgw/rgw_sal_motr.h | 14 +++++- 6 files changed, 247 insertions(+), 6 deletions(-) create mode 100644 src/rgw/motr/gc/gc.cc create mode 100644 src/rgw/motr/gc/gc.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index cabcba988f631..efc5e1ed62b8d 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -177,7 +177,7 @@ if(WITH_RADOSGW_DBSTORE) list(APPEND librgw_common_srcs rgw_sal_dbstore.cc) endif() if(WITH_RADOSGW_MOTR) - list(APPEND librgw_common_srcs rgw_sal_motr.cc) + list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc) endif() if(WITH_JAEGER) list(APPEND librgw_common_srcs rgw_tracer.cc) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc new file mode 100644 index 0000000000000..4fb3f9e8ae62f --- /dev/null +++ b/src/rgw/motr/gc/gc.cc @@ -0,0 +1,96 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * Garbage Collector implementation for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "motr/gc/gc.h" + +void *MotrGC::GCWorker::entry() { + std::unique_lock lk(lock); + ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix + << worker_id << " started." << dendl; + + do { + + ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix + << worker_id << " iteration" << dendl; + cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); + + } while (! 
motr_gc->going_down()); + + ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called for " + << gc_thread_prefix << worker_id << dendl; + return nullptr; +} + +void MotrGC::initialize() { + // fetch max gc indices from config + auto max_indices = std::min(cct->_conf->rgw_gc_max_objs, + GC_MAX_SHARDS_PRIME); + ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; + + index_names.reserve(max_indices); + for (int i = 0; i < max_indices; i++) { + // Append index name to the gc index list + index_names.push_back(gc_index_prefix + std::to_string(i)); + + // [To be Implemented] create index in motr dix + } + +} + +void MotrGC::finalize() { + // [To be Implemented] undo steps from initialize stage +} + +void MotrGC::start_processor() { + // fetch max_concurrent_io i.e. max_threads to create from config. + // start all the gc_worker threads + auto max_workers = cct->_conf->rgw_gc_max_concurrent_io; + ldpp_dout(this, 50) << __func__ << ": max_workers = " + << max_workers << dendl; + workers.reserve(max_workers); + for (int ix = 0; ix < max_workers; ++ix) { + auto worker = std::make_unique(this /* dpp */, + cct, this, ix); + worker->create((gc_thread_prefix + std::to_string(ix)).c_str()); + workers.push_back(std::move(worker)); + } +} + +void MotrGC::stop_processor() { + // gracefully shutdown all the gc threads. 
+ down_flag = true; + for (auto& worker : workers) { + worker->stop(); + worker->join(); + } + workers.clear(); +} + +void MotrGC::GCWorker::stop() { + std::lock_guard l{lock}; + cv.notify_all(); +} + +bool MotrGC::going_down() { + return down_flag; +} + +unsigned MotrGC::get_subsys() const { + return dout_subsys; +} + +std::ostream& MotrGC::gen_prefix(std::ostream& out) const { + return out << "garbage_collector: "; +} diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h new file mode 100644 index 0000000000000..eecfe0a223c29 --- /dev/null +++ b/src/rgw/motr/gc/gc.h @@ -0,0 +1,82 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * Garbage Collector Classes for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef __MOTR_GC_H__ +#define __MOTR_GC_H__ + +#include "rgw_sal_motr.h" +#include "common/Thread.h" +#include +#include +#include + +const int64_t GC_MAX_SHARDS_PRIME = 65521; +static std::string gc_index_prefix = "gc."; +static std::string gc_thread_prefix = "gc_thread_"; + +class MotrGC : public DoutPrefixProvider { + private: + CephContext *cct; + rgw::sal::Store *store; + int max_indices = 0; + std::vector index_names; + std::atomic down_flag = false; + + public: + class GCWorker : public Thread { + private: + const DoutPrefixProvider *dpp; + CephContext *cct; + MotrGC *motr_gc; + int worker_id; + uint32_t gc_interval = 60*60; // default: 24*60*60 sec + std::mutex lock; + std::condition_variable cv; + public: + GCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct, + MotrGC *_motr_gc, int _worker_id) + : dpp(_dpp), + cct(_cct), + motr_gc(_motr_gc), + worker_id(_worker_id) {}; + + void *entry() override; + void stop(); + }; + std::vector> workers; + + MotrGC(CephContext *_cct, rgw::sal::Store* _store) + : cct(_cct), store(_store) {} + + ~MotrGC() { + stop_processor(); + finalize(); + } + + void initialize(); + void finalize(); + + void start_processor(); + void stop_processor(); + + bool going_down(); + + // Set Up logging prefix for GC + CephContext *get_cct() const override { return cct; } + unsigned get_subsys() const; + std::ostream& gen_prefix(std::ostream& out) const; +}; + +#endif diff --git a/src/rgw/rgw_sal.cc b/src/rgw/rgw_sal.cc index 88dae299a8cc8..3eb1e2e05eeb4 100644 --- a/src/rgw/rgw_sal.cc +++ b/src/rgw/rgw_sal.cc @@ -117,7 +117,13 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d ldpp_dout(dpp, 0) << "newMotrStore() failed!" 
<< dendl; return store; } - ((rgw::sal::MotrStore *)store)->init_metadata_cache(dpp, cct, use_cache); + + if ((*(rgw::sal::MotrStore*)store).set_use_cache(use_cache) + .set_run_gc_thread(use_gc_thread) + .initialize(cct, dpp) < 0) { + delete store; + return nullptr; + } return store; } diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index 955276905b9e1..40b55f74dc157 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -1507,12 +1507,57 @@ int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct return 0; } -void MotrStore::finalize(void) -{ +void MotrStore::finalize(void) { + // stop gc worker threads + stop_gc(); // close connection with motr m0_client_fini(this->instance, true); } +MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) { + use_gc_thread = _use_gc_thread; + return *this; +} + +MotrStore& MotrStore::set_use_cache(bool _use_cache) { + use_cache = _use_cache; + return *this; +} + +int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { + // Create metadata objects and set enabled=use_cache value + int rc = init_metadata_cache(dpp, cct); + if (rc != 0) { + ldpp_dout(dpp, 0) << __func__ << ": Metadata cache init failed " << + "with rc = " << rc << dendl; + return rc; + } + + if (use_gc_thread) { + // Create MotrGC object and start GCWorker threads + int rc = create_gc(); + if (rc != 0) + ldpp_dout(dpp, 0) << __func__ << ": Failed to Create MotrGC " << + "with rc = " << rc << dendl; + } + return rc; +} + +int MotrStore::create_gc() { + int ret = 0; + motr_gc = std::make_unique(cctx, this); + motr_gc->initialize(); + motr_gc->start_processor(); + return ret; +} + +void MotrStore::stop_gc() { + if (motr_gc) { + motr_gc->stop_processor(); + motr_gc->finalize(); + } +} + uint64_t MotrStore::get_new_req_id() { uint64_t req_id = ceph::util::generate_random_number(); @@ -5438,7 +5483,7 @@ std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_y } int 
MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp, - CephContext *cct, bool use_cache) + CephContext *cct) { this->obj_meta_cache = new MotrMetaCache(dpp, cct); this->get_obj_meta_cache()->set_enabled(use_cache); diff --git a/src/rgw/rgw_sal_motr.h b/src/rgw/rgw_sal_motr.h index f79cc983b9a67..3fa0c8cc6d456 100644 --- a/src/rgw/rgw_sal_motr.h +++ b/src/rgw/rgw_sal_motr.h @@ -32,8 +32,11 @@ extern "C" { #include "rgw_role.h" #include "rgw_multi.h" #include "rgw_putobj_processor.h" +#include "motr/gc/gc.h" typedef void (*progress_cb)(off_t, void*); +class MotrGC; + namespace rgw::sal { class MotrStore; @@ -982,6 +985,10 @@ class MotrStore : public Store { MotrMetaCache* user_cache; MotrMetaCache* bucket_inst_cache; + std::unique_ptr motr_gc; + bool use_gc_thread; + bool use_cache; + public: CephContext *cctx; struct m0_client *instance; @@ -1097,7 +1104,12 @@ class MotrStore : public Store { uint64_t olh_epoch, const std::string& unique_tag) override; + virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp); virtual void finalize(void) override; + int create_gc(); + void stop_gc(); + MotrStore& set_run_gc_thread(bool _use_gc_thread); + MotrStore& set_use_cache(bool _use_cache); virtual CephContext *ctx(void) override { return cctx; @@ -1133,7 +1145,7 @@ class MotrStore : public Store { int delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key); int store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info); - int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct, bool use_cache); + int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct); MotrMetaCache* get_obj_meta_cache() {return obj_meta_cache;} MotrMetaCache* get_user_cache() {return user_cache;} MotrMetaCache* get_bucket_inst_cache() {return bucket_inst_cache;} From 7fa63387a3663dc517e86ad2667916d86080fb6c Mon Sep 17 00:00:00 2001 From: Jeet Jain Date: Wed, 27 Jul 2022 21:07:05 
+0530 Subject: [PATCH 2/7] rgw: [CORTX-33179] GC Metadata Implementation (#363) Implement GC metadata structure & corresponding functions Added motr_gc_obj_info structure to hold gc metadata for an object marked for deletion. Added logic for creating GC indices based on config parameters. Signed-off-by: Jeet Jain --- src/rgw/motr/gc/gc.cc | 45 +++++++++++++++----- src/rgw/motr/gc/gc.h | 96 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 127 insertions(+), 14 deletions(-) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index 4fb3f9e8ae62f..4c0fc2bb006e2 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -35,18 +35,26 @@ void *MotrGC::GCWorker::entry() { void MotrGC::initialize() { // fetch max gc indices from config - auto max_indices = std::min(cct->_conf->rgw_gc_max_objs, - GC_MAX_SHARDS_PRIME); - ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; - + uint64_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; + if(rgw_gc_max_objs) { + rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs))); + max_indices = static_cast(std::min(rgw_gc_max_objs, + GC_MAX_QUEUES)); + } + else { + max_indices = GC_DEFAULT_QUEUES; + } index_names.reserve(max_indices); - for (int i = 0; i < max_indices; i++) { - // Append index name to the gc index list - index_names.push_back(gc_index_prefix + std::to_string(i)); - - // [To be Implemented] create index in motr dix + ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; + for (int ind_suf = 0; ind_suf < max_indices; ind_suf++) { + std::string iname = gc_index_prefix + "." 
+ std::to_string(ind_suf); + int rc = static_cast(store)->create_motr_idx_by_name(iname); + if (rc < 0 && rc != -EEXIST){ + ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl; + break; + } + index_names.push_back(iname); } - } void MotrGC::finalize() { @@ -87,6 +95,23 @@ bool MotrGC::going_down() { return down_flag; } +int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj) +{ + int rc; + bufferlist bl; + rc = static_cast(store)->do_idx_op_by_name(iname, + M0_IC_DEL, obj.tag, bl); + if (rc < 0){ + ldout(cct, 0) << "ERROR: failed to delete tag entry "<(store)->do_idx_op_by_name(iname, + M0_IC_DEL, std::to_string(obj.time), bl); + if (rc < 0 && rc != -EEXIST){ + ldout(cct, 0) << "ERROR: failed to delete time entry "< #include -const int64_t GC_MAX_SHARDS_PRIME = 65521; -static std::string gc_index_prefix = "gc."; -static std::string gc_thread_prefix = "gc_thread_"; +const uint64_t GC_DEFAULT_QUEUES = 64; +const uint64_t GC_MAX_QUEUES = 4096; +static std::string gc_index_prefix = "motr.rgw.gc"; +static std::string gc_thread_prefix = "motr_gc_"; + +struct Meta +{ + struct m0_uint128 oid = {}; + struct m0_fid pver = {}; + uint64_t layout_id = 0; + + void encode(bufferlist &bl) const + { + ENCODE_START(5, 5, bl); + encode(oid.u_hi, bl); + encode(oid.u_lo, bl); + encode(pver.f_container, bl); + encode(pver.f_key, bl); + encode(layout_id, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator &bl) + { + DECODE_START(5, bl); + decode(oid.u_hi, bl); + decode(oid.u_lo, bl); + decode(pver.f_container, bl); + decode(pver.f_key, bl); + decode(layout_id, bl); + DECODE_FINISH(bl); + } +}; + +struct motr_gc_obj_info +{ + std::string tag; // gc obj unique identifier + std::string name; // fully qualified object name + Meta mobj; // motr obj + std::time_t time; // deletion time + std::uint64_t size; // size of obj + std::uint64_t size_actual; // size of disk + bool is_multipart; // flag to indicate if 
object is multipart + std::string multipart_iname; // part index name + + motr_gc_obj_info() {} + motr_gc_obj_info(std::string _tag, std::string _name, Meta _mobj, + std::time_t _time, std::uint64_t _size, std::uint64_t _size_actual, + bool _is_multipart, std::string _multipart_iname) + : tag(std::move(_tag)), name(std::move(_name)), mobj(std::move(_mobj)), + time(std::move(_time)), size(std::move(_size)), size_actual(std::move(_size_actual)), + is_multipart(std::move(_is_multipart)), multipart_iname(std::move(_multipart_iname)) {} + + void encode(bufferlist &bl) const + { + ENCODE_START(12, 2, bl); + encode(tag, bl); + encode(name, bl); + encode(mobj.oid.u_hi, bl); + encode(mobj.oid.u_lo, bl); + encode(mobj.pver.f_container, bl); + encode(mobj.pver.f_key, bl); + encode(mobj.layout_id, bl); + encode(time, bl); + encode(size, bl); + encode(size_actual, bl); + encode(is_multipart, bl); + encode(multipart_iname, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator &bl) + { + DECODE_START_LEGACY_COMPAT_LEN_32(12, 2, 2, bl); + decode(tag, bl); + decode(name, bl); + decode(mobj.oid.u_hi, bl); + decode(mobj.oid.u_lo, bl); + decode(mobj.pver.f_container, bl); + decode(mobj.pver.f_key, bl); + decode(mobj.layout_id, bl); + decode(time, bl); + decode(size, bl); + decode(size_actual, bl); + decode(is_multipart, bl); + decode(multipart_iname, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(motr_gc_obj_info); class MotrGC : public DoutPrefixProvider { private: @@ -55,6 +142,7 @@ class MotrGC : public DoutPrefixProvider { void *entry() override; void stop(); }; + std::vector> workers; MotrGC(CephContext *_cct, rgw::sal::Store* _store) @@ -70,7 +158,7 @@ class MotrGC : public DoutPrefixProvider { void start_processor(); void stop_processor(); - + int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj); bool going_down(); // Set Up logging prefix for GC From e76cb450cd3ca1da3ef803d3bcd867d3e0cb2ac3 Mon Sep 17 00:00:00 2001 
From: Sachin Punadikar Date: Tue, 2 Aug 2022 11:03:06 +0530 Subject: [PATCH 3/7] GC thread processing logic (#371) The GC thread will acquire a GC index and process the object entries for deletion either up to the count governed by "rgw_gc_max_trim_chunk" or the time allowed by "rgw_gc_processor_max_time". Signed-off-by: Sachin Punadikar --- src/rgw/motr/gc/gc.cc | 73 ++++++++++++++++++++++++++++++++++++++++--- src/rgw/motr/gc/gc.h | 9 ++++-- 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index 4c0fc2bb006e2..e564be359d026 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -14,16 +14,61 @@ */ #include "motr/gc/gc.h" +#include void *MotrGC::GCWorker::entry() { std::unique_lock lk(lock); ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix << worker_id << " started." << dendl; + // Get random number to lock the GC index. + uint32_t my_index = ceph::util::generate_random_number(0, \ + motr_gc->max_indices); + // This is going to be endless loop do { - ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix - << worker_id << " iteration" << dendl; + << worker_id << " Iteration Started" << dendl; + + std::string iname = ""; + // Get lock on an GC index + int rc = motr_gc->get_locked_gc_index(my_index); + + // Lock has been aquired, start the timer + std::time_t start_time = std::time(nullptr); + std::time_t end_time = start_time + \ + cct->_conf->rgw_gc_processor_max_time - 10; + std::time_t current_time = std::time(nullptr); + + if (rc == 0) { + uint32_t processed_count = 0; + // form the index name + iname = gc_index_prefix + "." 
+ std::to_string(my_index); + ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix + << worker_id << " Working on GC Queue: " << iname << dendl; + // time based while loop + do { + // fetch the next entry from index "iname" + + // check if the object is ready for deletion + // for the motr object + + // delete that object + // rc = dequeue(); + + // remove the entry from iname + + // Exit the loop if required work is complete + processed_count++; + if (processed_count >= motr_gc->max_count) break; + // Update current time + current_time = std::time(nullptr); + } while (current_time < end_time); + // unlock the GC queue + } + my_index++; + if (my_index >= motr_gc->max_indices) my_index = 0; + // sleep for remaining duration + if (end_time > current_time) sleep(end_time - current_time); cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); } while (! motr_gc->going_down()); @@ -35,7 +80,7 @@ void *MotrGC::GCWorker::entry() { void MotrGC::initialize() { // fetch max gc indices from config - uint64_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; + uint32_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; if(rgw_gc_max_objs) { rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs))); max_indices = static_cast(std::min(rgw_gc_max_objs, @@ -55,10 +100,15 @@ void MotrGC::initialize() { } index_names.push_back(iname); } + // Get the max count of objects to be deleted in 1 processing cycle + max_count = cct->_conf->rgw_gc_max_trim_chunk; + if (max_count == 0) max_count = GC_DEFAULT_COUNT; } void MotrGC::finalize() { - // [To be Implemented] undo steps from initialize stage + // We do not delete GC queues or related lock entry. + // GC queue & lock entries would be needed after RGW / cluster restart, + // so do not delete those. 
} void MotrGC::start_processor() { @@ -112,6 +162,21 @@ int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_ob return rc; } +int MotrGC::get_locked_gc_index(uint32_t& rand_ind) +{ + int rc = -1; + uint32_t new_index = 0; + // attempt to lock GC starting with passed in index + for (int ind = 0; ind < max_indices; ind++) + { + new_index = (ind + rand_ind) % max_indices; + // try locking index + // on sucess mark rc as 0 + } + rand_ind = new_index; + return rc; +} + unsigned MotrGC::get_subsys() const { return dout_subsys; } diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h index b5304a939bf9d..9a27786216aec 100644 --- a/src/rgw/motr/gc/gc.h +++ b/src/rgw/motr/gc/gc.h @@ -22,8 +22,9 @@ #include #include -const uint64_t GC_DEFAULT_QUEUES = 64; -const uint64_t GC_MAX_QUEUES = 4096; +const uint32_t GC_DEFAULT_QUEUES = 64; +const uint32_t GC_DEFAULT_COUNT = 256; +const uint32_t GC_MAX_QUEUES = 4096; static std::string gc_index_prefix = "motr.rgw.gc"; static std::string gc_thread_prefix = "motr_gc_"; @@ -117,7 +118,8 @@ class MotrGC : public DoutPrefixProvider { private: CephContext *cct; rgw::sal::Store *store; - int max_indices = 0; + uint32_t max_indices = 0; + uint32_t max_count = 0; std::vector index_names; std::atomic down_flag = false; @@ -159,6 +161,7 @@ class MotrGC : public DoutPrefixProvider { void start_processor(); void stop_processor(); int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj); + int get_locked_gc_index(uint32_t& rand_ind); bool going_down(); // Set Up logging prefix for GC From 6d0fb036f597367fd647aa6d05aba0fcb15aded7 Mon Sep 17 00:00:00 2001 From: Sumedh Anantrao Kulkarni Date: Tue, 2 Aug 2022 22:42:56 +0530 Subject: [PATCH 4/7] motr_sync: [CORTX-33691] Define and implement rgw_motr synchronization (#373) * rgw_sal_motr: [CORTX-33691] Defines abstract synchronization primitives and implements it for Motr GC across multiple RGW instances. 
Signed-off-by: Dattaprasad Govekar * motr_sync_impl: [CORTX-33691] update lock() & remove_lock() methods Update and refactor the MotrLock::lock() and MotrKVLockProvider::remove_lock() methods. Signed-off-by: Sumedh Anantrao Kulkarni Co-authored-by: Dattaprasad Govekar --- src/rgw/CMakeLists.txt | 2 +- src/rgw/motr/gc/gc.cc | 4 +- src/rgw/motr/sync/motr_sync.h | 99 ++++++++++++++++++ src/rgw/motr/sync/motr_sync_impl.cc | 153 ++++++++++++++++++++++++++++ src/rgw/motr/sync/motr_sync_impl.h | 118 +++++++++++++++++++++ 5 files changed, 373 insertions(+), 3 deletions(-) create mode 100644 src/rgw/motr/sync/motr_sync.h create mode 100644 src/rgw/motr/sync/motr_sync_impl.cc create mode 100644 src/rgw/motr/sync/motr_sync_impl.h diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index efc5e1ed62b8d..dacb1783f9f2e 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -177,7 +177,7 @@ if(WITH_RADOSGW_DBSTORE) list(APPEND librgw_common_srcs rgw_sal_dbstore.cc) endif() if(WITH_RADOSGW_MOTR) - list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc) + list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc motr/sync/motr_sync_impl.cc) endif() if(WITH_JAEGER) list(APPEND librgw_common_srcs rgw_tracer.cc) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index e564be359d026..e8cfb71ca9560 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -91,7 +91,7 @@ void MotrGC::initialize() { } index_names.reserve(max_indices); ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; - for (int ind_suf = 0; ind_suf < max_indices; ind_suf++) { + for (uint32_t ind_suf = 0; ind_suf < max_indices; ind_suf++) { std::string iname = gc_index_prefix + "." 
+ std::to_string(ind_suf); int rc = static_cast(store)->create_motr_idx_by_name(iname); if (rc < 0 && rc != -EEXIST){ @@ -167,7 +167,7 @@ int MotrGC::get_locked_gc_index(uint32_t& rand_ind) int rc = -1; uint32_t new_index = 0; // attempt to lock GC starting with passed in index - for (int ind = 0; ind < max_indices; ind++) + for (uint32_t ind = 0; ind < max_indices; ind++) { new_index = (ind + rand_ind) % max_indices; // try locking index diff --git a/src/rgw/motr/sync/motr_sync.h b/src/rgw/motr/sync/motr_sync.h new file mode 100644 index 0000000000000..ed2fa5824ffde --- /dev/null +++ b/src/rgw/motr/sync/motr_sync.h @@ -0,0 +1,99 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * Abstract thread/process synchronization for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. 
+ * + */ + +#ifndef __MOTR_SYNCHROZATION__ +#define __MOTR_SYNCHROZATION__ + +#include + +#include "include/types.h" +#include "include/utime.h" +#include "rgw_sal_motr.h" + +// Define MOTR_EXCLUSIVE_RW_LOCK only when system needs exclusive read/write lock +#define MOTR_EXCLUSIVE_RW_LOCK 1 + +class MotrLockProvider; + +struct motr_locker_info_t +{ + std::string cookie; // unique id of caller + utime_t expiration; // expiration: non-zero means epoch of locker expiration + std::string description; // description: locker description, may be empty + + motr_locker_info_t() {} + motr_locker_info_t(const utime_t& _e, std::string& _c, const std::string& _d) + : expiration(_e), cookie(_c), description(_d) {} + void encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + encode(cookie, bl); + encode(expiration, bl); + encode(description, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator &bl) { + DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl); + decode(cookie, bl); + decode(expiration, bl); + decode(description, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(motr_locker_info_t) + +struct motr_lock_info_t { +public: + std::map lockers; + void encode(ceph::buffer::list &bl) const { + ENCODE_START(1, 1, bl); + encode(lockers, bl); + ENCODE_FINISH(bl); + } + void decode(ceph::buffer::list::const_iterator &bl) { + DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl); + decode(lockers, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(motr_lock_info_t) + +enum class MotrLockType { + NONE = 0, + EXCLUSIVE = 1, + SHARED = 2, + EXCLUSIVE_EPHEMERAL = 3, /* lock object is removed @ unlock */ +}; + +// Abstract interface for Motr named synchronization +class MotrSync { +public: + virtual void initialize(std::shared_ptr lock_provider) = 0; + virtual int lock(const std::string& lock_name, MotrLockType lock_type, + utime_t lock_duration, const std::string& locker_id) = 0; + virtual int unlock(const std::string& lock_name, MotrLockType lock_type, + 
const std::string& locker_id) = 0; +}; + +// Abstract interface for entity that implements backend for lock objects +class MotrLockProvider { +public: + virtual int read_lock(const std::string& lock_name, + motr_lock_info_t* lock_info) = 0; + virtual int write_lock(const std::string& lock_name, + motr_lock_info_t* lock_info, bool update) = 0; + virtual int remove_lock(const std::string& lock_name, + const std::string& locker_id) = 0; +}; +#endif // __MOTR_SYNCHROZATION__ diff --git a/src/rgw/motr/sync/motr_sync_impl.cc b/src/rgw/motr/sync/motr_sync_impl.cc new file mode 100644 index 0000000000000..b0fd036043fd3 --- /dev/null +++ b/src/rgw/motr/sync/motr_sync_impl.cc @@ -0,0 +1,153 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * GC thread/process synchronization for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "motr/sync/motr_sync_impl.h" + +void MotrLock::initialize(std::shared_ptr lock_provider) { + _lock_provider = lock_provider; +} + +int MotrLock::lock(const std::string& lock_name, + MotrLockType lock_type, + utime_t lock_duration, + const std::string& locker_id = "") { + if (!_lock_provider || (MotrLockType::EXCLUSIVE != lock_type && + locker_id.empty())) { + return -EINVAL; + } + int rc = 0; + motr_lock_info_t lock_obj; + rc = _lock_provider->read_lock(lock_name, &lock_obj); + if (rc != 0 && rc != -ENOENT) + return rc; + if (rc == 0) { + // lock entry is present.
Check if this is a stale/expired lock + utime_t now = ceph_clock_now(); + auto iter = lock_obj.lockers.begin(); + while (iter != lock_obj.lockers.end()) { + struct motr_locker_info_t &info = iter->second; + if (!info.expiration.is_zero() && info.expiration < now) { + // locker has expired; delete it + iter = lock_obj.lockers.erase(iter); + } else { + ++iter; + } + } + // remove the lock if no locker is left + if (lock_obj.lockers.empty()) + _lock_provider->remove_lock(lock_name, locker_id); + } + + if (!lock_obj.lockers.empty() && MotrLockType::EXCLUSIVE == lock_type) { + // lock is not available + return -EBUSY; + } else { + // Try to acquire lock object + struct motr_locker_info_t locker; + locker.cookie = locker_id; + locker.expiration = ceph_clock_now() + lock_duration; + locker.description = ""; + // Update lock entry with current locker and lock the resource + lock_obj.lockers.insert( + std::pair(locker_id, locker)); + rc = _lock_provider->write_lock(lock_name, &lock_obj, false); + if (rc < 0) { + // Failed to acquire lock object; possibly, already acquired + return -EBUSY; + } + } + return rc; +} + +int MotrLock::unlock(const std::string& lock_name, + MotrLockType lock_type, const std::string& locker_id) { + // -EINVAL when initialize() was never called, same as lock() + return _lock_provider ? _lock_provider->remove_lock(lock_name, locker_id) : -EINVAL; +} + +int MotrKVLockProvider::initialize(const DoutPrefixProvider* dpp, + rgw::sal::MotrStore* _s, + std::string& lock_index_name) { + _dpp = dpp; + _store = _s; + _lock_index = lock_index_name; + if (!_store || lock_index_name.empty()) { + return -EINVAL; + } + return _store->create_motr_idx_by_name(lock_index_name); +} + +int MotrKVLockProvider::read_lock(const std::string& lock_name, + motr_lock_info_t* lock_info) { + if (!_store || _lock_index.empty() || !lock_info) { + return -EINVAL; + } + int rc = 0; + bufferlist bl; + rc = _store->do_idx_op_by_name(_lock_index, M0_IC_GET, lock_name, bl); + if (rc != 0) { + return rc; + } + bufferlist::const_iterator bitr = bl.begin(); +
lock_info->decode(bitr); + return 0; +} + +int MotrKVLockProvider::write_lock(const std::string& lock_name, + motr_lock_info_t* lock_info, bool update) { + if (!_store || _lock_index.empty() || !lock_info) { + return -EINVAL; + } + bufferlist bl; + lock_info->encode(bl); + int rc = + _store->do_idx_op_by_name(_lock_index, M0_IC_PUT, lock_name, bl, update); + return rc; +} + +int MotrKVLockProvider::remove_lock(const std::string& lock_name, + const std::string& locker_id = "") { + if (!_store || _lock_index.empty() || lock_name.empty()) { + return -EINVAL; + } + motr_lock_info_t lock_info; + bufferlist bl; + int rc = 0; + bool del_lock_entry = false; + rc = read_lock(lock_name, &lock_info); + if (rc != 0) + return (rc == -ENOENT) ? 0 : rc; + + if (locker_id.empty()) { + // this is exclusive lock + del_lock_entry = true; + } else { + auto it = lock_info.lockers.find(locker_id); + if (it != lock_info.lockers.end()) { + lock_info.lockers.erase(it); + } + if (lock_info.lockers.empty()) { + del_lock_entry = true; + } + } + if (del_lock_entry) { + // all lockers removed; now delete lock object + rc = _store->do_idx_op_by_name(_lock_index, M0_IC_DEL, lock_name, bl); + } else { + // update the lock entry + lock_info.encode(bl); + rc = _store->do_idx_op_by_name(_lock_index, M0_IC_PUT, lock_name, bl); + } + return rc; +} diff --git a/src/rgw/motr/sync/motr_sync_impl.h b/src/rgw/motr/sync/motr_sync_impl.h new file mode 100644 index 0000000000000..2bd774945ca55 --- /dev/null +++ b/src/rgw/motr/sync/motr_sync_impl.h @@ -0,0 +1,118 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=2 sw=2 expandtab ft=cpp + +/* + * GC thread/process synchronization for the CORTX Motr backend + * + * Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. 
See file COPYING. + * + */ + +#ifndef __MOTR_LOCK__ +#define __MOTR_LOCK__ + +/* +Usage: + Approach 1. + 1. Instantiate one of the LockProvider implementations. + example: + std::unique_ptr lock_provider; + + 2. Initialize the lock provider with global lock index table + example: + lock_provider.initialize(pp, store, "motr.gc.index.locks"); + + The caller can do this during the process/thread + start-up/initialization; if "initialize()" returns < 0, the + caller is expected to stop further initialization if locking is essential + in the system. In a multi-process environment, each process can call + "MotrKVLockProvider::initialize" during its start-up, providing the same + name for the global lock index. + + 3. Instantiate one of the Locker implementations. + example: + std::unique_ptr motr_lock; + + 4. Initialize the Locker class with LockProvider Object. + example: + motr_lock.initialize(lock_provider); + + 5. Lock the required resource using Locker object + example: + motr_lock.lock(resource_name, lock_type, + lock_duration, locker_id); + + Any negative value returned by MotrLock::lock() indicates the resource + can't be locked; a value of 0 indicates success. + + 6. Unlock the resource using the same Locker Object. + example: + motr_lock.unlock(resource_name, lock_type, locker_id); + + Approach 2. + 1. Follow step (1) & step (2) in Approach 1. + 2. Call create_motr_Lock_instance(), passing it lock provider instance created + in step (1) above. + 3. Call get_lock_instance() wherever lock is needed. + +Note: Presently, locking framework supports EXCLUSIVE lock; it means +caller needs to set MotrLockType::EXCLUSIVE while calling MotrLock::lock(), +and do not pass locker_id.
+*/ + +#include "rgw_sal_motr.h" +#include "motr/sync/motr_sync.h" + +class MotrLock : public MotrSync { +private: + std::shared_ptr _lock_provider; + +public: + virtual void initialize(std::shared_ptr lock_provider) override; + virtual int lock(const std::string& lock_name, MotrLockType lock_type, + utime_t lock_duration, const std::string& locker_id) override; + virtual int unlock(const std::string& lock_name, MotrLockType lock_type, + const std::string& locker_id) override; +}; + +class MotrKVLockProvider : public MotrLockProvider { +private: + const DoutPrefixProvider* _dpp; + rgw::sal::MotrStore* _store; + std::string _lock_index; +public: + int initialize(const DoutPrefixProvider* dpp, rgw::sal::MotrStore* _s, + std::string& lock_index_name); + virtual int read_lock(const std::string& lock_name, + motr_lock_info_t* lock_info) override; + virtual int write_lock(const std::string& lock_name, + motr_lock_info_t* lock_info, + bool update = false) override; + virtual int remove_lock(const std::string& lock_name, + const std::string& locker_id) override; +}; + +// Creates a global instance of MotrLock that can be used by caller +// This needs to be called by a single main thread of caller. +inline std::shared_ptr g_motr_lock; // inline: this header may be included by several translation units +inline std::shared_ptr& create_motr_Lock_instance( + std::shared_ptr lock_provider) { + static bool initialize = false; + if (!initialize) { + g_motr_lock = std::make_shared(); + g_motr_lock->initialize(lock_provider); + initialize = true; + } + return g_motr_lock; // returned on every call, not only the first one +} + +inline std::shared_ptr& get_lock_instance() { + return g_motr_lock; +} + +#endif From 0fefbda8c77ef8fbdc29f36de42e39e0334640a0 Mon Sep 17 00:00:00 2001 From: Sumedh Anantrao Kulkarni Date: Mon, 8 Aug 2022 16:54:46 +0530 Subject: [PATCH 5/7] rgw_sal_motr, motr_gc: [CORTX-32689] Implement MotrGC::enqueue() (#379) * rgw_sal_motr, motr_gc: [CORTX-32689] Implement MotrGC::enqueue() * Push {0_ObjTag: motr_gc_obj_info} and {1_ExpiryTime: motr_gc_obj_info} entry to the gc queues, i.e. motr dix.
* Call gc->enqueue() method on simple object delete request without actually deleting the obj. Signed-off-by: Sumedh Anantrao Kulkarni * motr_gc: [CORTX-32689] Implement MotrGC::list() function MotrGC::list() goes through every gc queue and lists the adds the pending delete requests in resultant vector. Interface to list objs using admin tool or rest api's is yet to be developed. Signed-off-by: Sumedh Anantrao Kulkarni --- src/rgw/motr/gc/gc.cc | 120 ++++++++++++++++++++++++++++++++-------- src/rgw/motr/gc/gc.h | 39 +++++++------ src/rgw/rgw_sal_motr.cc | 24 ++++++-- src/rgw/rgw_sal_motr.h | 4 +- 4 files changed, 143 insertions(+), 44 deletions(-) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index e8cfb71ca9560..4b16afa84eaed 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -22,8 +22,8 @@ void *MotrGC::GCWorker::entry() { << worker_id << " started." << dendl; // Get random number to lock the GC index. - uint32_t my_index = ceph::util::generate_random_number(0, \ - motr_gc->max_indices); + uint32_t my_index = \ + ceph::util::generate_random_number(0, motr_gc->max_indices - 1); // This is going to be endless loop do { ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix @@ -42,13 +42,12 @@ void *MotrGC::GCWorker::entry() { if (rc == 0) { uint32_t processed_count = 0; // form the index name - iname = gc_index_prefix + "." 
+ std::to_string(my_index); + iname = motr_gc->index_names[my_index]; ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix - << worker_id << " Working on GC Queue: " << iname << dendl; + << worker_id << " Working on GC Queue: " << iname << dendl; // time based while loop do { // fetch the next entry from index "iname" - // check if the object is ready for deletion // for the motr object @@ -62,13 +61,13 @@ void *MotrGC::GCWorker::entry() { if (processed_count >= motr_gc->max_count) break; // Update current time current_time = std::time(nullptr); - } while (current_time < end_time); + } while (current_time < end_time && !motr_gc->going_down()); // unlock the GC queue } - my_index++; - if (my_index >= motr_gc->max_indices) my_index = 0; + my_index = (my_index + 1) % motr_gc->max_indices; + // sleep for remaining duration - if (end_time > current_time) sleep(end_time - current_time); + // if (end_time > current_time) sleep(end_time - current_time); cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); } while (! motr_gc->going_down()); @@ -81,10 +80,10 @@ void *MotrGC::GCWorker::entry() { void MotrGC::initialize() { // fetch max gc indices from config uint32_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; - if(rgw_gc_max_objs) { + if (rgw_gc_max_objs) { rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs))); max_indices = static_cast(std::min(rgw_gc_max_objs, - GC_MAX_QUEUES)); + GC_MAX_QUEUES)); } else { max_indices = GC_DEFAULT_QUEUES; @@ -93,7 +92,7 @@ void MotrGC::initialize() { ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; for (uint32_t ind_suf = 0; ind_suf < max_indices; ind_suf++) { std::string iname = gc_index_prefix + "." 
+ std::to_string(ind_suf); - int rc = static_cast(store)->create_motr_idx_by_name(iname); + int rc = store->create_motr_idx_by_name(iname); if (rc < 0 && rc != -EEXIST){ ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl; break; @@ -103,6 +102,10 @@ void MotrGC::initialize() { // Get the max count of objects to be deleted in 1 processing cycle max_count = cct->_conf->rgw_gc_max_trim_chunk; if (max_count == 0) max_count = GC_DEFAULT_COUNT; + + // set random starting index for enqueue of delete requests + enqueue_index = \ + ceph::util::generate_random_number(0, max_indices - 1); } void MotrGC::finalize() { @@ -130,6 +133,8 @@ void MotrGC::stop_processor() { // gracefully shutdown all the gc threads. down_flag = true; for (auto& worker : workers) { + ldout(cct, 20) << "stopping and joining " + << gc_thread_prefix << worker->get_id() << dendl; worker->stop(); worker->join(); } @@ -145,38 +150,109 @@ bool MotrGC::going_down() { return down_flag; } -int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj) -{ +int MotrGC::enqueue(motr_gc_obj_info obj) { + int rc = 0; + // create ๐Ÿ”‘'s: + // - 1_{obj.time + min_wait_time} + // - 0_{obj.tag} + std::string key1 = obj_exp_time_prefix + + std::to_string(obj.time + cct->_conf->rgw_gc_obj_min_wait); + std::string key2 = obj_tag_prefix + obj.tag; + + bufferlist bl; + obj.encode(bl); + // push {1_ExpiryTime: motr_gc_obj_info} to the gc queue.๐Ÿ“ฅ + rc = store->do_idx_op_by_name(index_names[enqueue_index], + M0_IC_PUT, key1, bl); + if (rc < 0) + return rc; + + // push {0_ObjTag: motr_gc_obj_info} to the gc queue.๐Ÿ“ฅ + rc = store->do_idx_op_by_name(index_names[enqueue_index], + M0_IC_PUT, key2, bl); + if (rc < 0) { + // cleanup ๐Ÿงน: pop key1 ๐Ÿ“ค + store->do_idx_op_by_name(index_names[enqueue_index], + M0_IC_DEL, key1, bl); + // we are avoiding delete retry on failure since, + // while processing the gc entry we will ignore the + // 1_ExpiryTime entry if corresponding 
0_ObjTag entry is absent. + return rc; + } + + // rolling increment the enqueue_index + enqueue_index = (enqueue_index + 1) % max_indices; + return rc; +} + +int MotrGC::dequeue(std::string iname, motr_gc_obj_info obj) { int rc; bufferlist bl; - rc = static_cast(store)->do_idx_op_by_name(iname, - M0_IC_DEL, obj.tag, bl); + rc = store->do_idx_op_by_name(iname, M0_IC_DEL, obj.tag, bl); if (rc < 0){ ldout(cct, 0) << "ERROR: failed to delete tag entry "<(store)->do_idx_op_by_name(iname, - M0_IC_DEL, std::to_string(obj.time), bl); + rc = store->do_idx_op_by_name( + iname, M0_IC_DEL, std::to_string(obj.time), bl); if (rc < 0 && rc != -EEXIST){ ldout(cct, 0) << "ERROR: failed to delete time entry "<& gc_entries) { + int rc = 0; + int max_entries = 1000; + for (uint32_t i = 0; i < max_indices; i++) { + std::vector keys(max_entries + 1); + std::vector vals(max_entries + 1); + std::string marker = ""; + bool truncated = false; + ldout(cct, 70) << "listing entries for " << index_names[i] << dendl; + do { + if (!marker.empty()) + keys[0] = marker; + rc = store->next_query_by_name(index_names[i], keys, vals, + obj_tag_prefix); + if (rc < 0) { + ldpp_dout(this, 0) <<__func__<<": ERROR: NEXT query failed. 
rc=" + << rc << dendl; + return rc; + } + truncated = (rc == max_entries + 1); // recomputed every pass so the loop ends on the last batch + if (truncated) { + marker = keys.back(); + } + for (int j = 0; j < max_entries && !keys[j].empty(); j++) { + bufferlist::const_iterator blitr = vals[j].cbegin(); + motr_gc_obj_info gc_obj; + gc_obj.decode(blitr); + gc_entries.push_back(gc_obj); + ldout(cct, 70) << gc_obj.tag << ", " + << gc_obj.name << ", " + << gc_obj.size << ", " << dendl; + } + } while (truncated); + } + return 0; +} + unsigned MotrGC::get_subsys() const { return dout_subsys; } diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h index 9a27786216aec..412a5b74cf049 100644 --- a/src/rgw/motr/gc/gc.h +++ b/src/rgw/motr/gc/gc.h @@ -25,17 +25,21 @@ const uint32_t GC_DEFAULT_QUEUES = 64; const uint32_t GC_DEFAULT_COUNT = 256; const uint32_t GC_MAX_QUEUES = 4096; -static std::string gc_index_prefix = "motr.rgw.gc"; -static std::string gc_thread_prefix = "motr_gc_"; +static const std::string gc_index_prefix = "motr.rgw.gc"; +static const std::string gc_thread_prefix = "motr_gc_"; +static const std::string obj_tag_prefix = "0_"; +static const std::string obj_exp_time_prefix = "1_"; -struct Meta -{ +namespace rgw::sal { + class MotrStore; +} + +struct Meta { struct m0_uint128 oid = {}; struct m0_fid pver = {}; uint64_t layout_id = 0; - void encode(bufferlist &bl) const - { + void encode(bufferlist &bl) const { ENCODE_START(5, 5, bl); encode(oid.u_hi, bl); encode(oid.u_lo, bl); @@ -45,8 +49,7 @@ struct Meta ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) - { + void decode(bufferlist::const_iterator &bl) { DECODE_START(5, bl); decode(oid.u_hi, bl); decode(oid.u_lo, bl); @@ -57,8 +60,7 @@ struct Meta } }; -struct motr_gc_obj_info -{ +struct motr_gc_obj_info { std::string tag; // gc obj unique identifier std::string name; // fully qualified object name Meta mobj; // motr obj @@ -76,8 +78,7 @@ struct motr_gc_obj_info time(std::move(_time)), size(std::move(_size)), size_actual(std::move(_size_actual)),
is_multipart(std::move(_is_multipart)), multipart_iname(std::move(_multipart_iname)) {} - void encode(bufferlist &bl) const - { + void encode(bufferlist &bl) const { ENCODE_START(12, 2, bl); encode(tag, bl); encode(name, bl); @@ -94,8 +95,7 @@ struct motr_gc_obj_info ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) - { + void decode(bufferlist::const_iterator &bl) { DECODE_START_LEGACY_COMPAT_LEN_32(12, 2, 2, bl); decode(tag, bl); decode(name, bl); @@ -117,9 +117,10 @@ WRITE_CLASS_ENCODER(motr_gc_obj_info); class MotrGC : public DoutPrefixProvider { private: CephContext *cct; - rgw::sal::Store *store; + rgw::sal::MotrStore *store; uint32_t max_indices = 0; uint32_t max_count = 0; + std::atomic enqueue_index; std::vector index_names; std::atomic down_flag = false; @@ -143,11 +144,12 @@ class MotrGC : public DoutPrefixProvider { void *entry() override; void stop(); + int get_id() { return worker_id; } }; std::vector> workers; - MotrGC(CephContext *_cct, rgw::sal::Store* _store) + MotrGC(CephContext *_cct, rgw::sal::MotrStore* _store) : cct(_cct), store(_store) {} ~MotrGC() { @@ -160,7 +162,10 @@ class MotrGC : public DoutPrefixProvider { void start_processor(); void stop_processor(); - int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj); + + int enqueue(motr_gc_obj_info obj); + int dequeue(std::string iname, motr_gc_obj_info obj); + int list(std::vector& gc_entries); int get_locked_gc_index(uint32_t& rand_ind); bool going_down(); diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index 40b55f74dc157..895dc6b41e531 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -1514,8 +1514,8 @@ void MotrStore::finalize(void) { m0_client_fini(this->instance, true); } -MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) { - use_gc_thread = _use_gc_thread; +MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_threads) { + use_gc_threads = _use_gc_threads; return *this; } @@ -1533,7 +1533,7 @@ 
int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { return rc; } - if (use_gc_thread) { + if (use_gc_threads) { // Create MotrGC object and start GCWorker threads int rc = create_gc(); if (rc != 0) @@ -2200,6 +2200,7 @@ int MotrObject::remove_mobj_and_index_entry( int rc; bufferlist bl; uint64_t size_rounded = 0; + bool pushed_to_gc = false; // handling empty size object case if (ent.meta.size != 0) { @@ -2219,7 +2220,22 @@ int MotrObject::remove_mobj_and_index_entry( } } size_rounded = roundup(ent.meta.size, get_unit_sz()); - rc = this->delete_mobj(dpp); + if (store->gc_enabled()) { + std::string tag = PRIx64 + std::to_string(this->meta.oid.u_hi) + ":" + + PRIx64 + std::to_string(this->meta.oid.u_lo); + std::string obj_fqdn = bucket_name + "/" + delete_key; + ::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta); + motr_gc_obj_info gc_obj(tag, obj_fqdn, *mobj, std::time(nullptr), + ent.meta.size, size_rounded, false, ""); + rc = store->get_gc()->enqueue(gc_obj); + if (rc == 0) { + pushed_to_gc = true; + ldpp_dout(dpp, 20) <<__func__<< ": Pushed the delete req for OID=" + << tag << " to the motr garbage collector." << dendl; + } + } + if (! pushed_to_gc) + rc = this->delete_mobj(dpp); } if (rc < 0) { ldpp_dout(dpp, 0) << "Failed to delete the object " << delete_key <<" from Motr." 
<< dendl; diff --git a/src/rgw/rgw_sal_motr.h b/src/rgw/rgw_sal_motr.h index 3fa0c8cc6d456..a541985fa710f 100644 --- a/src/rgw/rgw_sal_motr.h +++ b/src/rgw/rgw_sal_motr.h @@ -986,7 +986,7 @@ class MotrStore : public Store { MotrMetaCache* bucket_inst_cache; std::unique_ptr motr_gc; - bool use_gc_thread; + bool use_gc_threads; bool use_cache; public: @@ -1108,6 +1108,8 @@ class MotrStore : public Store { virtual void finalize(void) override; int create_gc(); void stop_gc(); + bool gc_enabled() { return use_gc_threads; } + std::unique_ptr& get_gc() { return motr_gc; } MotrStore& set_run_gc_thread(bool _use_gc_thread); MotrStore& set_use_cache(bool _use_cache); From a39c95c8ae7dad7d8483b06b67ed5dcd30a8b0b2 Mon Sep 17 00:00:00 2001 From: Dattaprasad Govekar Date: Mon, 8 Aug 2022 02:52:40 -0600 Subject: [PATCH 6/7] motr_sync: [CORTX-33853] Support race coditions during lock acquisition. Changed lock() implementation to avoid multiple KV operations, thereby reducing the effect of race during lock() call by caller. 
Signed-off-by: Dattaprasad Govekar --- src/rgw/motr/sync/motr_sync.h | 2 + src/rgw/motr/sync/motr_sync_impl.cc | 110 +++++++++++++++++++--------- src/rgw/motr/sync/motr_sync_impl.h | 2 + 3 files changed, 78 insertions(+), 36 deletions(-) diff --git a/src/rgw/motr/sync/motr_sync.h b/src/rgw/motr/sync/motr_sync.h index ed2fa5824ffde..c8aafa473cb94 100644 --- a/src/rgw/motr/sync/motr_sync.h +++ b/src/rgw/motr/sync/motr_sync.h @@ -84,6 +84,8 @@ class MotrSync { utime_t lock_duration, const std::string& locker_id) = 0; virtual int unlock(const std::string& lock_name, MotrLockType lock_type, const std::string& locker_id) = 0; + virtual int check_lock(const std::string& lock_name, + const std::string& locker_id) = 0; }; // Abstract interface for entity that implements backend for lock objects diff --git a/src/rgw/motr/sync/motr_sync_impl.cc b/src/rgw/motr/sync/motr_sync_impl.cc index b0fd036043fd3..0442680ac89f7 100644 --- a/src/rgw/motr/sync/motr_sync_impl.cc +++ b/src/rgw/motr/sync/motr_sync_impl.cc @@ -15,6 +15,22 @@ #include "motr/sync/motr_sync_impl.h" +std::string random_string(size_t length) +{ + auto randchar = []() -> char + { + const char charset[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + const size_t max_index = (sizeof(charset) - 1); + return charset[ rand() % max_index ]; + }; + std::string str(length,0); + std::generate_n( str.begin(), length, randchar ); + return str; +} + void MotrLock::initialize(std::shared_ptr lock_provider) { _lock_provider = lock_provider; } @@ -29,46 +45,45 @@ int MotrLock::lock(const std::string& lock_name, } int rc = 0; motr_lock_info_t lock_obj; - rc = _lock_provider->read_lock(lock_name, &lock_obj); - if (rc != 0 && rc != -ENOENT) - return rc; + // First, try to write lock object + struct motr_locker_info_t locker; + std::string s_locker_id = locker_id; + if (s_locker_id.empty()) { + s_locker_id = random_string(32); + } + locker.cookie = s_locker_id; + locker.expiration = ceph_clock_now() 
+ lock_duration; + locker.description = ""; + // Insert lock entry + lock_obj.lockers.insert( + std::pair(locker_id, + locker)); + rc = _lock_provider->write_lock(lock_name, &lock_obj, false); if (rc == 0) { - // lock entry is present. Check if this is a stale/expired lock - utime_t now = ceph_clock_now(); - auto iter = lock_obj.lockers.begin(); - while (iter != lock_obj.lockers.end()) { - struct motr_locker_info_t &info = iter->second; - if (!info.expiration.is_zero() && info.expiration < now) { - // locker has expired; delete it - iter = lock_obj.lockers.erase(iter); - } else { - ++iter; + // lock entry created successfully + return rc; + } else if (rc == -EEXIST) { + // Failed to acquire lock object; possibly, already acquired by someone + // Lock entry is present. Check if this is a stale/expired lock + rc = _lock_provider->read_lock(lock_name, &lock_obj); + if (rc == 0) { + utime_t now = ceph_clock_now(); + auto iter = lock_obj.lockers.begin(); + while (iter != lock_obj.lockers.end()) { + struct motr_locker_info_t &info = iter->second; + if (!info.expiration.is_zero() && info.expiration < now) { + // locker has expired; delete it + iter = lock_obj.lockers.erase(iter); + } else { + ++iter; + } } - } - // remove the lock if no locker is left - if (lock_obj.lockers.empty()) - _lock_provider->remove_lock(lock_name, locker_id); - } - - if (!lock_obj.lockers.empty() && MotrLockType::EXCLUSIVE == lock_type) { - // lock is not available - return -EBUSY; - } else { - // Try to acquire lock object - struct motr_locker_info_t locker; - locker.cookie = locker_id; - locker.expiration = ceph_clock_now() + lock_duration; - locker.description = ""; - // Update lock entry with current locker and lock the resource - lock_obj.lockers.insert( - std::pair(locker_id, locker)); - rc = _lock_provider->write_lock(lock_name, &lock_obj, false); - if (rc < 0) { - // Failed to acquire lock object; possibly, already acquired - return -EBUSY; + // remove the lock if no locker is left + if 
(lock_obj.lockers.empty()) + _lock_provider->remove_lock(lock_name, locker_id); } } - return rc; + return -EBUSY; } int MotrLock::unlock(const std::string& lock_name, @@ -88,6 +103,29 @@ int MotrKVLockProvider::initialize(const DoutPrefixProvider* dpp, return _store->create_motr_idx_by_name(lock_index_name); } +int MotrLock::check_lock(const std::string& lock_name, + const std::string& locker_id = "") { + if (!_lock_provider || locker_id.empty()) { + return -EINVAL; + } + motr_lock_info_t cnfm_lock_obj; + int rc = _lock_provider->read_lock(lock_name, &cnfm_lock_obj); + if (rc == 0) { + auto iter = cnfm_lock_obj.lockers.begin(); + for (; iter != cnfm_lock_obj.lockers.end(); ++iter) { // advance, so a non-matching first locker cannot spin forever + struct motr_locker_info_t &info = iter->second; + if (info.cookie == locker_id) { + // Same lock exists; this confirms lock object + return rc; + } + } + return -EBUSY; + } else if (rc == -ENOENT) { + // Looks like lock object is deleted by another caller + // as part of the race condition + return -EBUSY; + } +} int MotrKVLockProvider::read_lock(const std::string& lock_name, motr_lock_info_t* lock_info) { if (!_store || _lock_index.empty() || !lock_info) { diff --git a/src/rgw/motr/sync/motr_sync_impl.h b/src/rgw/motr/sync/motr_sync_impl.h index 2bd774945ca55..18cad9e197226 100644 --- a/src/rgw/motr/sync/motr_sync_impl.h +++ b/src/rgw/motr/sync/motr_sync_impl.h @@ -78,6 +78,8 @@ class MotrLock : public MotrSync { utime_t lock_duration, const std::string& locker_id) override; virtual int unlock(const std::string& lock_name, MotrLockType lock_type, const std::string& locker_id) override; + virtual int check_lock(const std::string& lock_name, + const std::string& locker_id) override; }; class MotrKVLockProvider : public MotrLockProvider { From d1f051c38362c70f55c075b9293f955049aa598c Mon Sep 17 00:00:00 2001 From: Dattaprasad Govekar Date: Tue, 9 Aug 2022 01:56:13 -0600 Subject: [PATCH 7/7] motr_sync: [CORTX-33853] Support race coditions during lock acquisition.
- Changed lock() implementation to avoid multiple KV operations, thereby reducing the effect of race during lock() call by caller. - Added check_lock() to reconfirm the lock by caller. Signed-off-by: Dattaprasad Govekar --- src/rgw/motr/sync/motr_sync_impl.cc | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/rgw/motr/sync/motr_sync_impl.cc b/src/rgw/motr/sync/motr_sync_impl.cc index a44569c53df66..78365f395aef2 100644 --- a/src/rgw/motr/sync/motr_sync_impl.cc +++ b/src/rgw/motr/sync/motr_sync_impl.cc @@ -35,23 +35,18 @@ void MotrLock::initialize(std::shared_ptr lock_provider) { _lock_provider = lock_provider; } -int MotrLock::lock(const std::string& lock_name, +int MotrLock::lock(const std::string& lock_name, MotrLockType lock_type, utime_t lock_duration, const std::string& locker_id = "") { - if (!_lock_provider || (MotrLockType::EXCLUSIVE != lock_type && - locker_id.empty())) { + if (!_lock_provider || locker_id.empty()) { return -EINVAL; } int rc = 0; motr_lock_info_t lock_obj; // First, try to write lock object struct motr_locker_info_t locker; - std::string s_locker_id = locker_id; - if (s_locker_id.empty()) { - s_locker_id = random_string(32); - } - locker.cookie = s_locker_id; + locker.cookie = locker_id; locker.expiration = ceph_clock_now() + lock_duration; locker.description = ""; // Insert lock entry @@ -119,12 +114,12 @@ int MotrLock::check_lock(const std::string& lock_name, return rc; } } - return -EBUSY; } else if (rc == -ENOENT) { // Looks like lock object is deleted by another caller // as part of the race condition return -EBUSY; } + return -EBUSY; } int MotrKVLockProvider::read_lock(const std::string& lock_name,