diff --git a/src/rgw/motr/gc/gc.cc b/src/rgw/motr/gc/gc.cc index 4c0fc2bb006e2..e564be359d026 100644 --- a/src/rgw/motr/gc/gc.cc +++ b/src/rgw/motr/gc/gc.cc @@ -14,16 +14,61 @@ */ #include "motr/gc/gc.h" +#include void *MotrGC::GCWorker::entry() { std::unique_lock lk(lock); ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix << worker_id << " started." << dendl; + // Get random number to lock the GC index. + uint32_t my_index = ceph::util::generate_random_number(0, \ + motr_gc->max_indices); + // This is going to be endless loop do { - ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix - << worker_id << " iteration" << dendl; + << worker_id << " Iteration Started" << dendl; + + std::string iname = ""; + // Get lock on an GC index + int rc = motr_gc->get_locked_gc_index(my_index); + + // Lock has been aquired, start the timer + std::time_t start_time = std::time(nullptr); + std::time_t end_time = start_time + \ + cct->_conf->rgw_gc_processor_max_time - 10; + std::time_t current_time = std::time(nullptr); + + if (rc == 0) { + uint32_t processed_count = 0; + // form the index name + iname = gc_index_prefix + "." + std::to_string(my_index); + ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix + << worker_id << " Working on GC Queue: " << iname << dendl; + // time based while loop + do { + // fetch the next entry from index "iname" + + // check if the object is ready for deletion + // for the motr object + + // delete that object + // rc = dequeue(); + + // remove the entry from iname + + // Exit the loop if required work is complete + processed_count++; + if (processed_count >= motr_gc->max_count) break; + // Update current time + current_time = std::time(nullptr); + } while (current_time < end_time); + // unlock the GC queue + } + my_index++; + if (my_index >= motr_gc->max_indices) my_index = 0; + // sleep for remaining duration + if (end_time > current_time) sleep(end_time - current_time); cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10)); } while (! motr_gc->going_down()); @@ -35,7 +80,7 @@ void *MotrGC::GCWorker::entry() { void MotrGC::initialize() { // fetch max gc indices from config - uint64_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; + uint32_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs; if(rgw_gc_max_objs) { rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs))); max_indices = static_cast(std::min(rgw_gc_max_objs, @@ -55,10 +100,15 @@ void MotrGC::initialize() { } index_names.push_back(iname); } + // Get the max count of objects to be deleted in 1 processing cycle + max_count = cct->_conf->rgw_gc_max_trim_chunk; + if (max_count == 0) max_count = GC_DEFAULT_COUNT; } void MotrGC::finalize() { - // [To be Implemented] undo steps from initialize stage + // We do not delete GC queues or related lock entry. + // GC queue & lock entries would be needed after RGW / cluster restart, + // so do not delete those. } void MotrGC::start_processor() { @@ -112,6 +162,21 @@ int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_ob return rc; } +int MotrGC::get_locked_gc_index(uint32_t& rand_ind) +{ + int rc = -1; + uint32_t new_index = 0; + // attempt to lock GC starting with passed in index + for (int ind = 0; ind < max_indices; ind++) + { + new_index = (ind + rand_ind) % max_indices; + // try locking index + // on sucess mark rc as 0 + } + rand_ind = new_index; + return rc; +} + unsigned MotrGC::get_subsys() const { return dout_subsys; } diff --git a/src/rgw/motr/gc/gc.h b/src/rgw/motr/gc/gc.h index b5304a939bf9d..9a27786216aec 100644 --- a/src/rgw/motr/gc/gc.h +++ b/src/rgw/motr/gc/gc.h @@ -22,8 +22,9 @@ #include #include -const uint64_t GC_DEFAULT_QUEUES = 64; -const uint64_t GC_MAX_QUEUES = 4096; +const uint32_t GC_DEFAULT_QUEUES = 64; +const uint32_t GC_DEFAULT_COUNT = 256; +const uint32_t GC_MAX_QUEUES = 4096; static std::string gc_index_prefix = "motr.rgw.gc"; static std::string gc_thread_prefix = "motr_gc_"; @@ -117,7 +118,8 @@ class MotrGC : public DoutPrefixProvider { private: CephContext *cct; rgw::sal::Store *store; - int max_indices = 0; + uint32_t max_indices = 0; + uint32_t max_count = 0; std::vector index_names; std::atomic down_flag = false; @@ -159,6 +161,7 @@ class MotrGC : public DoutPrefixProvider { void start_processor(); void stop_processor(); int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj); + int get_locked_gc_index(uint32_t& rand_ind); bool going_down(); // Set Up logging prefix for GC