Skip to content

Commit

Permalink
GC thread processing logic (Seagate#371)
Browse files Browse the repository at this point in the history
The GC thread will aquire GC index and process the object entries for
deletion either upto the count governed by "rgw_gc_max_trim_chunk" or
time allowed by "rgw_gc_processor_max_time".

Signed-off-by: Sachin Punadikar <sachin.punadikar@seagate.com>
  • Loading branch information
Sachin Punadikar authored and sumedhak27 committed Aug 9, 2022
1 parent d92ac63 commit e1243de
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
73 changes: 69 additions & 4 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,61 @@
*/

#include "motr/gc/gc.h"
#include <ctime>

void *MotrGC::GCWorker::entry() {
std::unique_lock<std::mutex> lk(lock);
ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " started." << dendl;

// Get random number to lock the GC index.
uint32_t my_index = ceph::util::generate_random_number(0, \
motr_gc->max_indices);
// This is going to be endless loop
do {

ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " iteration" << dendl;
<< worker_id << " Iteration Started" << dendl;

std::string iname = "";
// Get lock on an GC index
int rc = motr_gc->get_locked_gc_index(my_index);

// Lock has been aquired, start the timer
std::time_t start_time = std::time(nullptr);
std::time_t end_time = start_time + \
cct->_conf->rgw_gc_processor_max_time - 10;
std::time_t current_time = std::time(nullptr);

if (rc == 0) {
uint32_t processed_count = 0;
// form the index name
iname = gc_index_prefix + "." + std::to_string(my_index);
ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " Working on GC Queue: " << iname << dendl;
// time based while loop
do {
// fetch the next entry from index "iname"

// check if the object is ready for deletion
// for the motr object

// delete that object
// rc = dequeue();

// remove the entry from iname

// Exit the loop if required work is complete
processed_count++;
if (processed_count >= motr_gc->max_count) break;
// Update current time
current_time = std::time(nullptr);
} while (current_time < end_time);
// unlock the GC queue
}
my_index++;
if (my_index >= motr_gc->max_indices) my_index = 0;
// sleep for remaining duration
if (end_time > current_time) sleep(end_time - current_time);
cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10));

} while (! motr_gc->going_down());
Expand All @@ -35,7 +80,7 @@ void *MotrGC::GCWorker::entry() {

void MotrGC::initialize() {
// fetch max gc indices from config
uint64_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs;
uint32_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs;
if(rgw_gc_max_objs) {
rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs)));
max_indices = static_cast<int>(std::min(rgw_gc_max_objs,
Expand All @@ -55,10 +100,15 @@ void MotrGC::initialize() {
}
index_names.push_back(iname);
}
// Get the max count of objects to be deleted in 1 processing cycle
max_count = cct->_conf->rgw_gc_max_trim_chunk;
if (max_count == 0) max_count = GC_DEFAULT_COUNT;
}

void MotrGC::finalize() {
// [To be Implemented] undo steps from initialize stage
// We do not delete GC queues or related lock entry.
// GC queue & lock entries would be needed after RGW / cluster restart,
// so do not delete those.
}

void MotrGC::start_processor() {
Expand Down Expand Up @@ -112,6 +162,21 @@ int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_ob
return rc;
}

int MotrGC::get_locked_gc_index(uint32_t& rand_ind)
{
int rc = -1;
uint32_t new_index = 0;
// attempt to lock GC starting with passed in index
for (int ind = 0; ind < max_indices; ind++)
{
new_index = (ind + rand_ind) % max_indices;
// try locking index
// on sucess mark rc as 0
}
rand_ind = new_index;
return rc;
}

unsigned MotrGC::get_subsys() const {
return dout_subsys;
}
Expand Down
9 changes: 6 additions & 3 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
#include <condition_variable>
#include <atomic>

const uint64_t GC_DEFAULT_QUEUES = 64;
const uint64_t GC_MAX_QUEUES = 4096;
const uint32_t GC_DEFAULT_QUEUES = 64;
const uint32_t GC_DEFAULT_COUNT = 256;
const uint32_t GC_MAX_QUEUES = 4096;
static std::string gc_index_prefix = "motr.rgw.gc";
static std::string gc_thread_prefix = "motr_gc_";

Expand Down Expand Up @@ -117,7 +118,8 @@ class MotrGC : public DoutPrefixProvider {
private:
CephContext *cct;
rgw::sal::Store *store;
int max_indices = 0;
uint32_t max_indices = 0;
uint32_t max_count = 0;
std::vector<std::string> index_names;
std::atomic<bool> down_flag = false;

Expand Down Expand Up @@ -159,6 +161,7 @@ class MotrGC : public DoutPrefixProvider {
void start_processor();
void stop_processor();
int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj);
int get_locked_gc_index(uint32_t& rand_ind);
bool going_down();

// Set Up logging prefix for GC
Expand Down

0 comments on commit e1243de

Please sign in to comment.