diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc
index 9c0d3caed9e..9d6c16aafbf 100644
--- a/db/external_sst_file_basic_test.cc
+++ b/db/external_sst_file_basic_test.cc
@@ -315,6 +315,55 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+TEST_F(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) {
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  Reopen(options);
+
+  std::map<std::string, std::string> true_data;
+  int file_id = 1;
+  // prevent range deletions from being dropped due to becoming obsolete.
+  const Snapshot* snapshot = db_->GetSnapshot();
+
+  // range del [0, 50) in L0 file, [50, 100) in memtable
+  for (int i = 0; i < 2; i++) {
+    if (i == 1) {
+      db_->Flush(FlushOptions());
+    }
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
+                               Key(50 * i), Key(50 * (i + 1))));
+  }
+  ASSERT_EQ(1, NumTableFilesAtLevel(0));
+
+  // overlaps with L0 file but not memtable, so flush is skipped
+  SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
+  ASSERT_OK(GenerateAndAddExternalFile(
+      options, {10, 40}, {ValueType::kTypeValue, ValueType::kTypeValue},
+      file_id++, &true_data));
+  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
+  ASSERT_EQ(2, NumTableFilesAtLevel(0));
+
+  // overlaps with memtable, so flush is triggered (thus file count increases by
+  // two at this step).
+  ASSERT_OK(GenerateAndAddExternalFile(
+      options, {50, 90}, {ValueType::kTypeValue, ValueType::kTypeValue},
+      file_id++, &true_data));
+  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), ++last_seqno);
+  ASSERT_EQ(4, NumTableFilesAtLevel(0));
+
+  // snapshot unneeded now that both range deletions are persisted
+  db_->ReleaseSnapshot(snapshot);
+
+  // overlaps with nothing, so it is placed at the bottom level without
+  // incrementing the seqnum.
+  ASSERT_OK(GenerateAndAddExternalFile(
+      options, {101, 125}, {ValueType::kTypeValue, ValueType::kTypeValue},
+      file_id++, &true_data));
+  ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
+  ASSERT_EQ(4, NumTableFilesAtLevel(0));
+  ASSERT_EQ(1, NumTableFilesAtLevel(options.num_levels - 1));
+}
+
 #endif  // ROCKSDB_LITE
 
 }  // namespace rocksdb
diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc
index ff919df4c5e..337f515889c 100644
--- a/db/external_sst_file_ingestion_job.cc
+++ b/db/external_sst_file_ingestion_job.cc
@@ -381,6 +381,14 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
   sv->imm->AddIterators(ro, &merge_iter_builder);
   ScopedArenaIterator memtable_iter(merge_iter_builder.Finish());
 
+  MergeIteratorBuilder merge_range_del_iter_builder(
+      &cfd_->internal_comparator(), &arena);
+  merge_range_del_iter_builder.AddIterator(
+      sv->mem->NewRangeTombstoneIterator(ro));
+  sv->imm->AddRangeTombstoneIterators(ro, &merge_range_del_iter_builder);
+  ScopedArenaIterator memtable_range_del_iter(
+      merge_range_del_iter_builder.Finish());
+
   Status status;
   *overlap = false;
   for (IngestedFileInfo& f : files_to_ingest_) {
@@ -389,6 +397,11 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
     if (!status.ok() || *overlap == true) {
       break;
     }
+    status = IngestedFileOverlapWithRangeDeletions(
+        &f, memtable_range_del_iter.get(), overlap);
+    if (!status.ok() || *overlap == true) {
+      break;
+    }
   }
 
   return status;
@@ -555,6 +568,34 @@
   return iter->status();
 }
 
+Status ExternalSstFileIngestionJob::IngestedFileOverlapWithRangeDeletions(
+    const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
+    bool* overlap) {
+  auto* vstorage = cfd_->current()->storage_info();
+  auto* ucmp = vstorage->InternalComparator()->user_comparator();
+
+  *overlap = false;
+  if (range_del_iter != nullptr) {
+    for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
+         range_del_iter->Next()) {
+      ParsedInternalKey parsed_key;
+      if (!ParseInternalKey(range_del_iter->key(), &parsed_key)) {
+        return Status::Corruption("corrupted range deletion key: " +
+                                  range_del_iter->key().ToString());
+      }
+      RangeTombstone range_del(parsed_key, range_del_iter->value());
+      if (ucmp->Compare(range_del.start_key_,
+                        file_to_ingest->largest_user_key) <= 0 &&
+          ucmp->Compare(file_to_ingest->smallest_user_key,
+                        range_del.end_key_) <= 0) {
+        *overlap = true;
+        break;
+      }
+    }
+  }
+  return Status::OK();
+}
+
 bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
     const IngestedFileInfo* file_to_ingest, int level) {
   if (level == 0) {
@@ -591,17 +632,22 @@ Status ExternalSstFileIngestionJob::IngestedFileOverlapWithLevel(
   ro.total_order_seek = true;
   MergeIteratorBuilder merge_iter_builder(&cfd_->internal_comparator(),
                                           &arena);
-  RangeDelAggregator range_del_agg(cfd_->internal_comparator(),
-                                   {} /* snapshots */);
-  sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder,
-                                    lvl, &range_del_agg);
-  if (!range_del_agg.IsEmpty()) {
-    return Status::NotSupported(
-        "file ingestion with range tombstones is currently unsupported");
-  }
+  MergeIteratorBuilder merge_range_del_iter_builder(
+      &cfd_->internal_comparator(), &arena);
+  sv->current->AddIteratorsForLevel(ro, env_options_, &merge_iter_builder, lvl,
+                                    nullptr /* range_del_agg */);
+  sv->current->AddRangeDelIteratorsForLevel(ro, env_options_,
+                                            &merge_range_del_iter_builder, lvl);
   ScopedArenaIterator level_iter(merge_iter_builder.Finish());
-  return IngestedFileOverlapWithIteratorRange(
+  ScopedArenaIterator level_range_del_iter(
+      merge_range_del_iter_builder.Finish());
+  Status status = IngestedFileOverlapWithIteratorRange(
       file_to_ingest, level_iter.get(), overlap_with_level);
+  if (status.ok() && *overlap_with_level == false) {
+    status = IngestedFileOverlapWithRangeDeletions(
+        file_to_ingest, level_range_del_iter.get(), overlap_with_level);
+  }
+  return status;
 }
 
 }  // namespace rocksdb
diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h
index 9cf5b16e5a0..10963852fa0 100644
--- a/db/external_sst_file_ingestion_job.h
+++ b/db/external_sst_file_ingestion_job.h
@@ -141,6 +141,13 @@ class ExternalSstFileIngestionJob {
       const IngestedFileInfo* file_to_ingest, InternalIterator* iter,
       bool* overlap);
 
+  // Check if `file_to_ingest` key range overlaps with any range deletions
+  // specified by `range_del_iter`.
+  // REQUIRES: Mutex held
+  Status IngestedFileOverlapWithRangeDeletions(
+      const IngestedFileInfo* file_to_ingest, InternalIterator* range_del_iter,
+      bool* overlap);
+
   // Check if `file_to_ingest` key range overlap with level
   // REQUIRES: Mutex held
   Status IngestedFileOverlapWithLevel(SuperVersion* sv,
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index f3dcfdddecf..6bfa1192d53 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -168,6 +168,14 @@ Status MemTableListVersion::AddRangeTombstoneIterators(
   return Status::OK();
 }
 
+Status MemTableListVersion::AddRangeTombstoneIterators(
+    const ReadOptions& read_opts, MergeIteratorBuilder* merge_iter_builder) {
+  for (auto& m : memlist_) {
+    merge_iter_builder->AddIterator(m->NewRangeTombstoneIterator(read_opts));
+  }
+  return Status::OK();
+}
+
 void MemTableListVersion::AddIterators(
     const ReadOptions& options, std::vector<InternalIterator*>* iterator_list,
     Arena* arena) {
diff --git a/db/memtable_list.h b/db/memtable_list.h
index c0d90743b16..4c7e63875fc 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -84,6 +84,8 @@ class MemTableListVersion {
 
   Status AddRangeTombstoneIterators(const ReadOptions& read_opts, Arena* arena,
                                     RangeDelAggregator* range_del_agg);
+  Status AddRangeTombstoneIterators(const ReadOptions& read_opts,
+                                    MergeIteratorBuilder* merge_iter_builder);
 
   void AddIterators(const ReadOptions& options,
                     std::vector<InternalIterator*>* iterator_list,
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 0a9a9dae9b2..a849440a838 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -267,6 +267,42 @@ InternalIterator* TableCache::NewIterator(
   return result;
 }
 
+InternalIterator* TableCache::NewRangeTombstoneIterator(
+    const ReadOptions& options, const EnvOptions& env_options,
+    const InternalKeyComparator& icomparator, const FileDescriptor& fd,
+    HistogramImpl* file_read_hist, bool skip_filters, int level) {
+  Status s;
+  TableReader* table_reader = nullptr;
+  Cache::Handle* handle = nullptr;
+  table_reader = fd.table_reader;
+  if (table_reader == nullptr) {
+    s = FindTable(env_options, icomparator, fd, &handle,
+                  options.read_tier == kBlockCacheTier /* no_io */,
+                  true /* record read_stats */, file_read_hist, skip_filters,
+                  level);
+    if (s.ok()) {
+      table_reader = GetTableReaderFromHandle(handle);
+    }
+  }
+  InternalIterator* result = nullptr;
+  if (s.ok()) {
+    result = table_reader->NewRangeTombstoneIterator(options);
+    if (result != nullptr) {
+      result->RegisterCleanup(&UnrefEntry, cache_, handle);
+    }
+  }
+  if (result == nullptr && handle != nullptr) {
+    // the range deletion block didn't exist, or there was a failure between
+    // getting handle and getting iterator.
+    ReleaseHandle(handle);
+  }
+  if (!s.ok()) {
+    assert(result == nullptr);
+    result = NewErrorInternalIterator(s);
+  }
+  return result;
+}
+
 Status TableCache::Get(const ReadOptions& options,
                        const InternalKeyComparator& internal_comparator,
                        const FileDescriptor& fd, const Slice& k,
diff --git a/db/table_cache.h b/db/table_cache.h
index 3c71657876e..68dbfa74e28 100644
--- a/db/table_cache.h
+++ b/db/table_cache.h
@@ -60,6 +60,12 @@ class TableCache {
       HistogramImpl* file_read_hist = nullptr, bool for_compaction = false,
       Arena* arena = nullptr, bool skip_filters = false, int level = -1);
 
+  InternalIterator* NewRangeTombstoneIterator(
+      const ReadOptions& options, const EnvOptions& toptions,
+      const InternalKeyComparator& internal_comparator,
+      const FileDescriptor& file_fd, HistogramImpl* file_read_hist,
+      bool skip_filters, int level);
+
   // If a seek to internal key "k" in specified file finds an entry,
   // call (*handle_result)(arg, found_key, found_value) repeatedly until
   // it returns false.
diff --git a/db/version_set.cc b/db/version_set.cc
index 0032c19a03d..637abb027cc 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -865,6 +865,19 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options,
   }
 }
 
+void Version::AddRangeDelIteratorsForLevel(
+    const ReadOptions& read_options, const EnvOptions& soptions,
+    MergeIteratorBuilder* merge_iter_builder, int level) {
+  for (size_t i = 0; i < storage_info_.LevelFilesBrief(level).num_files; i++) {
+    const auto& file = storage_info_.LevelFilesBrief(level).files[i];
+    merge_iter_builder->AddIterator(
+        cfd_->table_cache()->NewRangeTombstoneIterator(
+            read_options, soptions, cfd_->internal_comparator(), file.fd,
+            cfd_->internal_stats()->GetFileReadHist(level),
+            false /* skip_filters */, level));
+  }
+}
+
 VersionStorageInfo::VersionStorageInfo(
     const InternalKeyComparator* internal_comparator,
     const Comparator* user_comparator, int levels,
diff --git a/db/version_set.h b/db/version_set.h
index d330cbce0a8..4558282e66e 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -463,6 +463,11 @@ class Version {
                             MergeIteratorBuilder* merger_iter_builder,
                             int level, RangeDelAggregator* range_del_agg);
 
+  void AddRangeDelIteratorsForLevel(const ReadOptions& read_options,
+                                    const EnvOptions& soptions,
+                                    MergeIteratorBuilder* merge_iter_builder,
+                                    int level);
+
   // Lookup the value for key. If found, store it in *val and
   // return OK. Else return a non-OK status.
   // Uses *operands to store merge_operator operations to apply later.