Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

rgw_sal_motr, motr_gc: [CORTX-33148] add MotrGC, MotrGC::GCWorker infra code #356

Merged
merged 5 commits into from
Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ if(WITH_RADOSGW_DBSTORE)
list(APPEND librgw_common_srcs rgw_sal_dbstore.cc)
endif()
if(WITH_RADOSGW_MOTR)
list(APPEND librgw_common_srcs rgw_sal_motr.cc)
list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add more files in future. Better to add cmakelist in motr directory and add that directory here.
ref. https://github.com/Seagate/cortx-rgw/blob/main/src/rgw/store/dbstore/CMakeLists.txt

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I tried adding a subdirectory, but the compilation failed initially, I did not try to resolve it.
Will try it again in the end.

Copy link
Author

@sumedhak27 sumedhak27 Jul 25, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The below code works but I have to repeat the lines from src/rgw/CMakeLists.txt in the new file,
does anyone know how to avoid that.

// file_name: src/rgw/motr/CMakeLists.txt

set(cortx_rgw_srcs
    gc/gc.cc)

add_library(cortx_rgw ${cortx_rgw_srcs})
target_include_directories(cortx_rgw PUBLIC "${CMAKE_SOURCE_DIR}/src/rgw")

// *************** repeated section start ***************
target_include_directories(cortx_rgw PRIVATE "/usr/include/motr")
target_compile_options(cortx_rgw PRIVATE "-Wno-attributes")
target_compile_definitions(cortx_rgw PRIVATE "M0_EXTERN=extern" "M0_INTERNAL=")
target_link_libraries(cortx_rgw PRIVATE motr motr-helpers)
// *************** repeated section end ***************

set(link_targets spawn)
target_link_libraries(cortx_rgw PUBLIC ${link_targets})

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andriytk @siningwuseagate can you please help here?

endif()
if(WITH_JAEGER)
list(APPEND librgw_common_srcs rgw_tracer.cc)
Expand Down
96 changes: 96 additions & 0 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* Garbage Collector implementation for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include "motr/gc/gc.h"

void *MotrGC::GCWorker::entry() {
std::unique_lock<std::mutex> lk(lock);
ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " started." << dendl;

do {

ldpp_dout(dpp, 10) << __func__ << ": " << gc_thread_prefix
<< worker_id << " iteration" << dendl;
cv.wait_for(lk, std::chrono::milliseconds(gc_interval * 10));

} while (! motr_gc->going_down());

ldpp_dout(dpp, 0) << __func__ << ": Stop signalled called for "
<< gc_thread_prefix << worker_id << dendl;
return nullptr;
}

void MotrGC::initialize() {
// fetch max gc indices from config
auto max_indices = std::min(cct->_conf->rgw_gc_max_objs,
GC_MAX_SHARDS_PRIME);
ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl;

index_names.reserve(max_indices);
for (int i = 0; i < max_indices; i++) {
// Append index name to the gc index list
index_names.push_back(gc_index_prefix + std::to_string(i));

// [To be Implemented] create index in motr dix
}

}

void MotrGC::finalize() {
// [To be Implemented] undo steps from initialize stage
}

void MotrGC::start_processor() {
// fetch max_concurrent_io i.e. max_threads to create from config.
// start all the gc_worker threads
auto max_workers = cct->_conf->rgw_gc_max_concurrent_io;
ldpp_dout(this, 50) << __func__ << ": max_workers = "
<< max_workers << dendl;
workers.reserve(max_workers);
for (int ix = 0; ix < max_workers; ++ix) {
auto worker = std::make_unique<MotrGC::GCWorker>(this /* dpp */,
cct, this, ix);
worker->create((gc_thread_prefix + std::to_string(ix)).c_str());
workers.push_back(std::move(worker));
}
}

void MotrGC::stop_processor() {
// gracefully shutdown all the gc threads.
down_flag = true;
for (auto& worker : workers) {
worker->stop();
worker->join();
}
workers.clear();
}

void MotrGC::GCWorker::stop() {
std::lock_guard l{lock};
cv.notify_all();
}

bool MotrGC::going_down() {
return down_flag;
}

unsigned MotrGC::get_subsys() const {
return dout_subsys;
}

std::ostream& MotrGC::gen_prefix(std::ostream& out) const {
return out << "garbage_collector: ";
}
82 changes: 82 additions & 0 deletions src/rgw/motr/gc/gc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* Garbage Collector Classes for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef __MOTR_GC_H__
sachinpunadikar marked this conversation as resolved.
Show resolved Hide resolved
#define __MOTR_GC_H__

#include "rgw_sal_motr.h"
#include "common/Thread.h"
#include <mutex>
#include <condition_variable>
#include <atomic>

const int64_t GC_MAX_SHARDS_PRIME = 65521;
static std::string gc_index_prefix = "gc.";
static std::string gc_thread_prefix = "gc_thread_";

class MotrGC : public DoutPrefixProvider {
private:
CephContext *cct;
rgw::sal::Store *store;
jjxsg marked this conversation as resolved.
Show resolved Hide resolved
int max_indices = 0;
std::vector<std::string> index_names;
std::atomic<bool> down_flag = false;

public:
class GCWorker : public Thread {
private:
const DoutPrefixProvider *dpp;
CephContext *cct;
MotrGC *motr_gc;
int worker_id;
uint32_t gc_interval = 60*60; // default: 24*60*60 sec
std::mutex lock;
std::condition_variable cv;
public:
GCWorker(const DoutPrefixProvider* _dpp, CephContext *_cct,
MotrGC *_motr_gc, int _worker_id)
: dpp(_dpp),
cct(_cct),
motr_gc(_motr_gc),
worker_id(_worker_id) {};

void *entry() override;
void stop();
};
std::vector<std::unique_ptr<MotrGC::GCWorker>> workers;

MotrGC(CephContext *_cct, rgw::sal::Store* _store)
: cct(_cct), store(_store) {}

~MotrGC() {
stop_processor();
finalize();
}

void initialize();
void finalize();

void start_processor();
void stop_processor();

bool going_down();

// Set Up logging prefix for GC
CephContext *get_cct() const override { return cct; }
unsigned get_subsys() const;
std::ostream& gen_prefix(std::ostream& out) const;
};

#endif
8 changes: 7 additions & 1 deletion src/rgw/rgw_sal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ rgw::sal::Store* StoreManager::init_storage_provider(const DoutPrefixProvider* d
ldpp_dout(dpp, 0) << "newMotrStore() failed!" << dendl;
return store;
}
((rgw::sal::MotrStore *)store)->init_metadata_cache(dpp, cct, use_cache);

if ((*(rgw::sal::MotrStore*)store).set_use_cache(use_cache)
.set_run_gc_thread(use_gc_thread)
.initialize(cct, dpp) < 0) {
delete store;
return nullptr;
}

return store;
}
Expand Down
51 changes: 48 additions & 3 deletions src/rgw/rgw_sal_motr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1454,12 +1454,57 @@ int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct
return 0;
}

void MotrStore::finalize(void)
{
void MotrStore::finalize(void) {
// stop gc worker threads
stop_gc();
// close connection with motr
m0_client_fini(this->instance, true);
}

MotrStore& MotrStore::set_run_gc_thread(bool _use_gc_thread) {
use_gc_thread = _use_gc_thread;
return *this;
}

MotrStore& MotrStore::set_use_cache(bool _use_cache) {
use_cache = _use_cache;
return *this;
}

int MotrStore::initialize(CephContext *cct, const DoutPrefixProvider *dpp) {
// Create metadata objects and set enabled=use_cache value
int rc = init_metadata_cache(dpp, cct);
if (rc != 0) {
ldpp_dout(dpp, 0) << __func__ << ": Metadata cache init failed " <<
"with rc = " << rc << dendl;
return rc;
}

if (use_gc_thread) {
// Create MotrGC object and start GCWorker threads
int rc = create_gc();
if (rc != 0)
ldpp_dout(dpp, 0) << __func__ << ": Failed to Create MotrGC " <<
"with rc = " << rc << dendl;
}
return rc;
}

int MotrStore::create_gc() {
int ret = 0;
motr_gc = std::make_unique<MotrGC>(cctx, this);
motr_gc->initialize();
motr_gc->start_processor();
return ret;
}

void MotrStore::stop_gc() {
if (motr_gc) {
motr_gc->stop_processor();
motr_gc->finalize();
}
}

uint64_t MotrStore::get_new_req_id()
{
uint64_t req_id = ceph::util::generate_random_number<uint64_t>();
Expand Down Expand Up @@ -5362,7 +5407,7 @@ std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_y
}

int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp,
CephContext *cct, bool use_cache)
CephContext *cct)
{
this->obj_meta_cache = new MotrMetaCache(dpp, cct);
this->get_obj_meta_cache()->set_enabled(use_cache);
Expand Down
14 changes: 13 additions & 1 deletion src/rgw/rgw_sal_motr.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ extern "C" {
#include "rgw_role.h"
#include "rgw_multi.h"
#include "rgw_putobj_processor.h"
#include "motr/gc/gc.h"
typedef void (*progress_cb)(off_t, void*);

class MotrGC;

namespace rgw::sal {

class MotrStore;
Expand Down Expand Up @@ -982,6 +985,10 @@ class MotrStore : public Store {
MotrMetaCache* user_cache;
MotrMetaCache* bucket_inst_cache;

std::unique_ptr<MotrGC> motr_gc;
bool use_gc_thread;
bool use_cache;

public:
CephContext *cctx;
struct m0_client *instance;
Expand Down Expand Up @@ -1097,7 +1104,12 @@ class MotrStore : public Store {
uint64_t olh_epoch,
const std::string& unique_tag) override;

virtual int initialize(CephContext *cct, const DoutPrefixProvider *dpp);
virtual void finalize(void) override;
int create_gc();
void stop_gc();
MotrStore& set_run_gc_thread(bool _use_gc_thread);
MotrStore& set_use_cache(bool _use_cache);

virtual CephContext *ctx(void) override {
return cctx;
Expand Down Expand Up @@ -1133,7 +1145,7 @@ class MotrStore : public Store {
int delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key);
int store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info);

int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct, bool use_cache);
int init_metadata_cache(const DoutPrefixProvider *dpp, CephContext *cct);
jjxsg marked this conversation as resolved.
Show resolved Hide resolved
MotrMetaCache* get_obj_meta_cache() {return obj_meta_cache;}
MotrMetaCache* get_user_cache() {return user_cache;}
MotrMetaCache* get_bucket_inst_cache() {return bucket_inst_cache;}
Expand Down