Skip to content

Commit

Permalink
leveldb: Replace AtomicPointer with std::atomic.
Browse files Browse the repository at this point in the history
This CL removes AtomicPointer from leveldb's port interface. Its usage is replaced with std::atomic<> from the C++11 standard library.

AtomicPointer was used to wrap flags, numbers, and pointers, so its instances are replaced with std::atomic<bool>, std::atomic<int>, std::atomic<size_t> and std::atomic<Node*>.

This CL does not revise the memory ordering. AtomicPointer's methods are replaced mechanically with their std::atomic equivalents, even when the underlying usage is incorrect. (Example: DBImpl::has_imm_ is written using release stores, even though it is always read using relaxed ordering.) Revising the memory ordering is left for future CLs.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=237865146
  • Loading branch information
pwnall authored and cmumford committed Mar 11, 2019
1 parent dd90626 commit 7d8e41e
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 419 deletions.
20 changes: 0 additions & 20 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
// seekrandom -- N random seeks
// open -- cost of opening a DB
// crc32c -- repeated crc32c of 4K of data
// acquireload -- load N*1000 times
// Meta operations:
// compact -- Compact the entire DB
// stats -- Print DB stats
Expand All @@ -57,7 +56,6 @@ static const char* FLAGS_benchmarks =
"crc32c,"
"snappycomp,"
"snappyuncomp,"
"acquireload,"
;

// Number of key/values to place in database
Expand Down Expand Up @@ -510,8 +508,6 @@ class Benchmark {
method = &Benchmark::Compact;
} else if (name == Slice("crc32c")) {
method = &Benchmark::Crc32c;
} else if (name == Slice("acquireload")) {
method = &Benchmark::AcquireLoad;
} else if (name == Slice("snappycomp")) {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
Expand Down Expand Up @@ -639,22 +635,6 @@ class Benchmark {
thread->stats.AddMessage(label);
}

void AcquireLoad(ThreadState* thread) {
int dummy;
port::AtomicPointer ap(&dummy);
int count = 0;
void *ptr = nullptr;
thread->stats.AddMessage("(each op is 1000 loads)");
while (count < 100000) {
for (int i = 0; i < 1000; i++) {
ptr = ap.Acquire_Load();
}
count++;
thread->stats.FinishedSingleOp();
}
if (ptr == nullptr) exit(1); // Disable unused variable warning.
}

void SnappyCompress(ThreadState* thread) {
RandomGenerator gen;
Slice input = gen.Generate(Options().block_size);
Expand Down
33 changes: 17 additions & 16 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <stdio.h>

#include <algorithm>
#include <atomic>
#include <set>
#include <string>
#include <vector>
Expand Down Expand Up @@ -132,10 +133,11 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
dbname_(dbname),
table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
db_lock_(nullptr),
shutting_down_(nullptr),
shutting_down_(false),
background_work_finished_signal_(&mutex_),
mem_(nullptr),
imm_(nullptr),
has_imm_(false),
logfile_(nullptr),
logfile_number_(0),
log_(nullptr),
Expand All @@ -144,14 +146,12 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
background_compaction_scheduled_(false),
manual_compaction_(nullptr),
versions_(new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_)) {
has_imm_.Release_Store(nullptr);
}
&internal_comparator_)) {}

DBImpl::~DBImpl() {
// Wait for background work to finish
// Wait for background work to finish.
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-null value is ok
shutting_down_.store(true, std::memory_order_release);
while (background_compaction_scheduled_) {
background_work_finished_signal_.Wait();
}
Expand Down Expand Up @@ -547,7 +547,7 @@ void DBImpl::CompactMemTable() {
Status s = WriteLevel0Table(imm_, &edit, base);
base->Unref();

if (s.ok() && shutting_down_.Acquire_Load()) {
if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
s = Status::IOError("Deleting DB during memtable compaction");
}

Expand All @@ -562,7 +562,7 @@ void DBImpl::CompactMemTable() {
// Commit to the new state
imm_->Unref();
imm_ = nullptr;
has_imm_.Release_Store(nullptr);
has_imm_.store(false, std::memory_order_release);
DeleteObsoleteFiles();
} else {
RecordBackgroundError(s);
Expand Down Expand Up @@ -610,7 +610,8 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,
}

MutexLock l(&mutex_);
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) {
while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
bg_error_.ok()) {
if (manual_compaction_ == nullptr) { // Idle
manual_compaction_ = &manual;
MaybeScheduleCompaction();
Expand Down Expand Up @@ -652,7 +653,7 @@ void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (background_compaction_scheduled_) {
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
} else if (!bg_error_.ok()) {
// Already got an error; no more changes
Expand All @@ -673,7 +674,7 @@ void DBImpl::BGWork(void* db) {
void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(background_compaction_scheduled_);
if (shutting_down_.Acquire_Load()) {
if (shutting_down_.load(std::memory_order_acquire)) {
// No more background work when shutting down.
} else if (!bg_error_.ok()) {
// No more background work after a background error.
Expand Down Expand Up @@ -752,7 +753,7 @@ void DBImpl::BackgroundCompaction() {

if (status.ok()) {
// Done
} else if (shutting_down_.Acquire_Load()) {
} else if (shutting_down_.load(std::memory_order_acquire)) {
// Ignore compaction errors found during shutting down
} else {
Log(options_.info_log,
Expand Down Expand Up @@ -919,9 +920,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
std::string current_user_key;
bool has_current_user_key = false;
SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
for (; input->Valid() && !shutting_down_.load(std::memory_order_acquire); ) {
// Prioritize immutable compaction work
if (has_imm_.NoBarrier_Load() != nullptr) {
if (has_imm_.load(std::memory_order_relaxed)) {
const uint64_t imm_start = env_->NowMicros();
mutex_.Lock();
if (imm_ != nullptr) {
Expand Down Expand Up @@ -1014,7 +1015,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
input->Next();
}

if (status.ok() && shutting_down_.Acquire_Load()) {
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::IOError("Deleting DB during compaction");
}
if (status.ok() && compact->builder != nullptr) {
Expand Down Expand Up @@ -1378,7 +1379,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
has_imm_.store(true, std::memory_order_release);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
Expand Down
7 changes: 5 additions & 2 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_

#include <atomic>
#include <deque>
#include <set>
#include <string>

#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
Expand Down Expand Up @@ -136,11 +139,11 @@ class DBImpl : public DB {

// State below is protected by mutex_
port::Mutex mutex_;
port::AtomicPointer shutting_down_;
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); // Memtable being compacted
port::AtomicPointer has_imm_; // So bg thread can detect non-null imm_
std::atomic<bool> has_imm_; // So bg thread can detect non-null imm_
WritableFile* logfile_;
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
Expand Down
Loading

0 comments on commit 7d8e41e

Please sign in to comment.