Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

812 stress test error initiateflushesthread assertion fail num running flushes 0 #817

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ db_stress has been updated as well to take into account that some memtables do n
* stress test: Fix TestIterateAgainstExpected not supporting 0 iterations. TestIterateAgainstExpected was not designed to support value of 0 in FLAGS_num_iterations.
RocksDB has a value of 10 by default and we've added the option to randomize the values from 0 to 100 in https://github.com/speedb-io/speedb/commit/434692a63318036a3995a53001337f18bf467903
* Add more checks for using db_stress with --enable_speedb_features=true
* Proactive Flushes: Have the initiator return a correct answer when it was requested to initate a flush (#812).

### Miscellaneous
* Remove leftover references to ROCKSDB_LITE (#755).
Expand Down
12 changes: 7 additions & 5 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,9 @@ class DBImpl : public DB {

// flush initiated by the write buffer manager to free some space
bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);
bool InitiateMemoryManagerFlushRequestAtomicFlush(
size_t InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);
bool InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);

virtual SequenceNumber GetLatestSequenceNumber() const override;
Expand Down Expand Up @@ -1995,15 +1995,17 @@ class DBImpl : public DB {
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason,
bool entered_write_thread = false);
bool entered_write_thread = false,
size_t* num_flushes_initiated = nullptr);

// Atomic-flush memtables from quanlified CFs among `provided_candidate_cfds`
// (if non-empty) or amomg all column families and atomically record the
// result to the MANIFEST.
Status AtomicFlushMemTables(
const FlushOptions& options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
bool entered_write_thread = false);
bool entered_write_thread = false,
size_t* num_flushes_initiated = nullptr);

// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
Expand Down Expand Up @@ -2156,7 +2158,7 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);

void SchedulePendingFlush(const FlushRequest& req);
bool SchedulePendingFlush(const FlushRequest& req);

void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
Expand Down
81 changes: 60 additions & 21 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,12 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason,
bool entered_write_thread) {
bool entered_write_thread,
size_t* num_flushes_initiated) {
if (num_flushes_initiated != nullptr) {
*num_flushes_initiated = 0U;
}

// This method should not be called if atomic_flush is true.
assert(!immutable_db_options_.atomic_flush);
if (!flush_options.wait && write_controller_->IsStopped()) {
Expand Down Expand Up @@ -2447,7 +2452,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
for (const auto& req : flush_reqs) {
SchedulePendingFlush(req);
bool pushed_req = SchedulePendingFlush(req);
if (pushed_req && (num_flushes_initiated != nullptr)) {
++(*num_flushes_initiated);
}
}
MaybeScheduleFlushOrCompaction();
}
Expand Down Expand Up @@ -2486,8 +2494,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
Status DBImpl::AtomicFlushMemTables(
const FlushOptions& flush_options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
bool entered_write_thread) {
bool entered_write_thread, size_t* num_flushes_initiated) {
assert(immutable_db_options_.atomic_flush);

if (num_flushes_initiated != nullptr) {
*num_flushes_initiated = 0U;
}

if (!flush_options.wait && write_controller_->IsStopped()) {
std::ostringstream oss;
oss << "Writes have been stopped, thus unable to perform manual flush. "
Expand Down Expand Up @@ -2598,7 +2611,10 @@ Status DBImpl::AtomicFlushMemTables(
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
bool pushed_req = SchedulePendingFlush(flush_req);
if (pushed_req && (num_flushes_initiated != nullptr)) {
++(*num_flushes_initiated);
}
MaybeScheduleFlushOrCompaction();
}

Expand Down Expand Up @@ -3014,14 +3030,17 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}

void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
if (reject_new_background_jobs_) {
return;
return false;
}
if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
return;
return false;
}

bool pushed_req = false;

if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
Expand All @@ -3035,6 +3054,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
cfd->set_queued_for_flush(true);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
pushed_req = true;
}
} else {
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
Expand All @@ -3043,7 +3063,10 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
pushed_req = true;
}

return pushed_req;
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
Expand Down Expand Up @@ -4351,16 +4374,20 @@ bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
flush_options.allow_write_stall = true;
flush_options.wait = false;

size_t num_flushes_initiated = 0U;
if (immutable_db_options_.atomic_flush) {
return InitiateMemoryManagerFlushRequestAtomicFlush(min_size_to_flush,
flush_options);
num_flushes_initiated = InitiateMemoryManagerFlushRequestAtomicFlush(
min_size_to_flush, flush_options);
} else {
return InitiateMemoryManagerFlushRequestNonAtomicFlush(min_size_to_flush,
flush_options);
num_flushes_initiated = InitiateMemoryManagerFlushRequestNonAtomicFlush(
min_size_to_flush, flush_options);
}

// TODO - Have Proactive Flushes handle num_flushes_initiated > 1
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
return (num_flushes_initiated > 0U);
}

bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush);

Expand All @@ -4370,7 +4397,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(

SelectColumnFamiliesForAtomicFlush(&cfds);
if (cfds.empty()) {
return false;
return 0U;
}

// min_size_to_flush may be 0.
Expand All @@ -4391,7 +4418,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
}
}
if (total_size_to_flush < min_size_to_flush) {
return false;
return 0U;
}
}
}
Expand All @@ -4404,17 +4431,23 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush::BeforeFlush");
size_t num_flushes_initiated = 0U;
auto s = AtomicFlushMemTables(
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds);
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds,
false /* entered_write_thread */, &num_flushes_initiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"write buffer manager initiated Atomic flush finished, status: %s",
s.ToString().c_str());
return s.ok();

if (s.ok() == false) {
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
assert(num_flushes_initiated == 0);
}
return num_flushes_initiated;
}

bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush == false);

Expand Down Expand Up @@ -4456,7 +4489,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
}

if (cfd_to_flush == nullptr) {
return false;
return 0U;
}

orig_cfd_to_flush = cfd_to_flush;
Expand Down Expand Up @@ -4503,15 +4536,21 @@ bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush::BeforeFlush");
auto s = FlushMemTable(cfd_to_flush, flush_options,
FlushReason::kWriteBufferManagerInitiated);
size_t num_flushes_initiated = 0U;

auto s = FlushMemTable(
cfd_to_flush, flush_options, FlushReason::kWriteBufferManagerInitiated,
false /* entered_write_thread */, &num_flushes_initiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] write buffer manager initialize flush finished, status: %s\n",
cfd_to_flush->GetName().c_str(), s.ToString().c_str());

return s.ok();
if (s.ok() == false) {
assert(num_flushes_initiated == 0);
ofriedma marked this conversation as resolved.
Show resolved Hide resolved
}
return num_flushes_initiated;
}

} // namespace ROCKSDB_NAMESPACE
7 changes: 3 additions & 4 deletions memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,6 @@ void WriteBufferManager::InitiateFlushesThread() {
// the callback (which may take a long time).
was_flush_initiated = initiator.cb(kMinFlushSizes[iter]);
}

if (!was_flush_initiated) {
// No flush was initiated => undo the counters update
assert(num_running_flushes_ > 0U);
Expand Down Expand Up @@ -809,9 +808,9 @@ void WriteBufferManager::FlushEnded(bool /* wbm_initiated */) {
// the WBM will not be aware of the number of running flushes at the time
// it is enabled. The counter will become valid once all of the flushes
// that were running when it was enabled will have completed.
if (num_running_flushes_ > 0U) {
--num_running_flushes_;
}
assert(num_running_flushes_ > 0U);
--num_running_flushes_;

size_t curr_memory_used = memory_usage();
RecalcFlushInitiationSize();
ReevaluateNeedForMoreFlushesLockHeld(curr_memory_used);
Expand Down
Loading