Skip to content

Commit

Permalink
Support ingest file when range deletions exist
Browse files Browse the repository at this point in the history
Summary:
Previously we returned NotSupported when ingesting files into a database containing any range deletions. This diff adds the support.

- Flush if any memtable contains range deletions overlapping the to-be-ingested file
- Place to-be-ingested file before any level that contains range deletions overlapping it.
- Added support for `Version` to return iterators over range deletions in a given level. Previously, we piggybacked getting range deletions onto `Version`'s `Get()` / `AddIterator()` functions by passing them a `RangeDelAggregator*`. But file ingestion needs to get iterators over range deletions, not populate an aggregator (since the aggregator does collapsing and doesn't expose the actual ranges).
Closes #2370

Differential Revision: D5127648

Pulled By: ajkr

fbshipit-source-id: 816faeb9708adfa5287962bafdde717db56e3f1a
  • Loading branch information
ajkr authored and facebook-github-bot committed May 31, 2017
1 parent ad19eb8 commit 9c9909b
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 9 deletions.
49 changes: 49 additions & 0 deletions db/external_sst_file_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,55 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
Reopen(options);

std::map<std::string, std::string> true_data;
int file_id = 1;
// prevent range deletions from being dropped due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();

// range del [0, 50) in L0 file, [50, 100) in memtable
for (int i = 0; i < 2; i++) {
if (i == 1) {
db_->Flush(FlushOptions());
}
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
Key(50 * i), Key(50 * (i + 1))));
}
ASSERT_EQ(1, NumTableFilesAtLevel(0));

// overlaps with L0 file but not memtable, so flush is skipped
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
ASSERT_OK(GenerateAndAddExternalFile(
options, {10, 40}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(2, NumTableFilesAtLevel(0));

// overlaps with memtable, so flush is triggered (thus file count increases by
// two at this step).
ASSERT_OK(GenerateAndAddExternalFile(
options, {50, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
ASSERT_EQ(4, NumTableFilesAtLevel(0));

// snapshot unneeded now that both range deletions are persisted
db_->ReleaseSnapshot(snapshot);

// overlaps with nothing, so places at bottom level and skips incrementing
// seqnum.
ASSERT_OK(GenerateAndAddExternalFile(
options, {101, 125}, {ValueType::kTypeValue, ValueType::kTypeValue},
file_id++, &true_data));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
ASSERT_EQ(4, NumTableFilesAtLevel(0));
ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
}

#endif // ROCKSDB_LITE

} // namespace rocksdb
Expand Down
64 changes: 55 additions & 9 deletions db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,14 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
sv->imm->AddIterators(ro, &merge_iter_builder);
ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());

MergeIteratorBuilder merge_range_del_iter_builder(
&cfd_->internal_comparator(), &arena);
merge_range_del_iter_builder.AddIterator(
sv->mem->NewRangeTombstoneIterator(ro));
sv->imm->AddRangeTombstoneIterators(ro, &merge_range_del_iter_builder);
ScopedArenaIterator memtable_range_del_iter(
merge_range_del_iter_builder.Finish());

Status status;
*overlap = false;
for (IngestedFileInfo& f : files_to_ingest_) {
Expand All @@ -389,6 +397,11 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
if (!status.ok() || *overlap == true) {
break;
}
status = IngestedFileOverlapWithRangeDeletions(
&f, memtable_range_del_iter.get(), overlap);
if (!status.ok() || *overlap == true) {
break;
}
}

return status;
Expand Down Expand Up @@ -555,6 +568,34 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithIteratorRange(
return iter->status();
}

Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions(
const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
bool* overlap) {
auto* vstorage = cfd_->current()->storage_info();
auto* ucmp = vstorage->InternalComparator()->user_comparator();

*overlap = false;
if (range_del_iter != nullptr) {
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
range_del_iter->Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) {
return Status::Corruption("corrupted range deletion key: " +
range_del_iter->key().ToString());
}
RangeTombstone range_del(parsed_key, range_del_iter->value());
if (ucmp->Compare(range_del.start_key_,
file_to_ingest->largest_user_key) <= 0 &&
ucmp->Compare(file_to_ingest->smallest_user_key,
range_del.end_key_) <= 0) {
*overlap = true;
break;
}
}
}
return Status::OK();
}

bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
const IngestedFileInfo* file_to_ingest, int level) {
if (level == 0) {
Expand Down Expand Up @@ -591,17 +632,22 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
ro.total_order_seek = true;
MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
&arena);
RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
{} /* snapshots */);
sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder,
lvl, &range_del_agg);
if (!range_del_agg.IsEmpty()) {
return Status::NotSupported(
"file ingestion with range tombstones is currently unsupported");
}
MergeIteratorBuilder merge_range_del_iter_builder(
&cfd_->internal_comparator(), &arena);
sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl,
nullptr /* range_del_agg */);
sv->current->AddRangeDelIteratorsForLevel(ro, env_options_,
&merge_range_del_iter_builder, lvl);
ScopedArenaIterator level_iter(merge_iter_builder.Finish());
return IngestedFileOverlapWithIteratorRange(
ScopedArenaIterator level_range_del_iter(
merge_range_del_iter_builder.Finish());
Status status = IngestedFileOverlapWithIteratorRange(
file_to_ingest, level_iter.get(), overlap_with_level);
if (status.ok() && *overlap_with_level == false) {
status = IngestedFileOverlapWithRangeDeletions(
file_to_ingest, level_range_del_iter.get(), overlap_with_level);
}
return status;
}

} // namespace rocksdb
Expand Down
7 changes: 7 additions & 0 deletions db/external_sst_file_ingestion_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ class ExternalSstFileIngestionJob {
const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
bool* overlap);

// Check if `file_to_ingest` key range overlaps with any range deletions
// specified by `iter`.
// REQUIRES: Mutex held
Status IngestedFileOverlapWithRangeDeletions(
const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
bool* overlap);

// Check if `file_to_ingest` key range overlap with level
// REQUIRES: Mutex held
Status IngestedFileOverlapWithLevel(SuperVersion* sv,
Expand Down
8 changes: 8 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,14 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
return Status::OK();
}

Status MemTableListVersion::AddRangeTombstoneIterators(
const ReadOptions& read_opts, MergeIteratorBuilder* merge_iter_builder) {
for (auto& m : memlist_) {
merge_iter_builder->AddIterator(m->NewRangeTombstoneIterator(read_opts));
}
return Status::OK();
}

void MemTableListVersion::AddIterators(
const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
Arena* arena) {
Expand Down
2 changes: 2 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class MemTableListVersion {

Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
RangeDelAggregator* range_del_agg);
Status AddRangeTombstoneIterators(const ReadOptions& read_opts,
MergeIteratorBuilder* merge_iter_builder);

void AddIterators(const ReadOptions& options,
std::vector<InternalIterator*>* iterator_list,
Expand Down
36 changes: 36 additions & 0 deletions db/table_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,42 @@ InternalIterator* TableCache::NewIterator(
return result;
}

InternalIterator* TableCache::NewRangeTombstoneIterator(
const ReadOptions& options, const EnvOptions& env_options,
const InternalKeyComparator& icomparator, const FileDescriptor& fd,
HistogramImpl* file_read_hist, bool skip_filters, int level) {
Status s;
TableReader* table_reader = nullptr;
Cache::Handle* handle = nullptr;
table_reader = fd.table_reader;
if (table_reader == nullptr) {
s = FindTable(env_options, icomparator, fd, &handle,
options.read_tier == kBlockCacheTier /* no_io */,
true /* record read_stats */, file_read_hist, skip_filters,
level);
if (s.ok()) {
table_reader = GetTableReaderFromHandle(handle);
}
}
InternalIterator* result = nullptr;
if (s.ok()) {
result = table_reader->NewRangeTombstoneIterator(options);
if (result != nullptr) {
result->RegisterCleanup(&UnrefEntry, cache_, handle);
}
}
if (result == nullptr && handle != nullptr) {
// the range deletion block didn't exist, or there was a failure between
// getting handle and getting iterator.
ReleaseHandle(handle);
}
if (!s.ok()) {
assert(result == nullptr);
result = NewErrorInternalIterator(s);
}
return result;
}

Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, const Slice& k,
Expand Down
6 changes: 6 additions & 0 deletions db/table_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ class TableCache {
HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
Arena* arena = nullptr, bool skip_filters = false, int level = -1);

InternalIterator* NewRangeTombstoneIterator(
const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, HistogramImpl* file_read_hist,
bool skip_filters, int level);

// If a seek to internal key "k" in specified file finds an entry,
// call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false.
Expand Down
13 changes: 13 additions & 0 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,19 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
}
}

void Version::AddRangeDelIteratorsForLevel(
const ReadOptions& read_options, const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder, int level) {
for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) {
const auto& file = storage_info_.LevelFilesBrief(level).files[i];
merge_iter_builder->AddIterator(
cfd_->table_cache()->NewRangeTombstoneIterator(
read_options, soptions, cfd_->internal_comparator(), file.fd,
cfd_->internal_stats()->GetFileReadHist(level),
false /* skip_filters */, level));
}
}

VersionStorageInfo::VersionStorageInfo(
const InternalKeyComparator* internal_comparator,
const Comparator* user_comparator, int levels,
Expand Down
5 changes: 5 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ class Version {
MergeIteratorBuilder* merger_iter_builder,
int level, RangeDelAggregator* range_del_agg);

void AddRangeDelIteratorsForLevel(const ReadOptions& read_options,
const EnvOptions& soptions,
MergeIteratorBuilder* merge_iter_builder,
int level);

// Lookup the value for key. If found, store it in *val and
// return OK. Else return a non-OK status.
// Uses *operands to store merge_operator operations to apply later.
Expand Down

2 comments on commit 9c9909b

@zhangjinpeng87
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ajkr I see the pr has merged about 2 months, but not release yet. When this pr will be release? Is there any problem?

@ajkr
Copy link
Contributor Author

@ajkr ajkr commented on 9c9909b Jul 22, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhangjinpeng1987, It should be available since 5.6.0. Let me know if you still think there's an issue with it.

Please sign in to comment.