Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OnFlushCompleted fired before flush result write to MANIFEST #5908

Closed
wants to merge 15 commits into from
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Fix a bug when format_version=3, partitioned fitlers, and prefix search are used in conjunction. The bug could result into Seek::(prefix) returning NotFound for an existing prefix.
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strong results when reseek happens with a different iterator upper bound.
* Fix a bug causing a crash during ingest external file when background compaction cause severe error (file not found).
* Fix OnFlushCompleted fired before flush result persisted in MANIFEST when there's concurrent flush job. The bug exists since OnFlushCompleted was introduced in rocksdb 3.8.
### New Features
* Introduced DBOptions::max_write_batch_group_size_bytes to configure maximum limit on number of bytes that are written in a single batch of WAL or memtable write. It is followed when the leader write size is larger than 1/8 of this limit.
* VerifyChecksum() by default will issue readahead. Allow ReadOptions to be passed in to those functions to override the readhead size. For checksum verifying before external SST file ingestion, a new option IngestExternalFileOptions.verify_checksums_readahead_size, is added for this readahead setting.
Expand Down
94 changes: 94 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>

#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "test_util/fault_injection_test_env.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"

namespace rocksdb {

Expand Down Expand Up @@ -323,6 +328,95 @@ TEST_F(DBFlushTest, CFDropRaceWithWaitForFlushMemTables) {
SyncPoint::GetInstance()->DisableProcessing();
}

#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
class TestListener : public EventListener {
public:
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
// There's only one key in each flush.
ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
ASSERT_NE(0, info.smallest_seqno);
if (info.smallest_seqno == seq1) {
// First flush completed
ASSERT_FALSE(completed1);
completed1 = true;
CheckFlushResultCommitted(db, seq1);
} else {
// Second flush completed
ASSERT_FALSE(completed2);
completed2 = true;
ASSERT_EQ(info.smallest_seqno, seq2);
CheckFlushResultCommitted(db, seq2);
}
}

void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db);
yiwu-arbug marked this conversation as resolved.
Show resolved Hide resolved
auto* mutex = db_impl->mutex();
yiwu-arbug marked this conversation as resolved.
Show resolved Hide resolved
mutex->Lock();
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
->cfd();
ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
mutex->Unlock();
}

std::atomic<SequenceNumber> seq1;
std::atomic<SequenceNumber> seq2;
std::atomic<bool> completed1{false};
std::atomic<bool> completed2{false};
};
std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
std::atomic<bool> first_flush_pending{false};

SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:1"},
{"DBImpl::FlushMemTableToOutputFile:Finish",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:2"}});
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&](void* arg) {
// Wait for the second flush finished, out of mutex.
auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:2");
}
});

Options options = CurrentOptions();
options.create_if_missing = true;
options.listeners.push_back(listener);
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options.max_background_jobs = 8;
// Allow 2 immutable memtables.
options.max_write_buffer_number = 3;
Reopen(options);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
listener->seq1 = db_->GetLatestSequenceNumber();
// t1 will wait for the second flush complete before committing flush result.
auto t1 = port::Thread([&]() {
yiwu-arbug marked this conversation as resolved.
Show resolved Hide resolved
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
// Wait for first flush scheduled.
TEST_SYNC_POINT("DBFlushTest::FireOnFlushCompletedAfterCommittedResult:1");
yiwu-arbug marked this conversation as resolved.
Show resolved Hide resolved
// The second flush will exit early without commit its result. The work
// is delegated to the first flush.
ASSERT_OK(Put("bar", "v"));
listener->seq2 = db_->GetLatestSequenceNumber();
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
t1.join();
ASSERT_TRUE(listener->completed1);
ASSERT_TRUE(listener->completed2);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // !ROCKSDB_LITE

TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
Expand Down
6 changes: 3 additions & 3 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,11 @@ class DBImpl : public DB {

void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
int job_id);
yiwu-arbug marked this conversation as resolved.
Show resolved Hide resolved

void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
void NotifyOnFlushCompleted(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
autovector<FlushJobInfo*>* flush_jobs_info);

void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
const Status& st,
Expand Down
55 changes: 21 additions & 34 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ Status DBImpl::FlushMemTableToOutputFile(

#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
flush_job.GetTableProperties());
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE

Status s;
Expand Down Expand Up @@ -213,8 +212,8 @@ Status DBImpl::FlushMemTableToOutputFile(
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties());
NotifyOnFlushCompleted(cfd, mutable_cf_options,
flush_job.GetCommittedFlushJobsInfo());
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
Expand All @@ -233,6 +232,7 @@ Status DBImpl::FlushMemTableToOutputFile(
}
#endif // ROCKSDB_LITE
}
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
return s;
}

Expand Down Expand Up @@ -351,7 +351,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties());
job_context->job_id);
}
#endif /* !ROCKSDB_LITE */

Expand Down Expand Up @@ -505,8 +505,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties());
NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
jobs[i].GetCommittedFlushJobsInfo());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
Expand Down Expand Up @@ -549,7 +549,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
int job_id) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
Expand Down Expand Up @@ -580,7 +580,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info);
Expand All @@ -594,15 +593,14 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
(void)prop;
#endif // ROCKSDB_LITE
}

void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
autovector<FlushJobInfo*>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
assert(flush_jobs_info != nullptr);
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
Expand All @@ -619,34 +617,23 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
// release lock while notifying events
mutex_.Unlock();
{
FlushJobInfo info;
info.cf_id = cfd->GetID();
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, info);
for (auto* info : *flush_jobs_info) {
info->triggered_writes_slowdown = triggered_writes_slowdown;
info->triggered_writes_stop = triggered_writes_stop;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, *info);
}
delete info;
}
flush_jobs_info->clear();
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#else
(void)cfd;
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
(void)prop;
(void)flush_jobs_info;
#endif // ROCKSDB_LITE
}

Expand Down
26 changes: 24 additions & 2 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
log_buffer_, &committed_flush_jobs_info_);
}

if (s.ok() && file_meta != nullptr) {
Expand Down Expand Up @@ -392,7 +392,7 @@ Status FlushJob::WriteLevel0Table() {
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
db_mutex_->Lock();
}
base_->Unref();
Expand All @@ -410,6 +410,10 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction);
}
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first first flushed memtable.
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif // !ROCKSDB_LITE

// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
Expand All @@ -424,4 +428,22 @@ Status FlushJob::WriteLevel0Table() {
return s;
}

#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
db_mutex_->AssertHeld();
std::unique_ptr<FlushJobInfo> info(new FlushJobInfo);
info->cf_id = cfd_->GetID();
info->cf_name = cfd_->GetName();
info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path,
meta_.fd.GetNumber());
info->thread_id = db_options_.env->GetThreadID();
info->job_id = job_context_->job_id;
info->smallest_seqno = meta_.fd.smallest_seqno;
info->largest_seqno = meta_.fd.largest_seqno;
info->table_properties = table_properties_;
info->flush_reason = cfd_->GetFlushReason();
return info;
}
#endif // !ROCKSDB_LITE

} // namespace rocksdb
17 changes: 15 additions & 2 deletions db/flush_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
#include <deque>
#include <limits>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <string>

#include "db/column_family.h"
#include "db/dbformat.h"
Expand All @@ -34,6 +34,7 @@
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/listener.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/transaction_log.h"
#include "table/scoped_arena_iterator.h"
Expand Down Expand Up @@ -79,14 +80,22 @@ class FlushJob {
Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
FileMetaData* file_meta = nullptr);
void Cancel();
TableProperties GetTableProperties() const { return table_properties_; }
const autovector<MemTable*>& GetMemTables() const { return mems_; }

#ifndef ROCKSDB_LITE
autovector<FlushJobInfo*>* GetCommittedFlushJobsInfo() {
return &committed_flush_jobs_info_;
}
#endif // !ROCKSDB_LITE

private:
void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
void RecordFlushIOStats();
Status WriteLevel0Table();
#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> GetFlushJobInfo() const;
#endif // !ROCKSDB_LITE

const std::string& dbname_;
ColumnFamilyData* cfd_;
Expand Down Expand Up @@ -131,6 +140,10 @@ class FlushJob {
// In this case, only after all flush jobs succeed in flush can RocksDB
// commit to the MANIFEST.
const bool write_manifest_;
// The current flush job can commit flush result of a concurrent flush job.
// We collect FlushJobInfo of all jobs committed by current job and fire
// OnFlushCompleted for them.
autovector<FlushJobInfo*> committed_flush_jobs_info_;
yiwu-arbug marked this conversation as resolved.
Show resolved Hide resolved

// Variables below are set by PickMemTable():
FileMetaData meta_;
Expand Down
Loading