Skip to content

Commit

Permalink
motr_sync: [CORTX-33691] Define and implement rgw_motr synchronization (
Browse files Browse the repository at this point in the history
Seagate#373)

* rgw_sal_motr: [CORTX-33691] Defines abstract synchronization primitives
and implements it for Motr GC across multiple RGW instances.

Signed-off-by: Dattaprasad Govekar <dattaprasad.govekar@seagate.com>

* motr_sync_impl: [CORTX-33691] update lock() & remove_lock() methods

Update and refactor the MotrLock::lock() and MotrKVLockProvider::remove_lock()
methods.

Signed-off-by: Sumedh Anantrao Kulkarni <sumedh.a.kulkarni@seagate.com>

Co-authored-by: Dattaprasad Govekar <dattaprasad.govekar@seagate.com>
  • Loading branch information
sumedhak27 and DPG17 committed Aug 9, 2022
1 parent e1243de commit 3149cac
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ if(WITH_RADOSGW_DBSTORE)
list(APPEND librgw_common_srcs rgw_sal_dbstore.cc)
endif()
if(WITH_RADOSGW_MOTR)
list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc)
list(APPEND librgw_common_srcs rgw_sal_motr.cc motr/gc/gc.cc motr/sync/motr_sync_impl.cc)
endif()
if(WITH_JAEGER)
list(APPEND librgw_common_srcs rgw_tracer.cc)
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/motr/gc/gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void MotrGC::initialize() {
}
index_names.reserve(max_indices);
ldpp_dout(this, 50) << __func__ << ": max_indices = " << max_indices << dendl;
for (int ind_suf = 0; ind_suf < max_indices; ind_suf++) {
for (uint32_t ind_suf = 0; ind_suf < max_indices; ind_suf++) {
std::string iname = gc_index_prefix + "." + std::to_string(ind_suf);
int rc = static_cast<rgw::sal::MotrStore*>(store)->create_motr_idx_by_name(iname);
if (rc < 0 && rc != -EEXIST){
Expand Down Expand Up @@ -167,7 +167,7 @@ int MotrGC::get_locked_gc_index(uint32_t& rand_ind)
int rc = -1;
uint32_t new_index = 0;
// attempt to lock GC starting with passed in index
for (int ind = 0; ind < max_indices; ind++)
for (uint32_t ind = 0; ind < max_indices; ind++)
{
new_index = (ind + rand_ind) % max_indices;
// try locking index
Expand Down
99 changes: 99 additions & 0 deletions src/rgw/motr/sync/motr_sync.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* Abstract thread/process synchronization for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef __MOTR_SYNCHROZATION__
#define __MOTR_SYNCHROZATION__

#include <unistd.h>

#include "include/types.h"
#include "include/utime.h"
#include "rgw_sal_motr.h"

// Define MOTR_EXCLUSIVE_RW_LOCK only when system needs exclusive read/write lock
#define MOTR_EXCLUSIVE_RW_LOCK 1

class MotrLockProvider;

struct motr_locker_info_t
{
std::string cookie; // unique id of caller
utime_t expiration; // expiration: non-zero means epoch of locker expiration
std::string description; // description: locker description, may be empty

motr_locker_info_t() {}
motr_locker_info_t(const utime_t& _e, std::string& _c, const std::string& _d)
: expiration(_e), cookie(_c), description(_d) {}
void encode(ceph::buffer::list &bl) const {
ENCODE_START(1, 1, bl);
encode(cookie, bl);
encode(expiration, bl);
encode(description, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);
decode(cookie, bl);
decode(expiration, bl);
decode(description, bl);
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(motr_locker_info_t)

struct motr_lock_info_t {
public:
std::map<std::string, motr_locker_info_t> lockers;
void encode(ceph::buffer::list &bl) const {
ENCODE_START(1, 1, bl);
encode(lockers, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(1, 1, 1, bl);
decode(lockers, bl);
DECODE_FINISH(bl);
}
};
WRITE_CLASS_ENCODER(motr_lock_info_t)

enum class MotrLockType {
NONE = 0,
EXCLUSIVE = 1,
SHARED = 2,
EXCLUSIVE_EPHEMERAL = 3, /* lock object is removed @ unlock */
};

// Abstract interface for Motr named synchronization
class MotrSync {
public:
virtual void initialize(std::shared_ptr<MotrLockProvider> lock_provider) = 0;
virtual int lock(const std::string& lock_name, MotrLockType lock_type,
utime_t lock_duration, const std::string& locker_id) = 0;
virtual int unlock(const std::string& lock_name, MotrLockType lock_type,
const std::string& locker_id) = 0;
};

// Abstract interface for entity that implements backend for lock objects
class MotrLockProvider {
public:
virtual int read_lock(const std::string& lock_name,
motr_lock_info_t* lock_info) = 0;
virtual int write_lock(const std::string& lock_name,
motr_lock_info_t* lock_info, bool update) = 0;
virtual int remove_lock(const std::string& lock_name,
const std::string& locker_id) = 0;
};
#endif __MOTR_SYNCHROZATION__
153 changes: 153 additions & 0 deletions src/rgw/motr/sync/motr_sync_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* GC thread/process synchronization for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include "motr/sync/motr_sync_impl.h"

void MotrLock::initialize(std::shared_ptr<MotrLockProvider> lock_provider) {
_lock_provider = lock_provider;
}

int MotrLock::lock(const std::string& lock_name,
MotrLockType lock_type,
utime_t lock_duration,
const std::string& locker_id = "") {
if (!_lock_provider || (MotrLockType::EXCLUSIVE != lock_type &&
locker_id.empty())) {
return -EINVAL;
}
int rc = 0;
motr_lock_info_t lock_obj;
rc = _lock_provider->read_lock(lock_name, &lock_obj);
if (rc != 0 && rc != -ENOENT)
return rc;
if (rc == 0) {
// lock entry is present. Check if this is a stale/expired lock
utime_t now = ceph_clock_now();
auto iter = lock_obj.lockers.begin();
while (iter != lock_obj.lockers.end()) {
struct motr_locker_info_t &info = iter->second;
if (!info.expiration.is_zero() && info.expiration < now) {
// locker has expired; delete it
iter = lock_obj.lockers.erase(iter);
} else {
++iter;
}
}
// remove the lock if no locker is left
if (lock_obj.lockers.empty())
_lock_provider->remove_lock(lock_name, locker_id);
}

if (!lock_obj.lockers.empty() && MotrLockType::EXCLUSIVE == lock_type) {
// lock is not available
return -EBUSY;
} else {
// Try to acquire lock object
struct motr_locker_info_t locker;
locker.cookie = locker_id;
locker.expiration = ceph_clock_now() + lock_duration;
locker.description = "";
// Update lock entry with current locker and lock the resource
lock_obj.lockers.insert(
std::pair<std::string, struct motr_locker_info_t>(locker_id, locker));
rc = _lock_provider->write_lock(lock_name, &lock_obj, false);
if (rc < 0) {
// Failed to acquire lock object; possibly, already acquired
return -EBUSY;
}
}
return rc;
}

int MotrLock::unlock(const std::string& lock_name,
MotrLockType lock_type, const std::string& locker_id) {
return _lock_provider->remove_lock(lock_name, locker_id);
}

int MotrKVLockProvider::initialize(const DoutPrefixProvider* dpp,
rgw::sal::MotrStore* _s,
std::string& lock_index_name) {
_dpp = dpp;
_store = _s;
_lock_index = lock_index_name;
if (!_store || lock_index_name.empty()) {
return -EINVAL;
}
return _store->create_motr_idx_by_name(lock_index_name);
}

int MotrKVLockProvider::read_lock(const std::string& lock_name,
motr_lock_info_t* lock_info) {
if (!_store || _lock_index.empty() || !lock_info) {
return -EINVAL;
}
int rc = 0;
bufferlist bl;
rc = _store->do_idx_op_by_name(_lock_index, M0_IC_GET, lock_name, bl);
if (rc != 0) {
return rc;
}
bufferlist::const_iterator bitr = bl.begin();
lock_info->decode(bitr);
return 0;
}

int MotrKVLockProvider::write_lock(const std::string& lock_name,
motr_lock_info_t* lock_info, bool update) {
if (!_store || _lock_index.empty() || !lock_info) {
return -EINVAL;
}
bufferlist bl;
lock_info->encode(bl);
int rc =
_store->do_idx_op_by_name(_lock_index, M0_IC_PUT, lock_name, bl, update);
return rc;
}

int MotrKVLockProvider::remove_lock(const std::string& lock_name,
const std::string& locker_id = "") {
if (!_store || _lock_index.empty() || lock_name.empty()) {
return -EINVAL;
}
motr_lock_info_t lock_info;
bufferlist bl;
int rc = 0;
bool del_lock_entry = false;
rc = read_lock(lock_name, &lock_info);
if (rc != 0)
return (rc == -ENOENT) ? 0 : rc;

if (locker_id.empty()) {
// this is exclusive lock
del_lock_entry = true;
} else {
auto it = lock_info.lockers.find(locker_id);
if (it != lock_info.lockers.end()) {
lock_info.lockers.erase(it);
}
if (lock_info.lockers.empty()) {
del_lock_entry = true;
}
}
if (del_lock_entry) {
// all lockers removed; now delete lock object
rc = _store->do_idx_op_by_name(_lock_index, M0_IC_DEL, lock_name, bl);
} else {
// update the lock entry
lock_info.encode(bl);
rc = _store->do_idx_op_by_name(_lock_index, M0_IC_PUT, lock_name, bl);
}
return rc;
}
118 changes: 118 additions & 0 deletions src/rgw/motr/sync/motr_sync_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=2 sw=2 expandtab ft=cpp

/*
* GC thread/process synchronization for the CORTX Motr backend
*
* Copyright (C) 2022 Seagate Technology LLC and/or its Affiliates
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#ifndef __MOTR_LOCK__
#define __MOTR_LOCK__

/*
Usage:
Approach 1.
1. Instantiate one of the LockProvider Implementation.
example:
std::unique_ptr<MotrKvLockProvider> lock_provider;
2. Initialize the lock provider with global lock index table
example:
lock_provider.initialize(pp, store, "motr.gc.index.locks");
The caller can do this during the process/thread
start-up/initialization; if "initialize()" returns < 0, the
caller is expected to stop further initialization if locking is esential
in the system. In multi-process environment, each process can call
"MotrKVLockProvider::initialize" during it's start-up, providing the same
name for the global lock index.
3. Instantiate one of the Locker Implmentations.
example:
std::unique_ptr<MotrLock> motr_lock;
4. Initialize the Locker class with LockProvider Object.
example:
motr_lock.initialize(lock_provider);
5. Lock the required resource using Locker object
example:
motr_lock.lock(resource_name, lock_type,
lock_duration, locker_id);
Any -ve value returned by MotrLock::lock() indicates resource
can't be locked; a value 0 indicates success.
6. Unlock the resource using the same Locker Object.
exmaple:
motr_lock.unlock(resource_name, lock_type, locker_id);
Approach 2.
1. Follow step (1) & step (2) in Approach 1.
2. Call create_motr_Lock_instance(), passing it lock provider instance created
in step (1) above.
3. Call get_lock_instance() wherever lock is needed.
Note: Presently, locking framework supports EXCLUSIVE lock; it means
caller needs to set MotrLockType::EXCLUSIVE while calling MotrLock::lock(),
and do not pass locker_id.
*/

#include "rgw_sal_motr.h"
#include "motr/sync/motr_sync.h"

class MotrLock : public MotrSync {
private:
std::shared_ptr<MotrLockProvider> _lock_provider;

public:
virtual void initialize(std::shared_ptr<MotrLockProvider> lock_provider) override;
virtual int lock(const std::string& lock_name, MotrLockType lock_type,
utime_t lock_duration, const std::string& locker_id) override;
virtual int unlock(const std::string& lock_name, MotrLockType lock_type,
const std::string& locker_id) override;
};

class MotrKVLockProvider : public MotrLockProvider {
private:
const DoutPrefixProvider* _dpp;
rgw::sal::MotrStore* _store;
std::string _lock_index;
public:
int initialize(const DoutPrefixProvider* dpp, rgw::sal::MotrStore* _s,
std::string& lock_index_name);
virtual int read_lock(const std::string& lock_name,
motr_lock_info_t* lock_info) override;
virtual int write_lock(const std::string& lock_name,
motr_lock_info_t* lock_info,
bool update = false) override;
virtual int remove_lock(const std::string& lock_name,
const std::string& locker_id) override;
};

// Creates a global instance of MotrLock that can be used by caller
// This needs to be called by a single main thread of caller.
std::shared_ptr<MotrSync> g_motr_lock;
std::shared_ptr<MotrSync>& create_motr_Lock_instance(
std::shared_ptr<MotrLockProvider> lock_provider) {
static bool initialize = false;
if (!initialize) {
g_motr_lock = std::make_shared<MotrLock>();
g_motr_lock->initialize(lock_provider);
initialize = true;
return g_motr_lock;
}
}

std::shared_ptr<MotrSync>& get_lock_instance() {
return g_motr_lock;
}

#endif

0 comments on commit 3149cac

Please sign in to comment.