Skip to content

Commit

Permalink
Rotate compaction output at user key boundary
Browse files Browse the repository at this point in the history
so that keys with same user key will not spread over multiple tables in
same level, level 1 or above.

Level compaction, except for level-0, don't pick overlapping tables. So
if we produce overlapping tables in user key level, it is possible that
file with newer version user key got picked and compacted to next level.
Now, the invariant that Version::Get depends on is broken, Version::Get
is broken.

The vulnerability of this approach is that a malicious client can retain
a snapshhot and do large number of writes to same key, which can cause
huge sorted table generated. LevelDB is a library that embedded in other
application. I think it should be the application's responsibility to
keep snapshhot from malicious clients.

Picking overlapping tables in all level compactions is another approach
to solve this problem. I think this approach may violate the algorithm
this implementation implies.
  • Loading branch information
kezhuw committed May 18, 2016
1 parent a7bff69 commit 849345d
Showing 1 changed file with 11 additions and 16 deletions.
27 changes: 11 additions & 16 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -926,16 +926,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
}

Slice key = input->key();
if (compact->compaction->ShouldStopBefore(key) &&
compact->builder != NULL) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}

// Handle key/value, add to state, etc.
bool drop = false;
bool first_occurrence = false;
if (!ParseInternalKey(key, &ikey)) {
// Do not hide error keys
current_user_key.clear();
Expand All @@ -947,6 +941,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
Slice(current_user_key)) != 0) {
// First occurrence of this user key
current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
// Don't rotate at this key if last key is an error key.
first_occurrence = last_sequence_for_key != kMaxSequenceNumber;
has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber;
}
Expand Down Expand Up @@ -986,21 +982,20 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (!status.ok()) {
break;
}
} else if (first_occurrence &&
(compact->compaction->ShouldStopBefore(key) ||
compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize())) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
if (compact->builder->NumEntries() == 0) {
compact->current_output()->smallest.DecodeFrom(key);
}
compact->current_output()->largest.DecodeFrom(key);
compact->builder->Add(key, input->value());

// Close output file if it is big enough
if (compact->builder->FileSize() >=
compact->compaction->MaxOutputFileSize()) {
status = FinishCompactionOutputFile(compact, input);
if (!status.ok()) {
break;
}
}
}

input->Next();
Expand Down

0 comments on commit 849345d

Please sign in to comment.