diff --git a/engines/ep/src/kv_bucket.cc b/engines/ep/src/kv_bucket.cc index 18b1653462..d32cda1775 100644 --- a/engines/ep/src/kv_bucket.cc +++ b/engines/ep/src/kv_bucket.cc @@ -3098,6 +3098,11 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const { void KVBucket::setHistoryRetentionBytes(size_t bytes) { historyRetentionBytes = bytes; + for (auto& i : vbMap.shards) { + KVShard* shard = i.get(); + shard->getRWUnderlying()->setHistoryRetentionBytes(bytes); + } + } size_t KVBucket::getHistoryRetentionBytes() const { diff --git a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc index d2c1e6161b..13b07567f3 100644 --- a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc +++ b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc @@ -4542,3 +4542,10 @@ std::unique_ptr CouchKVStore::begin( return std::make_unique( *this, vbid, std::move(pcb)); } + +void CouchKVStore::setHistoryRetentionBytes(size_t size) { + // no-op. + // Note: StorageProperties reports that history scan is not supported, so + // we accept this attempt to set size, but will fail if a scanAllVersions + // is attempted. +} \ No newline at end of file diff --git a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h index b6291dc9d7..b5a60c20bc 100644 --- a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h +++ b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h @@ -410,6 +410,8 @@ class CouchKVStore : public KVStore std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + void setHistoryRetentionBytes(size_t size) override; + protected: /** * RAII holder for a couchstore LocalDoc object diff --git a/engines/ep/src/kvstore/kvstore_iface.h b/engines/ep/src/kvstore/kvstore_iface.h index dedb8832ba..0f75ed6295 100644 --- a/engines/ep/src/kvstore/kvstore_iface.h +++ b/engines/ep/src/kvstore/kvstore_iface.h @@ -763,6 +763,11 @@ class KVStoreIface { * @param vbid ID of the vbucket being created */ virtual void prepareToCreateImpl(Vbid vbid) = 0; + + /** + * Method to configure the amount of history a vbucket should retain. + */ + virtual void setHistoryRetentionBytes(size_t size) = 0; }; std::string to_string(KVStoreIface::ReadVBStateStatus status); diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc index 198ecd0dcd..6b8c0db086 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc +++ b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc @@ -863,7 +863,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const { StorageProperties::AutomaticDeduplication::No, StorageProperties::PrepareCounting::No, StorageProperties::CompactionStaleItemCallbacks::Yes, - StorageProperties::HistoryRetentionAvailable::No); + StorageProperties::HistoryRetentionAvailable::Yes); return rv; } @@ -1683,8 +1683,7 @@ std::unique_ptr MagmaKVStore::initBySeqnoScanContext( getDroppedStatus.String()); } - // @todo:assign this using magma->GetOldestHistorySeqno(snapshot); - auto historyStartSeqno = 0; + auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot); if (logger->should_log(spdlog::level::info)) { logger->info( "MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}" @@ -1797,8 +1796,7 @@ std::unique_ptr MagmaKVStore::initByIdScanContext( return nullptr; } - // @todo:assign this using magma->GetOldestHistorySeqno(snapshot); - auto historyStartSeqno = 0; + auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot); logger->info( "MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} " "KeyIterator:{}", @@ -1818,13 +1816,16 @@ std::unique_ptr MagmaKVStore::initByIdScanContext( historyStartSeqno); } +scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { + return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot); +} + scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const { - // @todo use magma's mode - // return scan(ctx, magma::Magma::SeqIterator::Mode::History); - return scan(ctx); + return scan(ctx, magma::Magma::SeqIterator::Mode::History); } -scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { +scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx, + magma::Magma::SeqIterator::Mode mode) const { if (ctx.lastReadSeqno == ctx.maxSeqno) { logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}", ctx.vbid, @@ -1837,7 +1838,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { startSeqno = ctx.lastReadSeqno + 1; } auto& mctx = dynamic_cast(ctx); - for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid(); + for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode); + mctx.itr->Valid(); mctx.itr->Next()) { Slice keySlice, metaSlice, valSlice; uint64_t seqno; @@ -3710,3 +3712,7 @@ std::pair MagmaKVStore::getOldestRollbackableHighSeqno( return {status, seqno}; } + +void MagmaKVStore::setHistoryRetentionBytes(size_t size) { + magma->SetHistoryRetentionSize(size); +} diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h index 36f83b34b8..93c4ad1245 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h +++ b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h @@ -567,6 +567,8 @@ class MagmaKVStore : public KVStore { std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + void setHistoryRetentionBytes(size_t size) override; + // Magma uses a unique logger with a prefix of magma so that all logging // calls from the wrapper thru magma will be prefixed with magma. std::shared_ptr logger; @@ -768,6 +770,9 @@ class MagmaKVStore : public KVStore { const magma::Slice& valSlice, std::function valueRead) const; + scan_error_t scan(BySeqnoScanContext& ctx, + magma::Magma::SeqIterator::Mode mode) const; + MagmaKVStoreConfig& configuration; /** diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc b/engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc index a0f3652fb2..819cdae063 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc +++ b/engines/ep/src/kvstore/magma-kvstore/magma-memory-tracking-proxy.cc @@ -510,7 +510,9 @@ magma::Status MagmaMemoryTrackingProxy::WriteDocs( docOperations, kvsRev, wrappedDocCallback, - wrappedPostCallback); + wrappedPostCallback, + // @todo: don't force this - must be passed down + magma::Magma::HistoryMode::Enabled); } magma::Status MagmaMemoryTrackingProxy::NewCheckpoint( diff --git a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc index ce23c96aa0..ac0cb48fb0 100644 --- a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc +++ b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc @@ -3299,3 +3299,10 @@ Vbid::id_type NexusKVStore::getCacheSlot(Vbid vbid) const { uint64_t NexusKVStore::getPurgeSeqno(Vbid vbid) const { return purgeSeqno.at(getCacheSlot(vbid)); } + +void NexusKVStore::setHistoryRetentionBytes(size_t size) { + // no-op. + // Note: StorageProperties reports that history scan is not supported, so + // we accept this attempt to set size, but will fail if a scanAllVersions + // is attempted. +} diff --git a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h index f7d86eccab..adffb91ce2 100644 --- a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h +++ b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h @@ -143,6 +143,7 @@ class NexusKVStore : public KVStoreIface { void delSystemEvent(TransactionContext& txnCtx, const queued_item item) override; void endTransaction(Vbid vbid) override; + void setHistoryRetentionBytes(size_t size) override; /** * Unit test only hook called before we compact the first KVStore. Public as diff --git a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc index e45b88ce8f..689edfe061 100644 --- a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc +++ b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc @@ -1973,3 +1973,10 @@ RocksDBKVStoreTransactionContext::RocksDBKVStoreTransactionContext( : TransactionContext(kvstore, vbid, std::move(cb)), pendingReqs(std::make_unique()) { } + +void RocksDBKVStore::setHistoryRetentionBytes(size_t size) { + // no-op. + // Note: StorageProperties reports that history scan is not supported, so + // we accept this attempt to set size, but will fail if a scanAllVersions + // is attempted. +} diff --git a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h index cbeb71a388..0b28dadeb6 100644 --- a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h +++ b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h @@ -333,6 +333,8 @@ class RocksDBKVStore : public KVStore { std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + void setHistoryRetentionBytes(size_t size) override; + protected: // Write a batch of updates to the given database; measuring the time // taken and adding the timer to the commit histogram. diff --git a/engines/ep/tests/mock/mock_kvstore.h b/engines/ep/tests/mock/mock_kvstore.h index 406d4da360..7d31011fe8 100644 --- a/engines/ep/tests/mock/mock_kvstore.h +++ b/engines/ep/tests/mock/mock_kvstore.h @@ -225,6 +225,7 @@ class MockKVStore : public KVStore { prepareToRollback, (Vbid vbid), (override)); + MOCK_METHOD(void, setHistoryRetentionBytes, (size_t size), (override)); /** * Helper function to replace the existing read-write KVStore in the given diff --git a/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc b/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc index 438dff9d34..8a1f931452 100644 --- a/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc +++ b/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc @@ -88,7 +88,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { void applyEvents(TransactionContext& txnCtx, VB::Commit& commitData, - const CollectionsManifest& cm) { + const CollectionsManifest& cm, + bool writeEventNow = true) { manifest.update(*vbucket, makeManifest(cm)); std::vector events; @@ -96,17 +97,44 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { for (auto& ev : events) { commitData.collections.recordSystemEvent(*ev); + if (writeEventNow) { + if (ev->isDeleted()) { + kvstore->delSystemEvent(txnCtx, ev); + } else { + kvstore->setSystemEvent(txnCtx, ev); + } + } + } + if (!writeEventNow) { + std::move(events.begin(), + events.end(), + std::back_inserter(allEvents)); + } + } + + void applyEvents(TransactionContext& txnCtx, + const CollectionsManifest& cm, + bool writeEventNow = true) { + return applyEvents(txnCtx, flush, cm, writeEventNow); + } + + // This function is to be used in conjunction with applyEvents when + // writeEventNow=false allowing a test to better emulate the flusher and + // write keys in a sorted batch. Tests can applyEvents so that collection + // metadata management does updates, but defer the system event writing + // until ready to commit + void sortAndWriteAllEvents(TransactionContext& txnCtx) { + std::sort(allEvents.begin(), + allEvents.end(), + OrderItemsForDeDuplication{}); + for (auto& ev : allEvents) { if (ev->isDeleted()) { kvstore->delSystemEvent(txnCtx, ev); } else { kvstore->setSystemEvent(txnCtx, ev); } } - } - - void applyEvents(TransactionContext& txnCtx, - const CollectionsManifest& cm) { - applyEvents(txnCtx, flush, cm); + allEvents.clear(); } void checkUid(const Collections::KVStore::Manifest& md, @@ -219,7 +247,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { VB::Commit commitData(manifest); auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); - applyEvents(*ctx, commitData, cm); + applyEvents(*ctx, commitData, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), commitData); auto [status, md] = kvstore->getCollectionsManifest(Vbid(0)); EXPECT_TRUE(status); @@ -235,6 +264,7 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { VBucketPtr vbucket; WriteCallback wc; DeleteCallback dc; + std::vector allEvents; }; class CollectionsKVStoreTest @@ -578,19 +608,21 @@ class CollectionRessurectionKVStoreTest auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); cm.add(targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add(target, targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); } // runs a flush batch that will leave the target collection in dropped state void dropScope() { openScopeOpenCollection(); - cm.remove(targetScope); auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); - applyEvents(*ctx, cm); + cm.remove(targetScope); + applyEvents(*ctx, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); } @@ -704,9 +736,9 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() { std::make_unique()); if (!cm.exists(targetScope)) { cm.add(targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add(target, targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } std::string expectedName = target.name; @@ -715,22 +747,23 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() { // iterate cycles of remove/add for (int ii = 0; ii < getCycles(); ii++) { cm.remove(scope); - applyEvents(*ctx, cm); - + applyEvents(*ctx, cm, false); if (resurectWithNewName()) { expectedName = target.name + "_" + std::to_string(ii); scope.name = targetScope.name + "_" + std::to_string(ii); } cm.add(scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add({expectedName, target.uid}, scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } if (dropCollectionAtEnd()) { cm.remove(scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } + + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); // Now validate diff --git a/engines/ep/tests/module_tests/history_scan_test.cc b/engines/ep/tests/module_tests/history_scan_test.cc index 3ad61cb7d4..5fda024f10 100644 --- a/engines/ep/tests/module_tests/history_scan_test.cc +++ b/engines/ep/tests/module_tests/history_scan_test.cc @@ -33,7 +33,11 @@ class HistoryScanTest : public CollectionsDcpParameterizedTest { CollectionsDcpParameterizedTest::SetUp(); // To allow tests to set where history begins, use MockMagmaKVStore replaceMagmaKVStore(); - // @todo: Setup to retain history using setHistoryRetentionBytes + // For all tests, use big history window - all tests here will use a + // combination of magma's history retention + setHistoryStartSeqno to + // configure the test. + store->getRWUnderlying(vbid)->setHistoryRetentionBytes(100 * 1024 * + 1024); } void setHistoryStartSeqno(uint64_t seqno) { @@ -108,9 +112,6 @@ void HistoryScanTest::validateSnapshot( // Validate that 1 disk snapshot is produced and that it is marked as history // and duplicates TEST_P(HistoryScanTest, basic_unique) { - // The entire disk is "history", from seqno 1 - setHistoryStartSeqno(1); - std::vector items; items.emplace_back(store_item( vbid, makeStoredDocKey("a", CollectionID::Default), "val-a")); @@ -142,23 +143,22 @@ TEST_P(HistoryScanTest, basic_unique) { items); } -// Following test cannot be enabled until magma history support exists -TEST_P(HistoryScanTest, DISABLED_basic_duplicates) { +TEST_P(HistoryScanTest, basic_duplicates) { CollectionsManifest cm; setCollections(cookie, cm.add(CollectionEntry::vegetable, {}, true)); std::vector items; // Create a "dummy" Item that marks where the system-event is expected - items.emplace_back(makeStoredDocKey("a", CollectionEntry::vegetable), + items.emplace_back(makeStoredDocKey("ignored", CollectionEntry::vegetable), vbid, queue_op::system_event, 0, 1); items.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v0")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v0")); // temp flush in two batches as dedup is still on in the flusher flush_vbucket_to_disk(vbid, 1 + 1); items.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v1")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v1")); flush_vbucket_to_disk(vbid, 1); ensureDcpWillBackfill(); @@ -195,26 +195,26 @@ TEST_P(HistoryScanTest, TwoSnapshots) { std::vector items1, items2; // items1 represents the first snapshot, only the crate of vegetable will - // exist in this snapshot. The second history snapshot will have the 'a' + // exist in this snapshot. The second history snapshot will have the 'k0' // keys (both versions). items1.emplace_back(makeStoredDocKey("", CollectionEntry::vegetable), vbid, queue_op::system_event, 0, 1); - store_item(vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v0"); + store_item(vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v0"); flush_vbucket_to_disk(vbid, 1 + 1); - store_item(vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v1"); + store_item(vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v1"); flush_vbucket_to_disk(vbid, 1); // Now we must force history to begin from the next flush items2.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v2")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v2")); flush_vbucket_to_disk(vbid, 1); // @todo: switch to a using key 'a' when magma history support is available // then we can verify two versions of 'a' are returned items2.emplace_back(store_item( - vbid, makeStoredDocKey("b", CollectionEntry::vegetable), "v3")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v3")); flush_vbucket_to_disk(vbid, 1); ensureDcpWillBackfill(); @@ -260,8 +260,6 @@ TEST_P(HistoryScanTest, TwoSnapshots) { // Test OSO switches to history TEST_P(HistoryScanTest, OSOThenHistory) { - setHistoryStartSeqno(1); - // This writes to fruit and vegetable auto highSeqno = setupTwoCollections().second; diff --git a/engines/ep/tests/module_tests/magma-kvstore_test.cc b/engines/ep/tests/module_tests/magma-kvstore_test.cc index 6cec59edec..48cb5747ca 100644 --- a/engines/ep/tests/module_tests/magma-kvstore_test.cc +++ b/engines/ep/tests/module_tests/magma-kvstore_test.cc @@ -19,6 +19,7 @@ #include "test_helpers.h" #include "thread_gate.h" #include +#include using namespace std::string_literals; using namespace testing; @@ -93,7 +94,8 @@ TEST_F(MagmaKVStoreRollbackTest, Rollback) { auto ctx = kvstore->begin(vbid, std::make_unique()); for (int j = 0; j < 5; j++) { - auto key = makeStoredDocKey("key" + std::to_string(seqno)); + // pad the key so key09 < key10 + auto key = makeStoredDocKey(fmt::format("key_{:02}", seqno)); auto qi = makeCommittedItem(key, "value"); flush.proposedVBState.lastSnapStart = seqno; flush.proposedVBState.lastSnapEnd = seqno; @@ -103,22 +105,22 @@ TEST_F(MagmaKVStoreRollbackTest, Rollback) { kvstore->commit(std::move(ctx), flush); } - auto rv = kvstore->get(makeDiskDocKey("key5"), Vbid(0)); + auto rv = kvstore->get(makeDiskDocKey("key_05"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key6"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_06"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); auto rollbackResult = kvstore->rollback(Vbid(0), 5, std::make_unique()); ASSERT_TRUE(rollbackResult.success); - rv = kvstore->get(makeDiskDocKey("key1"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_01"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key5"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_05"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key6"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_06"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::no_such_key); - rv = kvstore->get(makeDiskDocKey("key10"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_10"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::no_such_key); auto vbs = kvstore->getCachedVBucketState(Vbid(0)); @@ -138,7 +140,8 @@ TEST_F(MagmaKVStoreRollbackTest, RollbackNoValidCheckpoint) { auto ctx = kvstore->begin(vbid, std::make_unique()); for (int j = 0; j < 5; j++) { - auto key = makeStoredDocKey("key" + std::to_string(seqno)); + // pad the key so key09 < key10 + auto key = makeStoredDocKey(fmt::format("key_{:02}", seqno)); auto qi = makeCommittedItem(key, "value"); qi->setBySeqno(seqno++); kvstore->set(*ctx, qi); @@ -618,17 +621,6 @@ class MockDirectory : public magma::Directory { TEST_F(MagmaKVStoreTest, readOnlyMode) { initialize_kv_store(kvstore.get(), vbid); - auto doWrite = [this](uint64_t seqno, bool expected) { - auto ctx = - kvstore->begin(vbid, std::make_unique()); - auto qi = makeCommittedItem(makeStoredDocKey("key"), "value"); - qi->setBySeqno(seqno); - flush.proposedVBState.lastSnapStart = seqno; - flush.proposedVBState.lastSnapEnd = seqno; - kvstore->set(*ctx, qi); - EXPECT_EQ(expected, kvstore->commit(std::move(ctx), flush)); - }; - // Add an item to test that we can read it doWrite(1, true /*success*/); @@ -714,10 +706,11 @@ TEST_F(MagmaKVStoreTest, makeFileHandleSyncFailed) { EXPECT_FALSE(fileHandle); } -// @todo: This is a basic test that will be expanded to cover scanning history -// at the moment this test is equivalent to "scan" -TEST_F(MagmaKVStoreTest, scanAllVersions) { +// Test scanAllVersions returns the expected number of keys and the expected +// history start seqno. +TEST_F(MagmaKVStoreTest, scanAllVersions1) { initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); std::vector expectedItems; expectedItems.push_back(doWrite(1, true, "k1")); expectedItems.push_back(doWrite(2, true, "k2")); @@ -735,11 +728,82 @@ TEST_F(MagmaKVStoreTest, scanAllVersions) { ValueFilter::VALUES_COMPRESSED, SnapshotSource::Head); ASSERT_TRUE(bySeq); - // @todo: This must be the expected seqno where history begins - EXPECT_EQ(0, bySeq->historyStartSeqno); + EXPECT_EQ(1, bySeq->historyStartSeqno); + EXPECT_EQ(scan_success, kvstore->scanAllVersions(*bySeq)); + + auto& cb = + static_cast&>(bySeq->getValueCallback()); + EXPECT_EQ(2, cb.getProcessedCount()); +} + +// Test scanAllVersions returns the expected number of keys and the expected +// history start seqno. This test uses the same key for all mutations. +TEST_F(MagmaKVStoreTest, scanAllVersions2) { + initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); + std::vector expectedItems; + // doWrite writes the same key + expectedItems.push_back(doWrite(1, true)); + expectedItems.push_back(doWrite(2, true)); + auto validate = [&expectedItems](GetValue gv) { + ASSERT_TRUE(gv.item); + ASSERT_GE(expectedItems.size(), size_t(gv.item->getBySeqno())); + EXPECT_EQ(*expectedItems[gv.item->getBySeqno() - 1], *gv.item); + }; + auto bySeq = kvstore->initBySeqnoScanContext( + std::make_unique>(validate), + std::make_unique>(), + vbid, + 1, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED, + SnapshotSource::Head); + ASSERT_TRUE(bySeq); + EXPECT_EQ(1, bySeq->historyStartSeqno); EXPECT_EQ(scan_success, kvstore->scanAllVersions(*bySeq)); auto& cb = static_cast&>(bySeq->getValueCallback()); EXPECT_EQ(2, cb.getProcessedCount()); +} + +// ScanContext now exposes historyStartSeqno which is tracked by magma provided +// it is retaining history. +TEST_F(MagmaKVStoreTest, historyStartSeqno) { + initialize_kv_store(kvstore.get(), vbid); + + auto validate = [this](uint64_t expectedSeqno) { + auto bySeq = kvstore->initBySeqnoScanContext( + std::make_unique(true /*expectcompressed*/), + std::make_unique(1, 5, Vbid(0)), + vbid, + 1, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED, + SnapshotSource::Head); + auto byId = kvstore->initByIdScanContext( + std::make_unique(true /*expectcompressed*/), + std::make_unique(1, 5, Vbid(0)), + vbid, + {}, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED); + ASSERT_TRUE(bySeq); + ASSERT_TRUE(byId); + + EXPECT_EQ(expectedSeqno, bySeq->historyStartSeqno); + EXPECT_EQ(bySeq->historyStartSeqno, byId->historyStartSeqno); + }; + + kvstore->setHistoryRetentionBytes(0); + validate(0); + doWrite(2, true); // write seqno 2 + validate(0); + kvstore->setHistoryRetentionBytes(1024 * 1024); + doWrite(3, true); // write seqno 3 + validate(3); + doWrite(4, true); // write seqno 4 + validate(3); // history still starts at 3 + kvstore->setHistoryRetentionBytes(0); + validate(0); // back to no history } \ No newline at end of file