Skip to content

Commit

Permalink
ForwardIterator: reset incomplete iterators on Seek()
Browse files Browse the repository at this point in the history
Summary:
When reading from kBlockCacheTier, ForwardIterator's internal child iterators
may end up in the incomplete state (read was unable to complete without doing
disk I/O). `ForwardIterator::status()` will correctly report that; however, the
iterator may be stuck in that state until all sub-iterators are rebuilt:

  * `NeedToSeekImmutable()` may return false even if some sub-iterators are
    incomplete
  * one of the child iterators may be an empty iterator without any state other
    that the kIncomplete status (created using `NewErrorIterator()`); seeking on
    any such iterator has no effect -- we need to construct it again

Akin to rebuilding iterators after a superversion bump, this diff makes forward
iterator reset all incomplete child iterators when `Seek()` or `Next()` are
called.

Test Plan: TEST_TMPDIR=/dev/shm/rocksdbtest ROCKSDB_TESTS=TailingIterator ./db_test

Reviewers: igor, sdong, ljin

Reviewed By: ljin

Subscribers: lovro, march, leveldb

Differential Revision: https://reviews.facebook.net/D22575
  • Loading branch information
tnovak committed Aug 29, 2014
1 parent 722d80c commit 0f9c43e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 15 deletions.
83 changes: 70 additions & 13 deletions db/forward_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
#ifndef ROCKSDB_LITE
#include "db/forward_iterator.h"

#include <limits>
#include <string>
#include <utility>
#include <limits>

#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/column_family.h"
Expand Down Expand Up @@ -37,12 +38,16 @@ class LevelIterator : public Iterator {
assert(file_index < files_.size());
if (file_index != file_index_) {
file_index_ = file_index;
file_iter_.reset(cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
files_[file_index_]->fd, nullptr /* table_reader_ptr */, false));
Reset();
}
valid_ = false;
}
void Reset() {
assert(file_index_ < files_.size());
file_iter_.reset(cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
files_[file_index_]->fd, nullptr /* table_reader_ptr */, false));
}
void SeekToLast() override {
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
valid_ = false;
Expand All @@ -63,20 +68,22 @@ class LevelIterator : public Iterator {
assert(file_iter_ != nullptr);
file_iter_->Seek(internal_key);
valid_ = file_iter_->Valid();
assert(valid_);
}
void Next() override {
assert(valid_);
file_iter_->Next();
while (!file_iter_->Valid()) {
for (;;) {
if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) {
valid_ = !file_iter_->status().IsIncomplete();
return;
}
if (file_index_ + 1 >= files_.size()) {
valid_ = false;
return;
}
SetFileIndex(file_index_ + 1);
file_iter_->SeekToFirst();
}
valid_ = file_iter_->Valid();
}
Slice key() const override {
assert(valid_);
Expand Down Expand Up @@ -160,6 +167,8 @@ void ForwardIterator::SeekToFirst() {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
} else if (status_.IsIncomplete()) {
ResetIncompleteIterators();
}
SeekInternal(Slice(), true);
}
Expand All @@ -168,6 +177,8 @@ void ForwardIterator::Seek(const Slice& internal_key) {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
} else if (status_.IsIncomplete()) {
ResetIncompleteIterators();
}
SeekInternal(internal_key, false);
}
Expand Down Expand Up @@ -211,7 +222,15 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
}
l0_iters_[i]->Seek(internal_key);
}
if (l0_iters_[i]->Valid()) {

if (l0_iters_[i]->status().IsIncomplete()) {
// if any of the immutable iterators is incomplete (no-io option was
// used), we are unable to reliably find the smallest key
assert(read_options_.read_tier == kBlockCacheTier);
status_ = l0_iters_[i]->status();
valid_ = false;
return;
} else if (l0_iters_[i]->Valid()) {
immutable_min_heap_.push(l0_iters_[i]);
}
}
Expand Down Expand Up @@ -280,7 +299,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
level_iters_[level - 1]->SetFileIndex(f_idx);
seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
level_iters_[level - 1]->Seek(internal_key);
if (level_iters_[level - 1]->Valid()) {

if (level_iters_[level - 1]->status().IsIncomplete()) {
// see above
assert(read_options_.read_tier == kBlockCacheTier);
status_ = level_iters_[level - 1]->status();
valid_ = false;
return;
} else if (level_iters_[level - 1]->Valid()) {
immutable_min_heap_.push(level_iters_[level - 1]);
}
}
Expand All @@ -304,7 +330,7 @@ void ForwardIterator::Next() {
assert(valid_);

if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
sv_->version_number != cfd_->GetSuperVersionNumber()) {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), current_key.size());

Expand All @@ -320,9 +346,17 @@ void ForwardIterator::Next() {
}

current_->Next();
if (current_->Valid() && current_ != mutable_iter_) {
immutable_min_heap_.push(current_);
if (current_ != mutable_iter_) {
if (current_->status().IsIncomplete()) {
assert(read_options_.read_tier == kBlockCacheTier);
status_ = current_->status();
valid_ = false;
return;
} else if (current_->Valid()) {
immutable_min_heap_.push(current_);
}
}

UpdateCurrent();
}

Expand Down Expand Up @@ -389,6 +423,29 @@ void ForwardIterator::RebuildIterators() {
is_prev_set_ = false;
}

void ForwardIterator::ResetIncompleteIterators() {
const auto& l0_files = sv_->current->files_[0];
for (uint32_t i = 0; i < l0_iters_.size(); ++i) {
assert(i < l0_files.size());
if (!l0_iters_[i]->status().IsIncomplete()) {
continue;
}
delete l0_iters_[i];
l0_iters_[i] = cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
l0_files[i]->fd);
}

for (auto* level_iter : level_iters_) {
if (level_iter && level_iter->status().IsIncomplete()) {
level_iter->Reset();
}
}

current_ = nullptr;
is_prev_set_ = false;
}

void ForwardIterator::UpdateCurrent() {
if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
current_ = nullptr;
Expand Down Expand Up @@ -417,7 +474,7 @@ void ForwardIterator::UpdateCurrent() {
}

bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
if (!is_prev_set_) {
if (!valid_ || !is_prev_set_) {
return true;
}
Slice prev_key = prev_key_.GetKey();
Expand Down
1 change: 1 addition & 0 deletions db/forward_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ForwardIterator : public Iterator {
private:
void Cleanup();
void RebuildIterators();
void ResetIncompleteIterators();
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void UpdateCurrent();
bool NeedToSeekImmutable(const Slice& internal_key);
Expand Down
5 changes: 3 additions & 2 deletions table/two_level_iterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ void TwoLevelIterator::InitDataBlock() {
SetSecondLevelIterator(nullptr);
} else {
Slice handle = first_level_iter_.value();
if (second_level_iter_.iter() != nullptr
&& handle.compare(data_block_handle_) == 0) {
if (second_level_iter_.iter() != nullptr &&
!second_level_iter_.status().IsIncomplete() &&
handle.compare(data_block_handle_) == 0) {
// second_level_iter is already constructed with this iterator, so
// no need to change anything
} else {
Expand Down

0 comments on commit 0f9c43e

Please sign in to comment.