Skip to content

Commit

Permalink
rgw_sal_motr, motr_gc: [CORTX-32689] Implement MotrGC::enqueue() (Sea…
Browse files Browse the repository at this point in the history
…gate#379)

* rgw_sal_motr, motr_gc: [CORTX-32689] Implement MotrGC::enqueue()

* Push {0_ObjTag: motr_gc_obj_info} and {1_ExpiryTime: motr_gc_obj_info}
  entry to the gc queues, i.e. motr dix.
* Call gc->enqueue() method on simple object delete request without
  actually deleting the obj.

Signed-off-by: Sumedh Anantrao Kulkarni <sumedh.a.kulkarni@seagate.com>

* motr_gc: [CORTX-32689] Implement MotrGC::list() function

MotrGC::list() goes through every gc queue and lists the adds the
pending delete requests in resultant vector.
Interface to list objs using admin tool or rest api's is yet
to be developed.

Signed-off-by: Sumedh Anantrao Kulkarni <sumedh.a.kulkarni@seagate.com>
  • Loading branch information
sumedhak27 committed Aug 9, 2022
1 parent 3149cac commit 238130f
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 44 deletions.
120 changes: 98 additions & 22 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ void *MotrGC::GCWorker::entry() {
<< worker_id << " started." << dendl;

// Get random number to lock the GC index.
uint32_t my_index = ceph::util::generate_random_number(0, \
motr_gc->max_indices);
uint32_t my_index = \
ceph::util::generate_random_number(0, motr_gc->max_indices - 1);
// This is going to be endless loop
do {
ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
Expand All @@ -42,13 +42,12 @@ void *MotrGC::GCWorker::entry() {
if (rc == 0) {
uint32_t processed_count = 0;
// form the index name
iname = gc_index_prefix + "." + std::to_string(my_index);
iname = motr_gc->index_names[my_index];
ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " Working on GC Queue: " << iname << dendl;
<< worker_id << " Working on GC Queue: " << iname << dendl;
// time based while loop
do {
// fetch the next entry from index "iname"

// check if the object is ready for deletion
// for the motr object

Expand All @@ -62,13 +61,13 @@ void *MotrGC::GCWorker::entry() {
if (processed_count >= motr_gc->max_count) break;
// Update current time
current_time = std::time(nullptr);
} while (current_time < end_time);
} while (current_time < end_time && !motr_gc->going_down());
// unlock the GC queue
}
my_index++;
if (my_index >= motr_gc->max_indices) my_index = 0;
my_index = (my_index + 1) % motr_gc->max_indices;

// sleep for remaining duration
if (end_time > current_time) sleep(end_time - current_time);
// if (end_time > current_time) sleep(end_time - current_time);
cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10));

} while (! motr_gc->going_down());
Expand All @@ -81,10 +80,10 @@ void *MotrGC::GCWorker::entry() {
void MotrGC::initialize() {
// fetch max gc indices from config
uint32_t rgw_gc_max_objs = cct->_conf->rgw_gc_max_objs;
if(rgw_gc_max_objs) {
if (rgw_gc_max_objs) {
rgw_gc_max_objs = pow(2, ceil(log2(rgw_gc_max_objs)));
max_indices = static_cast<int>(std::min(rgw_gc_max_objs,
GC_MAX_QUEUES));
GC_MAX_QUEUES));
}
else {
max_indices = GC_DEFAULT_QUEUES;
Expand All @@ -93,7 +92,7 @@ void MotrGC::initialize() {
ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl;
for (uint32_t ind_suf = 0; ind_suf < max_indices; ind_suf++) {
std::string iname = gc_index_prefix + "." + std::to_string(ind_suf);
int rc = static_cast<rgw::sal::MotrStore*>(store)->create_motr_idx_by_name(iname);
int rc = store->create_motr_idx_by_name(iname);
if (rc < 0 && rc != -EEXIST){
ldout(cct, 0) << "ERROR: GC index creation failed with rc: " << rc << dendl;
break;
Expand All @@ -103,6 +102,10 @@ void MotrGC::initialize() {
// Get the max count of objects to be deleted in 1 processing cycle
max_count = cct->_conf->rgw_gc_max_trim_chunk;
if (max_count == 0) max_count = GC_DEFAULT_COUNT;

// set random starting index for enqueue of delete requests
enqueue_index = \
ceph::util::generate_random_number(0, max_indices - 1);
}

void MotrGC::finalize() {
Expand Down Expand Up @@ -130,6 +133,8 @@ void MotrGC::stop_processor() {
// gracefully shutdown all the gc threads.
down_flag = true;
for (auto& worker : workers) {
ldout(cct, 20) << "stopping and joining "
<< gc_thread_prefix << worker->get_id() << dendl;
worker->stop();
worker->join();
}
Expand All @@ -145,38 +150,109 @@ bool MotrGC::going_down() {
return down_flag;
}

int MotrGC::dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj)
{
int MotrGC::enqueue(motr_gc_obj_info obj) {
int rc = 0;
// create 🔑's:
// - 1_{obj.time + min_wait_time}
// - 0_{obj.tag}
std::string key1 = obj_exp_time_prefix +
std::to_string(obj.time + cct->_conf->rgw_gc_obj_min_wait);
std::string key2 = obj_tag_prefix + obj.tag;

bufferlist bl;
obj.encode(bl);
// push {1_ExpiryTime: motr_gc_obj_info} to the gc queue.📥
rc = store->do_idx_op_by_name(index_names[enqueue_index],
M0_IC_PUT, key1, bl);
if (rc < 0)
return rc;

// push {0_ObjTag: motr_gc_obj_info} to the gc queue.📥
rc = store->do_idx_op_by_name(index_names[enqueue_index],
M0_IC_PUT, key2, bl);
if (rc < 0) {
// cleanup 🧹: pop key1 📤
store->do_idx_op_by_name(index_names[enqueue_index],
M0_IC_DEL, key1, bl);
// we are avoiding delete retry on failure since,
// while processing the gc entry we will ignore the
// 1_ExpiryTime entry if corresponding 0_ObjTag entry is absent.
return rc;
}

// rolling increment the enqueue_index
enqueue_index = (enqueue_index + 1) % max_indices;
return rc;
}

int MotrGC::dequeue(std::string iname, motr_gc_obj_info obj) {
int rc;
bufferlist bl;
rc = static_cast<rgw::sal::MotrStore*>(store)->do_idx_op_by_name(iname,
M0_IC_DEL, obj.tag, bl);
rc = store->do_idx_op_by_name(iname, M0_IC_DEL, obj.tag, bl);
if (rc < 0){
ldout(cct, 0) << "ERROR: failed to delete tag entry "<<obj.tag<<" rc: " << rc << dendl;
}
rc = static_cast<rgw::sal::MotrStore*>(store)->do_idx_op_by_name(iname,
M0_IC_DEL, std::to_string(obj.time), bl);
rc = store->do_idx_op_by_name(
iname, M0_IC_DEL, std::to_string(obj.time), bl);
if (rc < 0 && rc != -EEXIST){
ldout(cct, 0) << "ERROR: failed to delete time entry "<<obj.time<<" rc: " << rc << dendl;
}
return rc;
}

int MotrGC::get_locked_gc_index(uint32_t& rand_ind)
{
int MotrGC::get_locked_gc_index(uint32_t& rand_ind) {
int rc = -1;
uint32_t new_index = 0;
// attempt to lock GC starting with passed in index
for (uint32_t ind = 0; ind < max_indices; ind++)
{
for (uint32_t ind = 1; ind < max_indices; ind++) {
new_index = (ind + rand_ind) % max_indices;
// try locking index
// on sucess mark rc as 0
rc = 0; // will be set by MotrLock.lock(gc_queue, exp_time);
if (rc == 0)
break;
}
rand_ind = new_index;
return rc;
}

int MotrGC::list(std::vector<motr_gc_obj_info>& gc_entries) {
int rc = 0;
int max_entries = 1000;
for (uint32_t i = 0; i < max_indices; i++) {
std::vector<std::string> keys(max_entries + 1);
std::vector<bufferlist> vals(max_entries + 1);
std::string marker = "";
bool truncated = false;
ldout(cct, 70) << "listing entries for " << index_names[i] << dendl;
do {
if (!marker.empty())
keys[0] = marker;
rc = store->next_query_by_name(index_names[i], keys, vals,
obj_tag_prefix);
if (rc < 0) {
ldpp_dout(this, 0) <<__func__<<": ERROR: NEXT query failed. rc="
<< rc << dendl;
return rc;
}
if (rc == max_entries + 1) {
truncated = true;
marker = keys.back();
}
for (int j = 0; j < max_entries && !keys[j].empty(); j++) {
bufferlist::const_iterator blitr = vals[j].cbegin();
motr_gc_obj_info gc_obj;
gc_obj.decode(blitr);
gc_entries.push_back(gc_obj);
ldout(cct, 70) << gc_obj.tag << ", "
<< gc_obj.name << ", "
<< gc_obj.size << ", " << dendl;
}
} while (truncated);
}
return 0;
}

unsigned MotrGC::get_subsys() const {
return dout_subsys;
}
Expand Down
39 changes: 22 additions & 17 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
const uint32_t GC_DEFAULT_QUEUES = 64;
const uint32_t GC_DEFAULT_COUNT = 256;
const uint32_t GC_MAX_QUEUES = 4096;
static std::string gc_index_prefix = "motr.rgw.gc";
static std::string gc_thread_prefix = "motr_gc_";
static const std::string gc_index_prefix = "motr.rgw.gc";
static const std::string gc_thread_prefix = "motr_gc_";
static const std::string obj_tag_prefix = "0_";
static const std::string obj_exp_time_prefix = "1_";

struct Meta
{
namespace rgw::sal {
class MotrStore;
}

struct Meta {
struct m0_uint128 oid = {};
struct m0_fid pver = {};
uint64_t layout_id = 0;

void encode(bufferlist &bl) const
{
void encode(bufferlist &bl) const {
ENCODE_START(5, 5, bl);
encode(oid.u_hi, bl);
encode(oid.u_lo, bl);
Expand All @@ -45,8 +49,7 @@ struct Meta
ENCODE_FINISH(bl);
}

void decode(bufferlist::const_iterator &bl)
{
void decode(bufferlist::const_iterator &bl) {
DECODE_START(5, bl);
decode(oid.u_hi, bl);
decode(oid.u_lo, bl);
Expand All @@ -57,8 +60,7 @@ struct Meta
}
};

struct motr_gc_obj_info
{
struct motr_gc_obj_info {
std::string tag; // gc obj unique identifier
std::string name; // fully qualified object name
Meta mobj; // motr obj
Expand All @@ -76,8 +78,7 @@ struct motr_gc_obj_info
time(std::move(_time)), size(std::move(_size)), size_actual(std::move(_size_actual)),
is_multipart(std::move(_is_multipart)), multipart_iname(std::move(_multipart_iname)) {}

void encode(bufferlist &bl) const
{
void encode(bufferlist &bl) const {
ENCODE_START(12, 2, bl);
encode(tag, bl);
encode(name, bl);
Expand All @@ -94,8 +95,7 @@ struct motr_gc_obj_info
ENCODE_FINISH(bl);
}

void decode(bufferlist::const_iterator &bl)
{
void decode(bufferlist::const_iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN_32(12, 2, 2, bl);
decode(tag, bl);
decode(name, bl);
Expand All @@ -117,9 +117,10 @@ WRITE_CLASS_ENCODER(motr_gc_obj_info);
class MotrGC : public DoutPrefixProvider {
private:
CephContext *cct;
rgw::sal::Store *store;
rgw::sal::MotrStore *store;
uint32_t max_indices = 0;
uint32_t max_count = 0;
std::atomic<uint32_t> enqueue_index;
std::vector<std::string> index_names;
std::atomic<bool> down_flag = false;

Expand All @@ -143,11 +144,12 @@ class MotrGC : public DoutPrefixProvider {

void *entry() override;
void stop();
int get_id() { return worker_id; }
};

std::vector<std::unique_ptr<MotrGC::GCWorker>> workers;

MotrGC(CephContext *_cct, rgw::sal::Store* _store)
MotrGC(CephContext *_cct, rgw::sal::MotrStore* _store)
: cct(_cct), store(_store) {}

~MotrGC() {
Expand All @@ -160,7 +162,10 @@ class MotrGC : public DoutPrefixProvider {

void start_processor();
void stop_processor();
int dequeue(const DoutPrefixProvider* dpp, std::string iname, motr_gc_obj_info obj);

int enqueue(motr_gc_obj_info obj);
int dequeue(std::string iname, motr_gc_obj_info obj);
int list(std::vector<motr_gc_obj_info>& gc_entries);
int get_locked_gc_index(uint32_t& rand_ind);
bool going_down();

Expand Down
24 changes: 20 additions & 4 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1514,8 +1514,8 @@ void MotrStore::finalize(void) {
m0_client_fini(this->instance, true);
}

MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) {
use_gc_thread = _use_gc_thread;
MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_threads) {
use_gc_threads = _use_gc_threads;
return *this;
}

Expand All @@ -1533,7 +1533,7 @@ int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) {
return rc;
}

if (use_gc_thread) {
if (use_gc_threads) {
// Create MotrGC object and start GCWorker threads
int rc = create_gc();
if (rc != 0)
Expand Down Expand Up @@ -2197,6 +2197,7 @@ int MotrObject::remove_mobj_and_index_entry(
int rc;
bufferlist bl;
uint64_t size_rounded = 0;
bool pushed_to_gc = false;

// handling empty size object case
if (ent.meta.size != 0) {
Expand All @@ -2216,7 +2217,22 @@ int MotrObject::remove_mobj_and_index_entry(
}
}
size_rounded = roundup(ent.meta.size, get_unit_sz());
rc = this->delete_mobj(dpp);
if (store->gc_enabled()) {
std::string tag = PRIx64 + std::to_string(this->meta.oid.u_hi) + ":" +
PRIx64 + std::to_string(this->meta.oid.u_lo);
std::string obj_fqdn = bucket_name + "/" + delete_key;
::Meta *mobj = reinterpret_cast<::Meta*>(&this->meta);
motr_gc_obj_info gc_obj(tag, obj_fqdn, *mobj, std::time(nullptr),
ent.meta.size, size_rounded, false, "");
rc = store->get_gc()->enqueue(gc_obj);
if (rc == 0) {
pushed_to_gc = true;
ldpp_dout(dpp, 20) <<__func__<< ": Pushed the delete req for OID="
<< tag << " to the motr garbage collector." << dendl;
}
}
if (! pushed_to_gc)
rc = this->delete_mobj(dpp);
}
if (rc < 0) {
ldpp_dout(dpp, 0) << "Failed to delete the object " << delete_key <<" from Motr." << dendl;
Expand Down
4 changes: 3 additions & 1 deletion src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ class MotrStore : public Store {
MotrMetaCache* bucket_inst_cache;

std::unique_ptr<MotrGC> motr_gc;
bool use_gc_thread;
bool use_gc_threads;
bool use_cache;

public:
Expand Down Expand Up @@ -1108,6 +1108,8 @@ class MotrStore : public Store {
virtual void finalize(void) override;
int create_gc();
void stop_gc();
bool gc_enabled() { return use_gc_threads; }
std::unique_ptr<MotrGC>& get_gc() { return motr_gc; }
MotrStore& set_run_gc_thread(bool _use_gc_thread);
MotrStore& set_use_cache(bool _use_cache);

Expand Down

0 comments on commit 238130f

Please sign in to comment.