From 238130f0536a7c6b43a1a5d9f7ef2ee38456502f Mon Sep 17 00:00:00 2001 From: Sumedh Anantrao Kulkarni Date: Mon, 8 Aug 2022 16:54:46 +0530 Subject: [PATCH] rgw_sal_motr, motr_gc: [CORTX-32689] Implement MotrGC::enqueue() (#379) * rgw_sal_motr, motr_gc: [CORTX-32689] Implement MotrGC::enqueue() * Push {0_ObjTag: motr_gc_obj_info} and {1_ExpiryTime: motr_gc_obj_info} entry to the gc queues, i.e. motr dix. * Call gc->enqueue() method on simple object delete request without actually deleting the obj. Signed-off-by: Sumedh Anantrao Kulkarni * motr_gc: [CORTX-32689] Implement MotrGC::list() function MotrGC::list() goes through every gc queue and lists the adds the pending delete requests in resultant vector. Interface to list objs using admin tool or rest api's is yet to be developed. Signed-off-by: Sumedh Anantrao Kulkarni --- src/rgw/motr/gc/gc.cc | 120 ++++++++++++++++++++++++++++++++-------- src/rgw/motr/gc/gc.h | 39 +++++++------ src/rgw/rgw_sal_motr.cc | 24 ++++++-- src/rgw/rgw_sal_motr.h | 4 +- 4 files changed, 143 insertions(+), 44 deletions(-) diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index e8cfb71ca9560..4b16afa84eaed 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -22,8 +22,8 @@ void *MotrGC::GCWorker::entry() { << worker_id << " started." << dendl; // Get random number to lock the GC index. - uint32_t my_index = ceph::util::generate_random_number(0, \ - motr_gc->max_indices); + uint32_t my_index = \ + ceph::util::generate_random_number(0, motr_gc->max_indices - 1); // This is going to be endless loop do { ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix @@ -42,13 +42,12 @@ void *MotrGC::GCWorker::entry() { if (rc == 0) { uint32_t processed_count = 0; // form the index name - iname = gc_index_prefix + "." + std::to_string(my_index); + iname = motr_gc->index_names[my_index]; ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix - << worker_id << " Working on GC Queue: " << iname << dendl; + << worker_id << " Working on GC Queue: " << iname << dendl; // time based while loop do { // fetch the next entry from index "iname" - // check if the object is ready for deletion // for the motr object @@ -62,13 +61,13 @@ void *MotrGC::GCWorker::entry() { if (processed_count >= motr_gc->max_count) break; // Update current time current_time = std::time(nullptr); - } while (current_time < end_time); + } while (current_time < end_time && !motr_gc->going_down()); // unlock the GC queue } - my_index++; - if (my_index >= motr_gc->max_indices) my_index = 0; + my_index = (my_index + 1) % motr_gc->max_indices; + // sleep for remaining duration - if (end_time > current_time) sleep(end_time - current_time); + // if (end_time > current_time) sleep(end_time - current_time); cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); } while (! motr_gc->going_down()); @@ -81,10 +80,10 @@ void *MotrGC::GCWorker::entry() { void MotrGC::initialize() { // fetch max gc indices from config uint32_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; - if(rgw_gc_max_objs) { + if (rgw_gc_max_objs) { rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs))); max_indices = static_cast(std::min(rgw_gc_max_objs, - GC_MAX_QUEUES)); + GC_MAX_QUEUES)); } else { max_indices = GC_DEFAULT_QUEUES; @@ -93,7 +92,7 @@ void MotrGC::initialize() { ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl; for (uint32_t ind_suf = 0; ind_suf < max_indices; ind_suf++) { std::string iname = gc_index_prefix + "." + std::to_string(ind_suf); - int rc = static_cast(store)->create_motr_idx_by_name(iname); + int rc = store->create_motr_idx_by_name(iname); if (rc < 0 && rc != -EEXIST){ ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl; break; @@ -103,6 +102,10 @@ void MotrGC::initialize() { // Get the max count of objects to be deleted in 1 processing cycle max_count = cct->_conf->rgw_gc_max_trim_chunk; if (max_count == 0) max_count = GC_DEFAULT_COUNT; + + // set random starting index for enqueue of delete requests + enqueue_index = \ + ceph::util::generate_random_number(0, max_indices - 1); } void MotrGC::finalize() { @@ -130,6 +133,8 @@ void MotrGC::stop_processor() { // gracefully shutdown all the gc threads. down_flag = true; for (auto& worker : workers) { + ldout(cct, 20) << "stopping and joining " + << gc_thread_prefix << worker->get_id() << dendl; worker->stop(); worker->join(); } @@ -145,38 +150,109 @@ bool MotrGC::going_down() { return down_flag; } -int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj) -{ +int MotrGC::enqueue(motr_gc_obj_info obj) { + int rc = 0; + // create ๐Ÿ”‘'s: + // - 1_{obj.time + min_wait_time} + // - 0_{obj.tag} + std::string key1 = obj_exp_time_prefix + + std::to_string(obj.time + cct->_conf->rgw_gc_obj_min_wait); + std::string key2 = obj_tag_prefix + obj.tag; + + bufferlist bl; + obj.encode(bl); + // push {1_ExpiryTime: motr_gc_obj_info} to the gc queue.๐Ÿ“ฅ + rc = store->do_idx_op_by_name(index_names[enqueue_index], + M0_IC_PUT, key1, bl); + if (rc < 0) + return rc; + + // push {0_ObjTag: motr_gc_obj_info} to the gc queue.๐Ÿ“ฅ + rc = store->do_idx_op_by_name(index_names[enqueue_index], + M0_IC_PUT, key2, bl); + if (rc < 0) { + // cleanup ๐Ÿงน: pop key1 ๐Ÿ“ค + store->do_idx_op_by_name(index_names[enqueue_index], + M0_IC_DEL, key1, bl); + // we are avoiding delete retry on failure since, + // while processing the gc entry we will ignore the + // 1_ExpiryTime entry if corresponding 0_ObjTag entry is absent. + return rc; + } + + // rolling increment the enqueue_index + enqueue_index = (enqueue_index + 1) % max_indices; + return rc; +} + +int MotrGC::dequeue(std::string iname, motr_gc_obj_info obj) { int rc; bufferlist bl; - rc = static_cast(store)->do_idx_op_by_name(iname, - M0_IC_DEL, obj.tag, bl); + rc = store->do_idx_op_by_name(iname, M0_IC_DEL, obj.tag, bl); if (rc < 0){ ldout(cct, 0) << "ERROR: failed to delete tag entry "<(store)->do_idx_op_by_name(iname, - M0_IC_DEL, std::to_string(obj.time), bl); + rc = store->do_idx_op_by_name( + iname, M0_IC_DEL, std::to_string(obj.time), bl); if (rc < 0 && rc != -EEXIST){ ldout(cct, 0) << "ERROR: failed to delete time entry "<& gc_entries) { + int rc = 0; + int max_entries = 1000; + for (uint32_t i = 0; i < max_indices; i++) { + std::vector keys(max_entries + 1); + std::vector vals(max_entries + 1); + std::string marker = ""; + bool truncated = false; + ldout(cct, 70) << "listing entries for " << index_names[i] << dendl; + do { + if (!marker.empty()) + keys[0] = marker; + rc = store->next_query_by_name(index_names[i], keys, vals, + obj_tag_prefix); + if (rc < 0) { + ldpp_dout(this, 0) <<__func__<<": ERROR: NEXT query failed. rc=" + << rc << dendl; + return rc; + } + if (rc == max_entries + 1) { + truncated = true; + marker = keys.back(); + } + for (int j = 0; j < max_entries && !keys[j].empty(); j++) { + bufferlist::const_iterator blitr = vals[j].cbegin(); + motr_gc_obj_info gc_obj; + gc_obj.decode(blitr); + gc_entries.push_back(gc_obj); + ldout(cct, 70) << gc_obj.tag << ", " + << gc_obj.name << ", " + << gc_obj.size << ", " << dendl; + } + } while (truncated); + } + return 0; +} + unsigned MotrGC::get_subsys() const { return dout_subsys; } diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h index 9a27786216aec..412a5b74cf049 100644 --- a/src/rgw/motr/gc/gc.h +++ b/src/rgw/motr/gc/gc.h @@ -25,17 +25,21 @@ const uint32_t GC_DEFAULT_QUEUES = 64; const uint32_t GC_DEFAULT_COUNT = 256; const uint32_t GC_MAX_QUEUES = 4096; -static std::string gc_index_prefix = "motr.rgw.gc"; -static std::string gc_thread_prefix = "motr_gc_"; +static const std::string gc_index_prefix = "motr.rgw.gc"; +static const std::string gc_thread_prefix = "motr_gc_"; +static const std::string obj_tag_prefix = "0_"; +static const std::string obj_exp_time_prefix = "1_"; -struct Meta -{ +namespace rgw::sal { + class MotrStore; +} + +struct Meta { struct m0_uint128 oid = {}; struct m0_fid pver = {}; uint64_t layout_id = 0; - void encode(bufferlist &bl) const - { + void encode(bufferlist &bl) const { ENCODE_START(5, 5, bl); encode(oid.u_hi, bl); encode(oid.u_lo, bl); @@ -45,8 +49,7 @@ struct Meta ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) - { + void decode(bufferlist::const_iterator &bl) { DECODE_START(5, bl); decode(oid.u_hi, bl); decode(oid.u_lo, bl); @@ -57,8 +60,7 @@ struct Meta } }; -struct motr_gc_obj_info -{ +struct motr_gc_obj_info { std::string tag; // gc obj unique identifier std::string name; // fully qualified object name Meta mobj; // motr obj @@ -76,8 +78,7 @@ struct motr_gc_obj_info time(std::move(_time)), size(std::move(_size)), size_actual(std::move(_size_actual)), is_multipart(std::move(_is_multipart)), multipart_iname(std::move(_multipart_iname)) {} - void encode(bufferlist &bl) const - { + void encode(bufferlist &bl) const { ENCODE_START(12, 2, bl); encode(tag, bl); encode(name, bl); @@ -94,8 +95,7 @@ struct motr_gc_obj_info ENCODE_FINISH(bl); } - void decode(bufferlist::const_iterator &bl) - { + void decode(bufferlist::const_iterator &bl) { DECODE_START_LEGACY_COMPAT_LEN_32(12, 2, 2, bl); decode(tag, bl); decode(name, bl); @@ -117,9 +117,10 @@ WRITE_CLASS_ENCODER(motr_gc_obj_info); class MotrGC : public DoutPrefixProvider { private: CephContext *cct; - rgw::sal::Store *store; + rgw::sal::MotrStore *store; uint32_t max_indices = 0; uint32_t max_count = 0; + std::atomic enqueue_index; std::vector index_names; std::atomic down_flag = false; @@ -143,11 +144,12 @@ class MotrGC : public DoutPrefixProvider { void *entry() override; void stop(); + int get_id() { return worker_id; } }; std::vector> workers; - MotrGC(CephContext *_cct, rgw::sal::Store* _store) + MotrGC(CephContext *_cct, rgw::sal::MotrStore* _store) : cct(_cct), store(_store) {} ~MotrGC() { @@ -160,7 +162,10 @@ class MotrGC : public DoutPrefixProvider { void start_processor(); void stop_processor(); - int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj); + + int enqueue(motr_gc_obj_info obj); + int dequeue(std::string iname, motr_gc_obj_info obj); + int list(std::vector& gc_entries); int get_locked_gc_index(uint32_t& rand_ind); bool going_down(); diff --git a/src/rgw/rgw_sal_motr.cc b/src/rgw/rgw_sal_motr.cc index cc4ab07e97a73..980dd86dbbb75 100644 --- a/src/rgw/rgw_sal_motr.cc +++ b/src/rgw/rgw_sal_motr.cc @@ -1514,8 +1514,8 @@ void MotrStore::finalize(void) { m0_client_fini(this->instance, true); } -MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) { - use_gc_thread = _use_gc_thread; +MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_threads) { + use_gc_threads = _use_gc_threads; return *this; } @@ -1533,7 +1533,7 @@ int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) { return rc; } - if (use_gc_thread) { + if (use_gc_threads) { // Create MotrGC object and start GCWorker threads int rc = create_gc(); if (rc != 0) @@ -2197,6 +2197,7 @@ int MotrObject::remove_mobj_and_index_entry( int rc; bufferlist bl; uint64_t size_rounded = 0; + bool pushed_to_gc = false; // handling empty size object case if (ent.meta.size != 0) { @@ -2216,7 +2217,22 @@ int MotrObject::remove_mobj_and_index_entry( } } size_rounded = roundup(ent.meta.size, get_unit_sz()); - rc = this->delete_mobj(dpp); + if (store->gc_enabled()) { + std::string tag = PRIx64 + std::to_string(this->meta.oid.u_hi) + ":" + + PRIx64 + std::to_string(this->meta.oid.u_lo); + std::string obj_fqdn = bucket_name + "/" + delete_key; + ::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta); + motr_gc_obj_info gc_obj(tag, obj_fqdn, *mobj, std::time(nullptr), + ent.meta.size, size_rounded, false, ""); + rc = store->get_gc()->enqueue(gc_obj); + if (rc == 0) { + pushed_to_gc = true; + ldpp_dout(dpp, 20) <<__func__<< ": Pushed the delete req for OID=" + << tag << " to the motr garbage collector." << dendl; + } + } + if (! pushed_to_gc) + rc = this->delete_mobj(dpp); } if (rc < 0) { ldpp_dout(dpp, 0) << "Failed to delete the object " << delete_key <<" from Motr." << dendl; diff --git a/src/rgw/rgw_sal_motr.h b/src/rgw/rgw_sal_motr.h index 3fa0c8cc6d456..a541985fa710f 100644 --- a/src/rgw/rgw_sal_motr.h +++ b/src/rgw/rgw_sal_motr.h @@ -986,7 +986,7 @@ class MotrStore : public Store { MotrMetaCache* bucket_inst_cache; std::unique_ptr motr_gc; - bool use_gc_thread; + bool use_gc_threads; bool use_cache; public: @@ -1108,6 +1108,8 @@ class MotrStore : public Store { virtual void finalize(void) override; int create_gc(); void stop_gc(); + bool gc_enabled() { return use_gc_threads; } + std::unique_ptr& get_gc() { return motr_gc; } MotrStore& set_run_gc_thread(bool _use_gc_thread); MotrStore& set_use_cache(bool _use_cache);