Skip to content

Commit

Permalink
created a new ReadOptions parameter 'iterate_upper_bound'
Browse files Browse the repository at this point in the history
  • Loading branch information
Raghav Pisolkar committed Sep 4, 2014
1 parent 51ea889 commit e0b99d4
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 45 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Support Multiple DB paths in universal style compactions
* Add feature of storing plain table index and bloom filter in SST file.
* CompactRange() will never output compacted files to level 0. This used to be the case when all the compaction input files were at level 0.
* Added iterate_upper_bound to define the extent upto which the forward iterator will return entries. This will prevent iterating over delete markers and overwritten entries for edge cases where you want to break out the iterator anyways. This may improve perfomance in case there are a large number of delete markers or overwritten entries.

### Public API changes
* DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size
Expand Down
7 changes: 7 additions & 0 deletions db/c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,13 @@ void rocksdb_readoptions_set_snapshot(
opt->rep.snapshot = (snap ? snap->rep : nullptr);
}

void rocksdb_readoptions_set_iterate_upper_bound(
rocksdb_readoptions_t* opt,
const char* key, size_t keylen) {
Slice prefix = Slice(key, keylen);
opt->rep.iterate_upper_bound = &prefix;
}

void rocksdb_readoptions_set_read_tier(
rocksdb_readoptions_t* opt, int v) {
opt->rep.read_tier = static_cast<rocksdb::ReadTier>(v);
Expand Down
6 changes: 4 additions & 2 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3677,7 +3677,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// TODO(ljin): remove tailing iterator
auto iter = new ForwardIterator(this, options, cfd);
return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
kMaxSequenceNumber);
kMaxSequenceNumber, options.iterate_upper_bound);
// return new TailingIterator(env_, this, options, cfd);
#endif
} else {
Expand Down Expand Up @@ -3733,7 +3733,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(), snapshot);
env_, *cfd->options(), cfd->user_comparator(),
snapshot, options.iterate_upper_bound);

Iterator* internal_iter =
NewInternalIterator(options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter);
Expand Down
113 changes: 72 additions & 41 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ class DBIter: public Iterator {
};

DBIter(Env* env, const Options& options, const Comparator* cmp,
Iterator* iter, SequenceNumber s, bool arena_mode)
Iterator* iter, SequenceNumber s, bool arena_mode,
const Slice* iterate_upper_bound = nullptr)
: arena_mode_(arena_mode),
env_(env),
logger_(options.info_log.get()),
Expand All @@ -70,9 +71,10 @@ class DBIter: public Iterator {
direction_(kForward),
valid_(false),
current_entry_is_merged_(false),
statistics_(options.statistics.get()) {
statistics_(options.statistics.get()),
iterate_upper_bound_(iterate_upper_bound) {
RecordTick(statistics_, NO_ITERATORS);
has_prefix_extractor_ = (options.prefix_extractor.get() != nullptr);
prefix_extractor_ = options.prefix_extractor.get();
max_skip_ = options.max_sequential_skip_in_iterations;
}
virtual ~DBIter() {
Expand Down Expand Up @@ -132,7 +134,7 @@ class DBIter: public Iterator {
}
}

bool has_prefix_extractor_;
const SliceTransform* prefix_extractor_;
bool arena_mode_;
Env* const env_;
Logger* logger_;
Expand All @@ -149,6 +151,7 @@ class DBIter: public Iterator {
bool current_entry_is_merged_;
Statistics* statistics_;
uint64_t max_skip_;
const Slice* iterate_upper_bound_;

// No copying allowed
DBIter(const DBIter&);
Expand Down Expand Up @@ -207,36 +210,44 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
uint64_t num_skipped = 0;
do {
ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
skipping = false;
switch (ikey.type) {
case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(ikey.user_key);
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
valid_ = true;
saved_key_.SetKey(ikey.user_key);
return;
case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value
saved_key_.SetKey(ikey.user_key);
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
default:
assert(false);
break;

if (ParseKey(&ikey)) {
if (iterate_upper_bound_ != nullptr &&
ikey.user_key.compare(*iterate_upper_bound_) >= 0) {
break;
}

if (ikey.sequence <= sequence_) {
if (skipping &&
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
num_skipped++; // skip this entry
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
} else {
skipping = false;
switch (ikey.type) {
case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(ikey.user_key);
skipping = true;
num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeValue:
valid_ = true;
saved_key_.SetKey(ikey.user_key);
return;
case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value
saved_key_.SetKey(ikey.user_key);
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
default:
assert(false);
break;
}
}
}
}
Expand Down Expand Up @@ -398,6 +409,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeDeletion:
operands.clear();
last_not_merge_type = kTypeDeletion;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeMerge:
assert(user_merge_operator_ != nullptr);
Expand All @@ -407,6 +419,7 @@ bool DBIter::FindValueForCurrentKey() {
assert(false);
}

PERF_COUNTER_ADD(internal_key_skipped_count, 1);
assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0);
iter_->Prev();
++num_skipped;
Expand Down Expand Up @@ -553,6 +566,20 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK);

// total ordering is not guaranteed if prefix_extractor is set
// hence prefix based seeks will not give correct results
if (iterate_upper_bound_ != nullptr && prefix_extractor_ != nullptr) {
if (!prefix_extractor_->InDomain(*iterate_upper_bound_) ||
!prefix_extractor_->InDomain(target) ||
prefix_extractor_->Transform(*iterate_upper_bound_).compare(
prefix_extractor_->Transform(target)) != 0) {
status_ = Status::InvalidArgument("read_options.iterate_*_bound "
" and seek target need to have the same prefix.");
valid_ = false;
return;
}
}

saved_key_.Clear();
// now savved_key is used to store internal key.
saved_key_.SetInternalKey(target, sequence_);
Expand All @@ -574,7 +601,7 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekToFirst() {
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek wiil be used.
if (has_prefix_extractor_) {
if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
direction_ = kForward;
Expand All @@ -595,7 +622,7 @@ void DBIter::SeekToFirst() {
void DBIter::SeekToLast() {
// Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek wiil be used.
if (has_prefix_extractor_) {
if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max();
}
direction_ = kReverse;
Expand All @@ -612,9 +639,10 @@ void DBIter::SeekToLast() {
Iterator* NewDBIterator(Env* env, const Options& options,
const Comparator* user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence) {
const SequenceNumber& sequence,
const Slice* iterate_upper_bound) {
return new DBIter(env, options, user_key_comparator, internal_iter, sequence,
false);
false, iterate_upper_bound);
}

ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
Expand Down Expand Up @@ -643,13 +671,16 @@ void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,

ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence) {
const SequenceNumber& sequence,
const Slice* iterate_upper_bound) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter = new (mem)
DBIter(env, options, user_key_comparator, nullptr, sequence, true);
DBIter* db_iter = new (mem) DBIter(env, options, user_key_comparator,
nullptr, sequence, true, iterate_upper_bound);

iter->SetDBIter(db_iter);

return iter;
}

Expand Down
5 changes: 3 additions & 2 deletions db/db_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ extern Iterator* NewDBIterator(
const Options& options,
const Comparator *user_key_comparator,
Iterator* internal_iter,
const SequenceNumber& sequence);
const SequenceNumber& sequence,
const Slice* iterate_upper_bound = nullptr);

// A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of
Expand Down Expand Up @@ -68,6 +69,6 @@ class ArenaWrappedDBIter : public Iterator {
// Generate the arena wrapped iterator class.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence);
const SequenceNumber& sequence, const Slice* iterate_upper_bound = nullptr);

} // namespace rocksdb
Loading

0 comments on commit e0b99d4

Please sign in to comment.