Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

Commit

Permalink
motr_sync: [CORTX-33853] Support race coditions during lock acquisition.
Browse files Browse the repository at this point in the history
Changed lock() implementation to avoid multiple KV operations, thereby
reducing the effect of race during lock() call by caller.

Signed-off-by: Dattaprasad Govekar <dattaprasad.govekar@seagate.com>
  • Loading branch information
DPG17 committed Aug 9, 2022
1 parent 0fefbda commit a39c95c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 36 deletions.
2 changes: 2 additions & 0 deletions src/rgw/motr/sync/motr_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class MotrSync {
utime_t lock_duration, const std::string& locker_id) = 0;
virtual int unlock(const std::string& lock_name, MotrLockType lock_type,
const std::string& locker_id) = 0;
virtual int check_lock(const std::string& lock_name,
const std::string& locker_id) = 0;
};

// Abstract interface for entity that implements backend for lock objects
Expand Down
110 changes: 74 additions & 36 deletions src/rgw/motr/sync/motr_sync_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@

#include "motr/sync/motr_sync_impl.h"

std::string random_string(size_t length)
{
auto randchar = []() -> char
{
const char charset[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[ rand() % max_index ];
};
std::string str(length,0);
std::generate_n( str.begin(), length, randchar );
return str;
}

void MotrLock::initialize(std::shared_ptr<MotrLockProvider> lock_provider) {
_lock_provider = lock_provider;
}
Expand All @@ -29,46 +45,45 @@ int MotrLock::lock(const std::string& lock_name,
}
int rc = 0;
motr_lock_info_t lock_obj;
rc = _lock_provider->read_lock(lock_name, &lock_obj);
if (rc != 0 && rc != -ENOENT)
return rc;
// First, try to write lock object
struct motr_locker_info_t locker;
std::string s_locker_id = locker_id;
if (s_locker_id.empty()) {
s_locker_id = random_string(32);
}
locker.cookie = s_locker_id;
locker.expiration = ceph_clock_now() + lock_duration;
locker.description = "";
// Insert lock entry
lock_obj.lockers.insert(
std::pair<std::string, struct motr_locker_info_t>(locker_id,
locker));
rc = _lock_provider->write_lock(lock_name, &lock_obj, false);
if (rc == 0) {
// lock entry is present. Check if this is a stale/expired lock
utime_t now = ceph_clock_now();
auto iter = lock_obj.lockers.begin();
while (iter != lock_obj.lockers.end()) {
struct motr_locker_info_t &info = iter->second;
if (!info.expiration.is_zero() && info.expiration < now) {
// locker has expired; delete it
iter = lock_obj.lockers.erase(iter);
} else {
++iter;
// lock entry created successfully
return rc;
} else if (rc == -EEXIST) {
// Failed to acquire lock object; possibly, already acquired by someone
// Lock entry is present. Check if this is a stale/expired lock
rc = _lock_provider->read_lock(lock_name, &lock_obj);
if (rc == 0) {
utime_t now = ceph_clock_now();
auto iter = lock_obj.lockers.begin();
while (iter != lock_obj.lockers.end()) {
struct motr_locker_info_t &info = iter->second;
if (!info.expiration.is_zero() && info.expiration < now) {
// locker has expired; delete it
iter = lock_obj.lockers.erase(iter);
} else {
++iter;
}
}
}
// remove the lock if no locker is left
if (lock_obj.lockers.empty())
_lock_provider->remove_lock(lock_name, locker_id);
}

if (!lock_obj.lockers.empty() && MotrLockType::EXCLUSIVE == lock_type) {
// lock is not available
return -EBUSY;
} else {
// Try to acquire lock object
struct motr_locker_info_t locker;
locker.cookie = locker_id;
locker.expiration = ceph_clock_now() + lock_duration;
locker.description = "";
// Update lock entry with current locker and lock the resource
lock_obj.lockers.insert(
std::pair<std::string, struct motr_locker_info_t>(locker_id, locker));
rc = _lock_provider->write_lock(lock_name, &lock_obj, false);
if (rc < 0) {
// Failed to acquire lock object; possibly, already acquired
return -EBUSY;
// remove the lock if no locker is left
if (lock_obj.lockers.empty())
_lock_provider->remove_lock(lock_name, locker_id);
}
}
return rc;
return -EBUSY;
}

int MotrLock::unlock(const std::string& lock_name,
Expand All @@ -88,6 +103,29 @@ int MotrKVLockProvider::initialize(const DoutPrefixProvider* dpp,
return _store->create_motr_idx_by_name(lock_index_name);
}

int MotrLock::check_lock(const std::string& lock_name,
const std::string& locker_id = "") {
if (!_lock_provider || locker_id.empty()) {
return -EINVAL;
}
motr_lock_info_t cnfm_lock_obj;
int rc = _lock_provider->read_lock(lock_name, &cnfm_lock_obj);
if (rc == 0) {
auto iter = cnfm_lock_obj.lockers.begin();
while (iter != cnfm_lock_obj.lockers.end()) {
struct motr_locker_info_t &info = iter->second;
if (info.cookie == locker_id) {
// Same lock exists; this confirms lock object
return rc;
}
}
return -EBUSY;
} else if (rc == -ENOENT) {
// Looks like lock object is deleted by another caller
// as part of the race condition
return -EBUSY;
}
}
int MotrKVLockProvider::read_lock(const std::string& lock_name,
motr_lock_info_t* lock_info) {
if (!_store || _lock_index.empty() || !lock_info) {
Expand Down
2 changes: 2 additions & 0 deletions src/rgw/motr/sync/motr_sync_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class MotrLock : public MotrSync {
utime_t lock_duration, const std::string& locker_id) override;
virtual int unlock(const std::string& lock_name, MotrLockType lock_type,
const std::string& locker_id) override;
virtual int check_lock(const std::string& lock_name,
const std::string& locker_id) override;
};

class MotrKVLockProvider : public MotrLockProvider {
Expand Down

0 comments on commit a39c95c

Please sign in to comment.