Support Clip DB to KeyRange (#11379)
Summary:
This PR is part of the request #11317.
(Another part is #11378)

ClipDB() (exposed as the new DB::ClipColumnFamily API) clips the entries in a column family to the range [begin_key, end_key). All entries outside this range are completely deleted, including tombstones.
This feature is mainly used to ensure that there are no overlapping keys when calling CreateColumnFamilyWithImport() to import multiple CFs.
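For context (not part of this commit), here is a minimal usage sketch of the new API; the database path and keys are made up for illustration:

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  // Hypothetical path, for illustration only.
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/clip_example", &db);
  assert(s.ok());

  // Keep only keys in ["k0250", "k0750"); everything else in the default CF,
  // including tombstones, is physically removed.
  s = db->ClipColumnFamily(db->DefaultColumnFamily(), rocksdb::Slice("k0250"),
                           rocksdb::Slice("k0750"));
  assert(s.ok());

  delete db;
  return 0;
}
```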

When calling ClipDB with [begin, end), the following steps are performed (a rough sketch using only public APIs follows the list):

1. Quickly delete the files that do not overlap the range at all:
DeleteFilesInRanges(nullptr, begin) + DeleteFilesInRanges(end, nullptr)
2. Delete the remaining keys outside the range:
DeleteRange[smallest_key, begin) + DeleteRange[end, largest_key]
3. Drop the resulting tombstones with a manual compaction:
CompactRange(options, nullptr, nullptr)
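
The sketch below walks through these three steps with public APIs only. It is a simplification, not the actual implementation: it assumes the column family's current boundary user keys (smallest/largest) are already known and it skips the initial memtable flush that the real ClipColumnFamily performs.

```cpp
#include "rocksdb/comparator.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"

// Sketch only; the boundary keys `smallest` and `largest` are assumed inputs.
rocksdb::Status ClipSketch(rocksdb::DB* db, rocksdb::ColumnFamilyHandle* cf,
                           const rocksdb::Slice& begin,
                           const rocksdb::Slice& end,
                           const rocksdb::Slice& smallest,
                           const rocksdb::Slice& largest) {
  // 1. Drop whole SST files that lie entirely outside [begin, end).
  rocksdb::RangePtr ranges[2] = {rocksdb::RangePtr(nullptr, &begin),
                                 rocksdb::RangePtr(&end, nullptr)};
  rocksdb::Status s = rocksdb::DeleteFilesInRanges(db, cf, ranges, 2);
  if (!s.ok()) return s;

  // 2. Range-delete whatever out-of-range keys remain.
  const rocksdb::Comparator* ucmp = cf->GetComparator();
  rocksdb::WriteOptions wo;
  if (ucmp->Compare(smallest, begin) < 0) {
    s = db->DeleteRange(wo, cf, smallest, begin);  // [smallest, begin)
    if (!s.ok()) return s;
  }
  if (ucmp->Compare(end, largest) < 0) {
    s = db->DeleteRange(wo, cf, end, largest);  // [end, largest)
    if (!s.ok()) return s;
    s = db->Delete(wo, cf, largest);  // and the largest key itself
    if (!s.ok()) return s;
  }

  // 3. Full manual compaction so the tombstones written above are dropped.
  rocksdb::CompactRangeOptions cro;
  cro.bottommost_level_compaction =
      rocksdb::BottommostLevelCompaction::kForceOptimized;
  return db->CompactRange(cro, cf, nullptr, nullptr);
}
```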

Pull Request resolved: #11379

Reviewed By: ajkr

Differential Revision: D45840358

Pulled By: cbi42

fbshipit-source-id: 54152e8a45fd8ede137f99787eb252f0b51440a4
mayuehappy authored and facebook-github-bot committed May 18, 2023
1 parent 7263f51 commit 8d8eb0e
Showing 15 changed files with 327 additions and 1 deletion.
1 change: 1 addition & 0 deletions CMakeLists.txt
@@ -1290,6 +1290,7 @@ if(WITH_TESTS)
db/db_bloom_filter_test.cc
db/db_compaction_filter_test.cc
db/db_compaction_test.cc
db/db_clip_test.cc
db/db_dynamic_level_test.cc
db/db_encryption_test.cc
db/db_flush_test.cc
1 change: 1 addition & 0 deletions HISTORY.md
@@ -9,6 +9,7 @@
* New statistics `rocksdb.file.read.db.open.micros` that measures read time of block-based SST tables or blob files during db open.

### Public API Changes
* EXPERIMENTAL: Add new API `DB::ClipColumnFamily` to clip the keys in a CF to a certain range. It will physically delete all keys outside the range, including tombstones.
* Add `MakeSharedCache()` construction functions to various cache Options objects, and deprecated the `NewWhateverCache()` functions with long parameter lists.

### Behavior changes
3 changes: 3 additions & 0 deletions Makefile
@@ -1480,6 +1480,9 @@ db_compaction_filter_test: $(OBJ_DIR)/db/db_compaction_filter_test.o $(TEST_LIBR
db_compaction_test: $(OBJ_DIR)/db/db_compaction_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_clip_test: $(OBJ_DIR)/db/db_clip_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

db_dynamic_level_test: $(OBJ_DIR)/db/db_dynamic_level_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

6 changes: 6 additions & 0 deletions TARGETS
@@ -4750,6 +4750,12 @@ cpp_unittest_wrapper(name="db_bloom_filter_test",
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_clip_test",
srcs=["db/db_clip_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])


cpp_unittest_wrapper(name="db_compaction_filter_test",
srcs=["db/db_compaction_filter_test.cc"],
deps=[":rocksdb_test_lib"],
142 changes: 142 additions & 0 deletions db/db_clip_test.cc
@@ -0,0 +1,142 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "port/port.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

class DBClipTest : public DBTestBase {
public:
DBClipTest() : DBTestBase("db_clip_test", /*env_do_fsync=*/true) {}
};

TEST_F(DBClipTest, TestClipRange) {
Options options = CurrentOptions();
options.write_buffer_size = 10 * 1024 * 1024;
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 3;
options.max_background_compactions = 3;
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();

DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB

Random rnd(301);
std::map<int32_t, std::string> values;

// files [0, 100), [100, 200), ... [900, 1000)
for (auto i = 0; i < 10; i++) {
for (auto j = 0; j < 100; j++) {
auto k = i * 100 + j;
values[k] = rnd.RandomString(value_size);
ASSERT_OK(Put(Key(k), values[k]));
}
ASSERT_OK(Flush());
}
ASSERT_EQ("10", FilesPerLevel(0));
auto begin_key = Key(251), end_key = Key(751);
ASSERT_OK(
db_->ClipColumnFamily(db_->DefaultColumnFamily(), begin_key, end_key));

for (auto i = 0; i < 251; i++) {
ReadOptions ropts;
std::string result;
auto s = db_->Get(ropts, Key(i), &result);
ASSERT_TRUE(s.IsNotFound());
}
for (auto i = 251; i < 751; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
for (auto i = 751; i < 1000; i++) {
ReadOptions ropts;
std::string result;
auto s = db_->Get(ropts, Key(i), &result);
ASSERT_TRUE(s.IsNotFound());
}

std::vector<LiveFileMetaData> all_metadata;
db_->GetLiveFilesMetaData(&all_metadata);
for (auto& md : all_metadata) {
// make sure clip_begin_key <= file_smallestkey <= file_largestkey <
// clip_end_key (the end key is exclusive)
bool in_range = false;

if (options.comparator->Compare(begin_key, md.smallestkey) <= 0 &&
options.comparator->Compare(end_key, md.largestkey) > 0) {
in_range = true;
}
ASSERT_TRUE(in_range);
}

CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,0,3", FilesPerLevel(0));

for (auto i = 0; i < 10; i += 2) {
for (auto j = 0; j < 100; j++) {
auto k = i * 100 + j;
ASSERT_OK(Put(Key(k), values[k]));
}
ASSERT_OK(Flush());
}
ASSERT_EQ("5,0,3", FilesPerLevel(0));
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
ASSERT_EQ("0,5,3", FilesPerLevel(0));

for (auto i = 1; i < 10; i += 2) {
for (auto j = 0; j < 100; j++) {
auto k = i * 100 + j;
ASSERT_OK(Put(Key(k), values[k]));
}
ASSERT_OK(Flush());
}
ASSERT_EQ("5,5,3", FilesPerLevel(0));

auto begin_key_2 = Key(222), end_key_2 = Key(888);

ASSERT_OK(db_->ClipColumnFamily(db_->DefaultColumnFamily(), begin_key_2,
end_key_2));

for (auto i = 0; i < 222; i++) {
ReadOptions ropts;
std::string result;
auto s = db_->Get(ropts, Key(i), &result);
ASSERT_TRUE(s.IsNotFound());
}
for (auto i = 222; i < 888; i++) {
ASSERT_EQ(Get(Key(i)), values[i]);
}
for (auto i = 888; i < 1000; i++) {
ReadOptions ropts;
std::string result;
auto s = db_->Get(ropts, Key(i), &result);
ASSERT_TRUE(s.IsNotFound());
}

std::vector<LiveFileMetaData> all_metadata_2;
db_->GetLiveFilesMetaData(&all_metadata_2);
for (auto& md : all_metadata_2) {
// make sure clip_begin_key <= file_smallestkey <= file_largestkey <
// clip_end_key (the end key is exclusive)
bool in_range = false;
if (begin_key_2.compare(md.smallestkey) <= 0 &&
end_key_2.compare(md.largestkey) > 0) {
in_range = true;
}
ASSERT_TRUE(in_range);
}
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
7 changes: 7 additions & 0 deletions db/db_impl/compacted_db_impl.h
@@ -128,6 +128,13 @@ class CompactedDBImpl : public DBImpl {
return Status::NotSupported("Not supported in compacted db mode.");
}

using DB::ClipColumnFamily;
virtual Status ClipColumnFamily(ColumnFamilyHandle* /*column_family*/,
const Slice& /*begin*/,
const Slice& /*end*/) override {
return Status::NotSupported("Not supported in compacted db mode.");
}

// FIXME: some missing overrides for more "write" functions
// Share with DBImplReadOnly?

76 changes: 75 additions & 1 deletion db/db_impl/db_impl.cc
@@ -4522,7 +4522,6 @@ void DBImpl::GetAllColumnFamilyMetaData(
}
}


Status DBImpl::CheckConsistency() {
mutex_.AssertHeld();
std::vector<LiveFileMetaData> metadata;
@@ -5705,6 +5704,81 @@ Status DBImpl::CreateColumnFamilyWithImport(
return status;
}

Status DBImpl::ClipColumnFamily(ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
assert(column_family);
Status status;
// Flush memtable
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
auto* cfd =
static_cast_with_check<ColumnFamilyHandleImpl>(column_family)->cfd();
if (immutable_db_options_.atomic_flush) {
status = AtomicFlushMemTables(flush_opts, FlushReason::kDeleteFiles,
{} /* provided_candidate_cfds */,
false /* entered_write_thread */);
} else {
status = FlushMemTable(cfd, flush_opts, FlushReason::kDeleteFiles,
false /* entered_write_thread */);
}

if (status.ok()) {
// DeleteFilesInRanges deletes the files that do not overlap the clip range
// (L0 files are not handled here)
std::vector<RangePtr> ranges;
ranges.push_back(RangePtr(nullptr, &begin_key));
ranges.push_back(RangePtr(&end_key, nullptr));
status = DeleteFilesInRanges(column_family, ranges.data(), ranges.size());
}

// DeleteRange the remaining out-of-range keys
bool empty_after_delete = false;
if (status.ok()) {
Slice smallest_user_key, largest_user_key;
{
// Lock db mutex
InstrumentedMutexLock l(&mutex_);
cfd->current()->GetSstFilesBoundaryKeys(&smallest_user_key,
&largest_user_key);
}
// all the files have been deleted by DeleteFilesInRanges
if (smallest_user_key.empty() && largest_user_key.empty()) {
empty_after_delete = true;
} else {
const Comparator* const ucmp = column_family->GetComparator();
WriteOptions wo;
// Delete [smallest_user_key, clip_begin_key)
if (ucmp->Compare(smallest_user_key, begin_key) < 0) {
status = DeleteRange(wo, column_family, smallest_user_key, begin_key);
}

if (status.ok()) {
// Delete [clip_end_key, largest_user_key]
if (ucmp->Compare(end_key, largest_user_key) < 0) {
status = DeleteRange(wo, column_family, end_key, largest_user_key);
if (status.ok()) {
status = Delete(wo, column_family, largest_user_key);
}
}
}
}
}

if (status.ok() && !empty_after_delete) {
// Use CompactRange to drop all the tombstones
CompactRangeOptions compact_options;
compact_options.exclusive_manual_compaction = true;
compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForceOptimized;
// We could just compact the ranges [null, clip_begin_key] and
// [clip_end_key, null]. But due to how manual compaction calculates the
// last level to compact to and that range tombstones are not dropped
// during non-bottommost compactions, calling CompactRange() on these two
// ranges may not clear all range tombstones.
status = CompactRange(compact_options, nullptr, nullptr);
}
return status;
}

Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
}
5 changes: 5 additions & 0 deletions db/db_impl/db_impl.h
@@ -534,6 +534,11 @@ class DBImpl : public DB {
const ExportImportFilesMetaData& metadata,
ColumnFamilyHandle** handle) override;

using DB::ClipColumnFamily;
virtual Status ClipColumnFamily(ColumnFamilyHandle* column_family,
const Slice& begin_key,
const Slice& end_key) override;

using DB::VerifyFileChecksums;
Status VerifyFileChecksums(const ReadOptions& read_options) override;

7 changes: 7 additions & 0 deletions db/db_impl/db_impl_readonly.h
@@ -142,6 +142,13 @@ class DBImplReadOnly : public DBImpl {
return Status::NotSupported("Not supported operation in read only mode.");
}

using DB::ClipColumnFamily;
virtual Status ClipColumnFamily(ColumnFamilyHandle* /*column_family*/,
const Slice& /*begin*/,
const Slice& /*end*/) override {
return Status::NotSupported("Not supported operation in read only mode.");
}

// FIXME: some missing overrides for more "write" functions

protected:
7 changes: 7 additions & 0 deletions db/db_test.cc
@@ -3107,6 +3107,13 @@ class ModelDB : public DB {
return Status::NotSupported("Not implemented.");
}

using DB::ClipColumnFamily;
virtual Status ClipColumnFamily(ColumnFamilyHandle* /*column_family*/,
const Slice& /*begin*/,
const Slice& /*end*/) override {
return Status::NotSupported("Not implemented.");
}

using DB::GetPropertiesOfAllTables;
Status GetPropertiesOfAllTables(
ColumnFamilyHandle* /*column_family*/,
43 changes: 43 additions & 0 deletions db/version_set.cc
@@ -1826,6 +1826,49 @@ uint64_t Version::GetSstFilesSize() {
return sst_files_size;
}

void Version::GetSstFilesBoundaryKeys(Slice* smallest_user_key,
Slice* largest_user_key) {
smallest_user_key->clear();
largest_user_key->clear();
bool initialized = false;
const Comparator* ucmp = storage_info_.user_comparator_;
for (int level = 0; level < cfd_->NumberLevels(); level++) {
if (storage_info_.LevelFiles(level).size() == 0) {
continue;
}
if (level == 0) {
// we need to consider all files on level 0
for (const auto& file : storage_info_.LevelFiles(level)) {
const Slice& start_user_key = file->smallest.user_key();
if (!initialized ||
ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
*smallest_user_key = start_user_key;
}
const Slice& end_user_key = file->largest.user_key();
if (!initialized ||
ucmp->Compare(end_user_key, *largest_user_key) > 0) {
*largest_user_key = end_user_key;
}
initialized = true;
}
} else {
// we only need to consider the first and last file
const Slice& start_user_key =
storage_info_.LevelFiles(level)[0]->smallest.user_key();
if (!initialized ||
ucmp->Compare(start_user_key, *smallest_user_key) < 0) {
*smallest_user_key = start_user_key;
}
const Slice& end_user_key =
storage_info_.LevelFiles(level).back()->largest.user_key();
if (!initialized || ucmp->Compare(end_user_key, *largest_user_key) > 0) {
*largest_user_key = end_user_key;
}
initialized = true;
}
}
}

void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
uint64_t oldest_time = std::numeric_limits<uint64_t>::max();
for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
3 changes: 3 additions & 0 deletions db/version_set.h
@@ -992,6 +992,9 @@ class Version {

void GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta);

void GetSstFilesBoundaryKeys(Slice* smallest_user_key,
Slice* largest_user_key);

uint64_t GetSstFilesSize();

// Retrieves the file_creation_time of the oldest file in the DB.