diff --git a/db/column_family.cc b/db/column_family.cc index 94aef381978..c8ea7accffb 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -660,6 +660,11 @@ bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { column_family_set_->Lock(); current_ = column_family_set_->GetColumnFamily(column_family_id); column_family_set_->Unlock(); + // TODO(icanadi) Maybe remove column family from the hash table when it's + // dropped? + if (current_ != nullptr && current_->IsDropped()) { + current_ = nullptr; + } } handle_.SetCFD(current_); return current_ != nullptr; @@ -685,6 +690,13 @@ ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { return &handle_; } +void ColumnFamilyMemTablesImpl::CheckMemtableFull() { + if (current_ != nullptr && current_->mem()->ShouldScheduleFlush()) { + flush_scheduler_->ScheduleFlush(current_); + current_->mem()->MarkFlushScheduled(); + } +} + uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { uint32_t column_family_id = 0; if (column_family != nullptr) { diff --git a/db/column_family.h b/db/column_family.h index 42e65afeef8..e7b21036fb3 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -22,6 +22,7 @@ #include "db/write_controller.h" #include "db/table_cache.h" #include "util/thread_local.h" +#include "db/flush_scheduler.h" namespace rocksdb { @@ -394,8 +395,11 @@ class ColumnFamilySet { // memtables of different column families (specified by ID in the write batch) class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { public: - explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set) - : column_family_set_(column_family_set), current_(nullptr) {} + explicit ColumnFamilyMemTablesImpl(ColumnFamilySet* column_family_set, + FlushScheduler* flush_scheduler) + : column_family_set_(column_family_set), + current_(nullptr), + flush_scheduler_(flush_scheduler) {} // sets current_ to ColumnFamilyData with column_family_id // returns false if column family doesn't exist @@ -414,9 +418,12 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { // Returns column family handle for the selected column family virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; + virtual void CheckMemtableFull() override; + private: ColumnFamilySet* column_family_set_; ColumnFamilyData* current_; + FlushScheduler* flush_scheduler_; ColumnFamilyHandleInternal handle_; }; diff --git a/db/db_impl.cc b/db/db_impl.cc index f6634b6c4e8..74c114bfd2d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -361,8 +361,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) versions_.reset(new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(), &write_controller_)); - column_family_memtables_.reset( - new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); + column_family_memtables_.reset(new ColumnFamilyMemTablesImpl( + versions_->GetColumnFamilySet(), &flush_scheduler_)); DumpLeveldbBuildVersion(db_options_.info_log.get()); DumpDBFileSummary(db_options_, dbname_); @@ -392,6 +392,8 @@ DBImpl::~DBImpl() { bg_cv_.Wait(); } + flush_scheduler_.Clear(); + if (default_cf_handle_ != nullptr) { // we need to delete handle outside of lock because it does its own locking mutex_.Unlock(); @@ -1336,28 +1338,30 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } if (!read_only) { - // no need to refcount since client still doesn't have access - // to the DB and can not drop column families while we iterate - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->mem()->ShouldFlush()) { - // If this asserts, it means that InsertInto failed in - // filtering updates to already-flushed column families - assert(cfd->GetLogNumber() <= log_number); - auto iter = version_edits.find(cfd->GetID()); - assert(iter != version_edits.end()); - VersionEdit* edit = &iter->second; - status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit); - if (!status.ok()) { - // Reflect errors immediately so that conditions like full - // file-systems cause the DB::Open() to fail. - return status; - } - cfd->CreateNewMemtable(); + // we can do this because this is called before client has access to the + // DB and there is only a single thread operating on DB + ColumnFamilyData* cfd; + + while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { + cfd->Unref(); + // If this asserts, it means that InsertInto failed in + // filtering updates to already-flushed column families + assert(cfd->GetLogNumber() <= log_number); + auto iter = version_edits.find(cfd->GetID()); + assert(iter != version_edits.end()); + VersionEdit* edit = &iter->second; + status = WriteLevel0TableForRecovery(cfd, cfd->mem(), edit); + if (!status.ok()) { + // Reflect errors immediately so that conditions like full + // file-systems cause the DB::Open() to fail. + return status; } + cfd->CreateNewMemtable(); } } } + flush_scheduler_.Clear(); if (versions_->LastSequence() < *max_sequence) { versions_->SetLastSequence(*max_sequence); } @@ -2201,7 +2205,7 @@ void DBImpl::BackgroundCallCompaction() { } if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) { // signal if - // * madeProgress -- need to wakeup MakeRoomForWrite + // * madeProgress -- need to wakeup DelayWrite // * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl // * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction // If none of this is true, there is no need to signal since nobody is @@ -2622,7 +2626,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, cfd->Ref(); FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer); cfd->Unref(); - bg_cv_.SignalAll(); // Wakeup MakeRoomForWrite() if necessary + bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary } mutex_.Unlock(); log_buffer->FlushBufferToLog(); @@ -3959,10 +3963,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { w.timeout_hint_us = options.timeout_hint_us; uint64_t expiration_time = 0; + bool has_timeout = false; if (w.timeout_hint_us == 0) { w.timeout_hint_us = kNoTimeOut; } else { expiration_time = env_->NowMicros() + w.timeout_hint_us; + has_timeout = true; } if (!options.disableWAL) { @@ -3997,56 +4003,48 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { assert(!single_column_family_mode_ || versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1); - uint64_t flush_column_family_if_log_file = 0; uint64_t max_total_wal_size = (db_options_.max_total_wal_size == 0) ? 4 * max_total_in_memory_state_ : db_options_.max_total_wal_size; if (UNLIKELY(!single_column_family_mode_) && alive_log_files_.begin()->getting_flushed == false && total_log_size_ > max_total_wal_size) { - flush_column_family_if_log_file = alive_log_files_.begin()->number; + uint64_t flush_column_family_if_log_file = alive_log_files_.begin()->number; alive_log_files_.begin()->getting_flushed = true; Log(db_options_.info_log, "Flushing all column families with data in WAL number %" PRIu64 ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, flush_column_family_if_log_file, total_log_size_, max_total_wal_size); - } - - if (write_controller_.IsStopped() || write_controller_.GetDelay() > 0) { - DelayWrite(expiration_time); - } - - if (LIKELY(single_column_family_mode_)) { - // fast path - status = MakeRoomForWrite(default_cf_handle_->cfd(), - &context, expiration_time); - } else { - // refcounting cfd in iteration - bool dead_cfd = false; + // no need to refcount because drop is happening in write thread, so can't + // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->Ref(); - if (flush_column_family_if_log_file != 0 && - cfd->GetLogNumber() <= flush_column_family_if_log_file) { - // log size excedded limit and we need to do flush - // SetNewMemtableAndNewLogFie may temporarily unlock and wait + if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { status = SetNewMemtableAndNewLogFile(cfd, &context); + if (!status.ok()) { + break; + } cfd->imm()->FlushRequested(); - MaybeScheduleFlushOrCompaction(); - } else { - // May temporarily unlock and wait. - status = MakeRoomForWrite(cfd, &context, expiration_time); - } - - if (cfd->Unref()) { - dead_cfd = true; - } - if (!status.ok()) { - break; } } - if (dead_cfd) { - versions_->GetColumnFamilySet()->FreeDeadColumnFamilies(); - } + MaybeScheduleFlushOrCompaction(); + } + + if (UNLIKELY(status.ok() && !bg_error_.ok())) { + status = bg_error_; + } + + if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { + status = ScheduleFlushes(&context); + } + + if (UNLIKELY(status.ok()) && + (write_controller_.IsStopped() || write_controller_.GetDelay() > 0)) { + DelayWrite(expiration_time); + } + + if (UNLIKELY(status.ok() && has_timeout && + env_->NowMicros() > expiration_time)) { + status = Status::TimedOut(); } uint64_t last_sequence = versions_->LastSequence(); @@ -4241,36 +4239,23 @@ void DBImpl::DelayWrite(uint64_t expiration_time) { } } -// REQUIRES: mutex_ is held -// REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, - WriteContext* context, - uint64_t expiration_time) { - mutex_.AssertHeld(); - assert(!writers_.empty()); - Status s; - bool has_timeout = (expiration_time > 0); - - while (true) { - if (!bg_error_.ok()) { - // Yield previous error - s = bg_error_; - break; - } else if (has_timeout && env_->NowMicros() > expiration_time) { - s = Status::TimedOut(); - break; - } else if (!cfd->mem()->ShouldFlush()) { - // There is room in current memtable - break; - } else { - s = SetNewMemtableAndNewLogFile(cfd, context); - if (!s.ok()) { - break; - } - MaybeScheduleFlushOrCompaction(); +Status DBImpl::ScheduleFlushes(WriteContext* context) { + bool schedule_bg_work = false; + ColumnFamilyData* cfd; + while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { + schedule_bg_work = true; + auto status = SetNewMemtableAndNewLogFile(cfd, context); + if (cfd->Unref()) { + delete cfd; + } + if (!status.ok()) { + return status; } } - return s; + if (schedule_bg_work) { + MaybeScheduleFlushOrCompaction(); + } + return Status::OK(); } // REQUIRES: mutex_ is held diff --git a/db/db_impl.h b/db/db_impl.h index cf7914fece8..0336b3af5a1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -33,6 +33,7 @@ #include "util/scoped_arena_iterator.h" #include "db/internal_stats.h" #include "db/write_controller.h" +#include "db/flush_scheduler.h" namespace rocksdb { @@ -399,8 +400,7 @@ class DBImpl : public DB { void DelayWrite(uint64_t expiration_time); - Status MakeRoomForWrite(ColumnFamilyData* cfd, WriteContext* context, - uint64_t expiration_time); + Status ScheduleFlushes(WriteContext* context); Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, WriteContext* context); @@ -557,6 +557,7 @@ class DBImpl : public DB { WriteBatch tmp_batch_; WriteController write_controller_; + FlushScheduler flush_scheduler_; SnapshotList snapshots_; diff --git a/db/db_test.cc b/db/db_test.cc index f4f7c2c4054..dcfdb2aae95 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1151,6 +1151,17 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) { ASSERT_EQ(props.size(), unique_entries.size()); ASSERT_EQ(expected_entries_size, sum); } + +uint64_t GetNumberOfSstFilesForColumnFamily(DB* db, + std::string column_family_name) { + std::vector metadata; + db->GetLiveFilesMetaData(&metadata); + uint64_t result = 0; + for (auto& fileMetadata : metadata) { + result += (fileMetadata.column_family_name == column_family_name); + } + return result; +} } // namespace TEST(DBTest, Empty) { @@ -2777,6 +2788,41 @@ TEST(DBTest, RecoverDuringMemtableCompaction) { } while (ChangeOptions()); } +TEST(DBTest, FlushSchedule) { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.level0_stop_writes_trigger = 1 << 10; + options.level0_slowdown_writes_trigger = 1 << 10; + options.min_write_buffer_number_to_merge = 1; + options.max_write_buffer_number = 2; + options.write_buffer_size = 100 * 1000; + CreateAndReopenWithCF({"pikachu"}, &options); + std::vector threads; + + std::atomic thread_num; + // each column family will have 5 thread, each thread generating 2 memtables. + // each column family should end up with 10 table files + for (int i = 0; i < 10; ++i) { + threads.emplace_back([&]() { + int a = thread_num.fetch_add(1); + Random rnd(a); + // this should fill up 2 memtables + for (int k = 0; k < 5000; ++k) { + Put(a & 1, RandomString(&rnd, 13), ""); + } + }); + } + + for (auto& t : threads) { + t.join(); + } + + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"), + static_cast(10)); + ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "pikachu"), + static_cast(10)); +} + TEST(DBTest, MinorCompactionsHappen) { do { Options options; @@ -6171,17 +6217,6 @@ std::vector ListLogFiles(Env* env, const std::string& path) { std::vector ListTableFiles(Env* env, const std::string& path) { return ListSpecificFiles(env, path, kTableFile); } - -uint64_t GetNumberOfSstFilesForColumnFamily(DB* db, - std::string column_family_name) { - std::vector metadata; - db->GetLiveFilesMetaData(&metadata); - uint64_t result = 0; - for (auto& fileMetadata : metadata) { - result += (fileMetadata.column_family_name == column_family_name); - } - return result; -} } // namespace TEST(DBTest, FlushOneColumnFamily) { @@ -6465,7 +6500,7 @@ TEST(DBTest, PurgeInfoLogs) { ASSERT_EQ(5, info_log_count); Destroy(&options); - // For mode (1), test DestoryDB() to delete all the logs under DB dir. + // For mode (1), test DestroyDB() to delete all the logs under DB dir. // For mode (2), no info log file should have been put under DB dir. std::vector db_files; env_->GetChildren(dbname_, &db_files); @@ -7894,10 +7929,6 @@ TEST(DBTest, SimpleWriteTimeoutTest) { // fill the two write buffers ASSERT_OK(Put(Key(1), Key(1) + std::string(100000, 'v'), write_opt)); ASSERT_OK(Put(Key(2), Key(2) + std::string(100000, 'v'), write_opt)); - // this will switch the previous memtable, but will not cause block because - // DelayWrite() is called before MakeRoomForWrite() - // TODO(icanadi) remove this as part of https://reviews.facebook.net/D23067 - ASSERT_OK(Put(Key(3), Key(3), write_opt)); // As the only two write buffers are full in this moment, the third // Put is expected to be timed-out. write_opt.timeout_hint_us = 50; diff --git a/db/flush_scheduler.cc b/db/flush_scheduler.cc new file mode 100644 index 00000000000..636ff5a98ee --- /dev/null +++ b/db/flush_scheduler.cc @@ -0,0 +1,62 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/flush_scheduler.h" + +#include + +#include "db/column_family.h" + +namespace rocksdb { + +void FlushScheduler::ScheduleFlush(ColumnFamilyData* cfd) { +#ifndef NDEBUG + assert(column_families_set_.find(cfd) == column_families_set_.end()); + column_families_set_.insert(cfd); +#endif // NDEBUG + cfd->Ref(); + column_families_.push_back(cfd); +} + +ColumnFamilyData* FlushScheduler::GetNextColumnFamily() { + ColumnFamilyData* cfd = nullptr; + while (column_families_.size() > 0) { + cfd = column_families_.front(); + column_families_.pop_front(); + if (cfd->IsDropped()) { + if (cfd->Unref()) { + delete cfd; + } + } else { + break; + } + } +#ifndef NDEBUG + if (cfd != nullptr) { + auto itr = column_families_set_.find(cfd); + assert(itr != column_families_set_.end()); + column_families_set_.erase(itr); + } +#endif // NDEBUG + return cfd; +} + +bool FlushScheduler::Empty() { return column_families_.empty(); } + +void FlushScheduler::Clear() { + for (auto cfd : column_families_) { +#ifndef NDEBUG + auto itr = column_families_set_.find(cfd); + assert(itr != column_families_set_.end()); + column_families_set_.erase(itr); +#endif // NDEBUG + if (cfd->Unref()) { + delete cfd; + } + } + column_families_.clear(); +} + +} // namespace rocksdb diff --git a/db/flush_scheduler.h b/db/flush_scheduler.h new file mode 100644 index 00000000000..201e4a13c75 --- /dev/null +++ b/db/flush_scheduler.h @@ -0,0 +1,39 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include +#include +#include + +namespace rocksdb { + +class ColumnFamilyData; + +// This class is thread-compatible. It's should only be accessed from single +// write thread (between BeginWrite() and EndWrite()) +class FlushScheduler { + public: + FlushScheduler() = default; + ~FlushScheduler() = default; + + void ScheduleFlush(ColumnFamilyData* cfd); + // Returns Ref()-ed column family. Client needs to Unref() + ColumnFamilyData* GetNextColumnFamily(); + + bool Empty(); + + void Clear(); + + private: + std::deque column_families_; +#ifndef NDEBUG + std::set column_families_set_; +#endif // NDEBUG +}; + +} // namespace rocksdb diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index 60baeb5ec44..3a5535d2d77 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -9,6 +9,7 @@ #include "util/testharness.h" #include "util/benchharness.h" #include "db/version_set.h" +#include "db/write_controller.h" #include "util/mutexlock.h" namespace rocksdb { diff --git a/db/memtable.cc b/db/memtable.cc index 23cc6227045..804404bb8e5 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -54,8 +54,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, kArenaBlockSize(OptimizeBlockSize(moptions.arena_block_size)), arena_(moptions.arena_block_size), table_(ioptions.memtable_factory->CreateMemTableRep( - comparator_, &arena_, ioptions.prefix_extractor, - ioptions.info_log)), + comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log)), num_entries_(0), flush_in_progress_(false), flush_completed_(false), @@ -65,7 +64,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, locks_(moptions.inplace_update_support ? moptions.inplace_update_num_locks : 0), prefix_extractor_(ioptions.prefix_extractor), - should_flush_(ShouldFlushNow()) { + should_flush_(ShouldFlushNow()), + flush_scheduled_(false) { // if should_flush_ == true without an entry inserted, something must have // gone wrong already. assert(!should_flush_); @@ -79,9 +79,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, } } -MemTable::~MemTable() { - assert(refs_ == 0); -} +MemTable::~MemTable() { assert(refs_ == 0); } size_t MemTable::ApproximateMemoryUsage() { size_t arena_usage = arena_.ApproximateMemoryUsage(); diff --git a/db/memtable.h b/db/memtable.h index 0371dc3cf98..fa6db6fe14d 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -10,7 +10,9 @@ #pragma once #include #include +#include #include +#include #include "db/dbformat.h" #include "db/skiplist.h" #include "db/version_edit.h" @@ -86,7 +88,11 @@ class MemTable { // This method heuristically determines if the memtable should continue to // host more data. - bool ShouldFlush() const { return should_flush_; } + bool ShouldScheduleFlush() const { + return flush_scheduled_ == false && should_flush_; + } + + void MarkFlushScheduled() { flush_scheduled_ = true; } // Return an iterator that yields the contents of the memtable. // @@ -194,7 +200,7 @@ class MemTable { const MemTableOptions* GetMemTableOptions() const { return &moptions_; } private: - // Dynamically check if we can add more incoming entries. + // Dynamically check if we can add more incoming entries bool ShouldFlushNow() const; friend class MemTableIterator; @@ -238,6 +244,9 @@ class MemTable { // a flag indicating if a memtable has met the criteria to flush bool should_flush_; + + // a flag indicating if flush has been scheduled + bool flush_scheduled_; }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/db/write_batch.cc b/db/write_batch.cc index cacb4a5e3f8..b8d0322d850 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -394,6 +394,7 @@ class MemTableInserter : public WriteBatch::Handler { // sequence number. Even if the update eventually fails and does not result // in memtable add/update. sequence_++; + cf_mems_->CheckMemtableFull(); return Status::OK(); } @@ -465,6 +466,7 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; + cf_mems_->CheckMemtableFull(); return Status::OK(); } @@ -494,6 +496,7 @@ class MemTableInserter : public WriteBatch::Handler { } mem->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; + cf_mems_->CheckMemtableFull(); return Status::OK(); } }; diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 615a47f5ebd..568cd70d812 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -28,6 +28,7 @@ class ColumnFamilyMemTables { virtual MemTable* GetMemTable() const = 0; virtual const Options* GetOptions() const = 0; virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; + virtual void CheckMemtableFull() = 0; }; class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { @@ -54,6 +55,8 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } + void CheckMemtableFull() override {} + private: bool ok_; MemTable* mem_;