From e6645746501114b5952187379f3871061937f134 Mon Sep 17 00:00:00 2001 From: Jim Walker Date: Thu, 5 Jan 2023 16:07:36 +0000 Subject: [PATCH] MB-54729: Enable history scan for CDC backfill Replace the todo markers with code that now utilises the magma history API - this now means scanAllVersions for example is hooked into the magma history scanning API. Add new tests that validate multiple versions can be stored and returned. Also required are changes to unit tests to respect new expectation checks that occur in magma - primarily that flushing writes ordered batches - this is only a problem for tests which bypass the flusher and call KVStore directly. **** ISSUES **** ep-engine_ep_unit_tests does not pass: 1) Exception from magma MagmaKVStoreRollbackTest.Rollback hits the following exception GSL: Precondition failure: 'levelSize >= compactionState[level].history.Size' at /Users/jimwalker/Code/couchbase/neo/magma/lsm/lsm_tree.cc:895 2) Seg-fault in magma Seen in a number of tests, 1 example: CollectionsDcpEphemeralOrPersistent/CollectionsDcpParameterizedTest.DefaultCollectionDropped/persistent_magma_value_only Process 78731 stopped * thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] 72 } 73 74 Slice DocSequenceBuffer::GetKey() { -> 75 seqFmt.Set(sortedList[offset]->seqno); 76 return seqFmt.Encode(); 77 } 78 * thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=EXC_I386_GPFLT) * frame #0: 0x00000001012eb7b0 ep-engine_ep_unit_tests`magma::DocSequenceBuffer::GetKey(this=0x0000000118131700) at lsd.cc:75:36 [opt] frame #1: 0x0000000101361e2e ep-engine_ep_unit_tests`magma::mvccIteratorAdaptor::GetKey(this=0x0000000118536c00) at mvcc.h:249:25 [opt] frame #2: 0x000000010132b688 ep-engine_ep_unit_tests`magma::IteratorWithFilter::filterKeys(this=0x0000000118128350) at 
iterator.cc:214:32 [opt] frame #3: 0x000000010132de5b ep-engine_ep_unit_tests`magma::KVReader::ReadKVs(this=0x00007ff7bfefd550) at common.cc:70:19 [opt] frame #4: 0x0000000101378f63 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, w=0x00007ff7bfefd890, itr=0x0000000118128350, maxSn=10, stopFn=function @ 0x00007ff7bfefd860)>) at lsm_tree.cc:719:15 [opt] frame #5: 0x0000000101376ee8 ep-engine_ep_unit_tests`magma::LSMTree::writeSSTable(this=0x000000011855a820, appendMode=, itr=0x0000000118128350, sizeEstimate=, maxSn=10, stopFn=function @ 0x00007ff7bfefdb60)>) at lsm_tree.cc:682:17 [opt] frame #6: 0x00000001013761b2 ep-engine_ep_unit_tests`magma::LSMTree::writeMemtable(this=0x000000011855a820, memtable=0x000000011854c7a0) at lsm_tree.cc:449:21 [opt] frame #7: 0x000000010137753f ep-engine_ep_unit_tests`magma::LSMTree::doMemtableFlushWork(this=0x000000011855a820) at lsm_tree.cc:531:18 [opt] frame #8: 0x000000010139fe62 ep-engine_ep_unit_tests`std::__1::__function::__func, std::__1::tuple ()>::operator()() [inlined] magma::LSMTree::newFlush(this=)::$_16::operator()() const at lsm_tree.cc:993:34 [opt] frame #9: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func, std::__1::tuple ()>::operator()() [inlined] decltype(__f=)::$_16&>(fp)()) std::__1::__invoke(magma::LSMTree::newFlush()::$_16&) at type_traits:3918:1 [opt] frame #10: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func, std::__1::tuple ()>::operator()() [inlined] std::__1::tuple std::__1::__invoke_void_return_wrapper, false>::__call)::$_16&>(magma::LSMTree::newFlush()::$_16&) at invoke.h:30:16 [opt] frame #11: 0x000000010139fe5d ep-engine_ep_unit_tests`std::__1::__function::__func, std::__1::tuple ()>::operator()() [inlined] std::__1::__function::__alloc_func, std::__1::tuple ()>::operator(this=)() at function.h:178:16 [opt] frame #12: 0x000000010139fe59 ep-engine_ep_unit_tests`std::__1::__function::__func, std::__1::tuple ()>::operator(this=)() 
at function.h:352:12 [opt] frame #13: 0x00000001012f72af ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::__function::__value_func ()>::operator(this=)() const at function.h:505:16 [opt] frame #14: 0x00000001012f7296 ep-engine_ep_unit_tests`magma::FlushWork::Execute() [inlined] std::__1::function ()>::operator(this=0x0000000118131560)() const at function.h:1182:12 [opt] frame #15: 0x00000001012f7292 ep-engine_ep_unit_tests`magma::FlushWork::Execute(this=0x0000000118131560) at flush_work.cc:61:29 [opt] frame #16: 0x0000000101389d5e ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x00007ff7bfefe1c0)::$_38::operator()() at kvstore.cc:515:27 [opt] frame #17: 0x0000000101388fac ep-engine_ep_unit_tests`magma::KVStore::flushMemTables(this=0x000000010442a420, wal=, offset=(SegID = 1, SegOffset = 4096), flushMode=, blockMode=Blocking) at kvstore.cc:582:16 [opt] frame #18: 0x0000000101389a5a ep-engine_ep_unit_tests`magma::KVStore::FlushMemTables(this=, wal=, flushMode=, blockMode=) at kvstore.cc:387:12 [opt] frame #19: 0x00000001012fd9ba ep-engine_ep_unit_tests`magma::Magma::Impl::syncKVStore(this=0x000000011814f000, kvID=, checkpoint=true) at db.cc:1352:21 [opt] frame #20: 0x000000010132678a ep-engine_ep_unit_tests`std::__1::__function::__func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=0x00007ff7bfefe400)>)::$_7::operator()() const at db.cc:880:23 [opt] frame #21: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] magma::Magma::Impl::CompactKVStore(this=)>)::$_8::operator()() const at db.cc:891:21 [opt] frame #22: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] 
decltype(__f=)>)::$_8&>(fp)()) std::__1::__invoke > (unsigned short)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function > (unsigned short)>)::$_8&) at type_traits:3918:1 [opt] frame #23: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] void std::__1::__invoke_void_return_wrapper::__call)>)::$_8&>(magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function > (unsigned short)>)::$_8&) at invoke.h:61:9 [opt] frame #24: 0x0000000101326772 ep-engine_ep_unit_tests`std::__1::__function::__func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator()() [inlined] std::__1::__function::__alloc_func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator(this=)() at function.h:178:16 [opt] frame #25: 0x0000000101326764 ep-engine_ep_unit_tests`std::__1::__function::__func > (unsigned short)>)::$_8, std::__1::allocator > (unsigned short)>)::$_8>, void ()>::operator(this=)() at function.h:352:12 [opt] frame #26: 0x0000000101303138 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function > (unsigned short)>) [inlined] std::__1::__function::__value_func::operator(this=)() const at function.h:505:16 [opt] frame #27: 0x000000010130312d ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function > (unsigned short)>) [inlined] std::__1::function::operator(this=0x00007ff7bfefe4b0)() const at function.h:1182:12 [opt] frame #28: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function > (unsigned short)>) [inlined] 
magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:92:9 [opt] frame #29: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(unsigned short, magma::Slice const&, magma::Slice const&, std::__1::function > (unsigned short)>) [inlined] magma::defer::~defer(this=0x00007ff7bfefe4b0) at common.h:91:14 [opt] frame #30: 0x0000000101303129 ep-engine_ep_unit_tests`magma::Magma::Impl::CompactKVStore(this=, kvID=, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefe550)>) at db.cc:895:1 [opt] frame #31: 0x000000010130336c ep-engine_ep_unit_tests`magma::Magma::CompactKVStore(this=, kvID=0, lowKey=0x00007ff7bfefe780, highKey=, makeCallback=)>) at db.cc:901:18 [opt] frame #32: 0x000000010004fd3d ep-engine_ep_unit_tests`MagmaMemoryTrackingProxy::CompactKVStore(this=, kvID=0, lowKey=0x00007ff7bfefe780, highKey=0x00007ff7bfefe780, makeCallback=magma::Magma::CompactionCallbackBuilder @ 0x00007ff7bfefea00)>) at magma-memory-tracking-proxy.cc:190:19 [opt] frame #33: 0x00000001000a9eeb ep-engine_ep_unit_tests`MagmaKVStore::compactDBInternal(this=, vbLock=0x00007ff7bfefeda0, ctx=std::__1::shared_ptr::element_type @ 0x00000001184acc20 strong=3 weak=1) at magma-kvstore.cc:2590:29 [opt] frame #34: 0x00000001000a93ad ep-engine_ep_unit_tests`MagmaKVStore::compactDB(this=0x00000001067e6500, vbLock=0x00007ff7bfefeda0, ctx=nullptr) at magma-kvstore.cc:2445:12 [opt] frame #35: 0x00000001001d7eb0 ep-engine_ep_unit_tests`EPBucket::compactInternal(this=0x00000001067e6000, vb=0x00007ff7bfefed90, config=) at ep_bucket.cc:1398:25 [opt] frame #36: 0x00000001001d83f6 ep-engine_ep_unit_tests`EPBucket::doCompact(this=0x00000001067e6000, vbid=(vbid = 0), config=0x00007ff7bfefedf0, cookies=size=0) at ep_bucket.cc:1476:14 [opt] 3) Key sorting issue Magma now checks for sorted keys - it turns out KV flushing is violating that ordering. Need to know if KV should fix or is the magma check required?? 
Example: CollectionsDcpEphemeralOrPersistent/CollectionsLegacyDcpTest.default_collection_is_not_vbucket_highseqno_with_pending/persistent_nexus_couchstore_magma_value_only CRITICAL [(SynchronousEPEngine:default) magma_0]Fatal error: Found: preceding key(d2) > current key( _collection). If history is enabled, all keys in the batch must be sorted lexicographicall The problem is that the test flushes a prepare(default collection, key=d2) and create-collection(fruit) together. The flusher orders these... \0d2 \1create_fruit This is correct. But \0d2 is marked as a prepare, so when it is flushed to disk it goes into a special namespace. This occurs in KVStore after the sorting. \0d2 becomes \2\0d2 And magma actually sees \2\0d2 \1create_fruit and so we have violated magma's sorted-keys expectation Change-Id: Ica9ea1b52c51f125c9e8839a0fca412834fc25f7 --- engines/ep/src/kv_bucket.cc | 7 + .../kvstore/couch-kvstore/couch-kvstore.cc | 7 + .../src/kvstore/couch-kvstore/couch-kvstore.h | 2 + engines/ep/src/kvstore/kvstore_iface.h | 5 + .../kvstore/magma-kvstore/magma-kvstore.cc | 26 ++-- .../src/kvstore/magma-kvstore/magma-kvstore.h | 5 + .../kvstore/nexus-kvstore/nexus-kvstore.cc | 7 + .../src/kvstore/nexus-kvstore/nexus-kvstore.h | 1 + .../rocksdb-kvstore/rocksdb-kvstore.cc | 7 + .../kvstore/rocksdb-kvstore/rocksdb-kvstore.h | 2 + engines/ep/tests/mock/mock_kvstore.h | 1 + .../collections/collections_dcp_test.h | 18 ++- .../collections/collections_kvstore_test.cc | 69 +++++++--- .../collections/collections_oso_dcp_test.cc | 13 +- .../tests/module_tests/history_scan_test.cc | 121 ++++++++++++++---- .../tests/module_tests/magma-kvstore_test.cc | 112 ++++++++++++---- 16 files changed, 316 insertions(+), 87 deletions(-) diff --git a/engines/ep/src/kv_bucket.cc b/engines/ep/src/kv_bucket.cc index 18b1653462..7ef1b68f20 100644 --- a/engines/ep/src/kv_bucket.cc +++ b/engines/ep/src/kv_bucket.cc @@ -3098,6 +3098,13 @@ std::chrono::seconds KVBucket::getHistoryRetentionSeconds() const { void 
KVBucket::setHistoryRetentionBytes(size_t bytes) { historyRetentionBytes = bytes; + for (auto& i : vbMap.shards) { + KVShard* shard = i.get(); + // The KVStore needs to know the per vbucket size + shard->getRWUnderlying()->setHistoryRetentionBytes(bytes / + vbMap.getSize()); + } + } size_t KVBucket::getHistoryRetentionBytes() const { diff --git a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc index d2c1e6161b..13b07567f3 100644 --- a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc +++ b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.cc @@ -4542,3 +4542,10 @@ std::unique_ptr CouchKVStore::begin( return std::make_unique( *this, vbid, std::move(pcb)); } + +void CouchKVStore::setHistoryRetentionBytes(size_t size) { + // no-op. + // Note: StorageProperties reports that history scan is not supported, so + // we accept this attempt to set size, but will fail if a scanAllVersions + // is attempted. +} \ No newline at end of file diff --git a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h index b6291dc9d7..b5a60c20bc 100644 --- a/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h +++ b/engines/ep/src/kvstore/couch-kvstore/couch-kvstore.h @@ -410,6 +410,8 @@ class CouchKVStore : public KVStore std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + void setHistoryRetentionBytes(size_t size) override; + protected: /** * RAII holder for a couchstore LocalDoc object diff --git a/engines/ep/src/kvstore/kvstore_iface.h b/engines/ep/src/kvstore/kvstore_iface.h index dedb8832ba..0f75ed6295 100644 --- a/engines/ep/src/kvstore/kvstore_iface.h +++ b/engines/ep/src/kvstore/kvstore_iface.h @@ -763,6 +763,11 @@ class KVStoreIface { * @param vbid ID of the vbucket being created */ virtual void prepareToCreateImpl(Vbid vbid) = 0; + + /** + * Method to configure the amount of history a vbucket should retain. 
+ */ + virtual void setHistoryRetentionBytes(size_t size) = 0; }; std::string to_string(KVStoreIface::ReadVBStateStatus status); diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc index e6bab24457..67a4546439 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc +++ b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.cc @@ -869,7 +869,7 @@ StorageProperties MagmaKVStore::getStorageProperties() const { StorageProperties::AutomaticDeduplication::No, StorageProperties::PrepareCounting::No, StorageProperties::CompactionStaleItemCallbacks::Yes, - StorageProperties::HistoryRetentionAvailable::No); + StorageProperties::HistoryRetentionAvailable::Yes); return rv; } @@ -1695,8 +1695,7 @@ std::unique_ptr MagmaKVStore::initBySeqnoScanContext( getDroppedStatus.String()); } - // @todo:assign this using magma->GetOldestHistorySeqno(snapshot); - auto historyStartSeqno = 0; + auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot); if (logger->should_log(spdlog::level::info)) { logger->info( "MagmaKVStore::initBySeqnoScanContext {} seqno:{} endSeqno:{}" @@ -1809,8 +1808,7 @@ std::unique_ptr MagmaKVStore::initByIdScanContext( return nullptr; } - // @todo:assign this using magma->GetOldestHistorySeqno(snapshot); - auto historyStartSeqno = 0; + auto historyStartSeqno = magma->GetOldestHistorySeqno(snapshot); logger->info( "MagmaKVStore::initByIdScanContext {} historyStartSeqno:{} " "KeyIterator:{}", @@ -1830,13 +1828,16 @@ std::unique_ptr MagmaKVStore::initByIdScanContext( historyStartSeqno); } +scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { + return scan(ctx, magma::Magma::SeqIterator::Mode::Snapshot); +} + scan_error_t MagmaKVStore::scanAllVersions(BySeqnoScanContext& ctx) const { - // @todo use magma's mode - // return scan(ctx, magma::Magma::SeqIterator::Mode::History); - return scan(ctx); + return scan(ctx, magma::Magma::SeqIterator::Mode::History); } -scan_error_t 
MagmaKVStore::scan(BySeqnoScanContext& ctx) const { +scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx, + magma::Magma::SeqIterator::Mode mode) const { if (ctx.lastReadSeqno == ctx.maxSeqno) { logger->TRACE("MagmaKVStore::scan {} lastReadSeqno:{} == maxSeqno:{}", ctx.vbid, @@ -1849,7 +1850,8 @@ scan_error_t MagmaKVStore::scan(BySeqnoScanContext& ctx) const { startSeqno = ctx.lastReadSeqno + 1; } auto& mctx = dynamic_cast(ctx); - for (mctx.itr->Seek(startSeqno, ctx.maxSeqno); mctx.itr->Valid(); + for (mctx.itr->Initialize(startSeqno, ctx.maxSeqno, mode); + mctx.itr->Valid(); mctx.itr->Next()) { Slice keySlice, metaSlice, valSlice; uint64_t seqno; @@ -3722,3 +3724,7 @@ std::pair MagmaKVStore::getOldestRollbackableHighSeqno( return {status, seqno}; } + +void MagmaKVStore::setHistoryRetentionBytes(size_t size) { + magma->SetHistoryRetentionSize(size); +} diff --git a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h index b46bdbc676..8af00cb5d9 100644 --- a/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h +++ b/engines/ep/src/kvstore/magma-kvstore/magma-kvstore.h @@ -568,6 +568,8 @@ class MagmaKVStore : public KVStore { std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + void setHistoryRetentionBytes(size_t size) override; + // Magma uses a unique logger with a prefix of magma so that all logging // calls from the wrapper thru magma will be prefixed with magma. 
std::shared_ptr logger; @@ -782,6 +784,9 @@ class MagmaKVStore : public KVStore { folly::assume_unreachable(); } + scan_error_t scan(BySeqnoScanContext& ctx, + magma::Magma::SeqIterator::Mode mode) const; + MagmaKVStoreConfig& configuration; /** diff --git a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc index ce23c96aa0..ac0cb48fb0 100644 --- a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc +++ b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.cc @@ -3299,3 +3299,10 @@ Vbid::id_type NexusKVStore::getCacheSlot(Vbid vbid) const { uint64_t NexusKVStore::getPurgeSeqno(Vbid vbid) const { return purgeSeqno.at(getCacheSlot(vbid)); } + +void NexusKVStore::setHistoryRetentionBytes(size_t size) { + // no-op. + // Note: StorageProperties reports that history scan is not supported, so + // we accept this attempt to set size, but will fail if a scanAllVersions + // is attempted. +} diff --git a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h index f7d86eccab..adffb91ce2 100644 --- a/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h +++ b/engines/ep/src/kvstore/nexus-kvstore/nexus-kvstore.h @@ -143,6 +143,7 @@ class NexusKVStore : public KVStoreIface { void delSystemEvent(TransactionContext& txnCtx, const queued_item item) override; void endTransaction(Vbid vbid) override; + void setHistoryRetentionBytes(size_t size) override; /** * Unit test only hook called before we compact the first KVStore. 
Public as diff --git a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc index e45b88ce8f..689edfe061 100644 --- a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc +++ b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.cc @@ -1973,3 +1973,10 @@ RocksDBKVStoreTransactionContext::RocksDBKVStoreTransactionContext( : TransactionContext(kvstore, vbid, std::move(cb)), pendingReqs(std::make_unique()) { } + +void RocksDBKVStore::setHistoryRetentionBytes(size_t size) { + // no-op. + // Note: StorageProperties reports that history scan is not supported, so + // we accept this attempt to set size, but will fail if a scanAllVersions + // is attempted. +} diff --git a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h index cbeb71a388..0b28dadeb6 100644 --- a/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h +++ b/engines/ep/src/kvstore/rocksdb-kvstore/rocksdb-kvstore.h @@ -333,6 +333,8 @@ class RocksDBKVStore : public KVStore { std::unique_ptr begin( Vbid vbid, std::unique_ptr pcb) override; + void setHistoryRetentionBytes(size_t size) override; + protected: // Write a batch of updates to the given database; measuring the time // taken and adding the timer to the commit histogram. 
diff --git a/engines/ep/tests/mock/mock_kvstore.h b/engines/ep/tests/mock/mock_kvstore.h index 406d4da360..7d31011fe8 100644 --- a/engines/ep/tests/mock/mock_kvstore.h +++ b/engines/ep/tests/mock/mock_kvstore.h @@ -225,6 +225,7 @@ class MockKVStore : public KVStore { prepareToRollback, (Vbid vbid), (override)); + MOCK_METHOD(void, setHistoryRetentionBytes, (size_t size), (override)); /** * Helper function to replace the existing read-write KVStore in the given diff --git a/engines/ep/tests/module_tests/collections/collections_dcp_test.h b/engines/ep/tests/module_tests/collections/collections_dcp_test.h index f5ac61b4c7..e1c52f7c54 100644 --- a/engines/ep/tests/module_tests/collections/collections_dcp_test.h +++ b/engines/ep/tests/module_tests/collections/collections_dcp_test.h @@ -103,17 +103,27 @@ class CollectionsDcpTest : virtual public SingleThreadedKVBucketTest { const CollectionEntry::Entry& entry, uint64_t seqno); + /** + * This function (created for OSO tests) creates two collections (fruit + * and vegetable) and calls writeTwoCollections + * + * @param endOnVegetable true and the last item written will be for the + * vegetable collection + * @return current manifest and vbucket (::vbid) high-seqno + */ + std::pair setupTwoCollections( + bool endOnVegetable = false); + /** * This function (created for OSO tests) writes to two collections (fruit * and vegetable). The keys are "a", "b", "c" and "d" to demonstrate the * lexicographical ordering of an OSO snapshot. 
* * @param endOnVegetable true and the last item written will be for the - * vegetable collection - * @return manifest and high-seqno + * vegetable collection + * @return vbucket (::vbid) high-seqno */ - std::pair setupTwoCollections( - bool endOnVegetable = false); + uint64_t writeTwoCollectios(bool endOnTarget); static cb::engine_errc dcpAddFailoverLog( const std::vector&); diff --git a/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc b/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc index 0441f7a80c..6756db6e07 100644 --- a/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc +++ b/engines/ep/tests/module_tests/collections/collections_kvstore_test.cc @@ -92,7 +92,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { void applyEvents(TransactionContext& txnCtx, VB::Commit& commitData, - const CollectionsManifest& cm) { + const CollectionsManifest& cm, + bool writeEventNow = true) { manifest.update(*vbucket, makeManifest(cm)); std::vector events; @@ -101,17 +102,44 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { for (auto& ev : events) { commitData.collections.recordSystemEvent(*ev); + if (writeEventNow) { + if (ev->isDeleted()) { + kvstore->delSystemEvent(txnCtx, ev); + } else { + kvstore->setSystemEvent(txnCtx, ev); + } + } + } + if (!writeEventNow) { + std::move(events.begin(), + events.end(), + std::back_inserter(allEvents)); + } + } + + void applyEvents(TransactionContext& txnCtx, + const CollectionsManifest& cm, + bool writeEventNow = true) { + return applyEvents(txnCtx, flush, cm, writeEventNow); + } + + // This function is to be used in conjunction with applyEvents when + // writeEventNow=false allowing a test to better emulate the flusher and + // write keys in a sorted batch. 
Tests can applyEvents so that collection + // metadata management does updates, but defer the system event writing + // until ready to commit + void sortAndWriteAllEvents(TransactionContext& txnCtx) { + std::sort(allEvents.begin(), + allEvents.end(), + OrderItemsForDeDuplication{}); + for (auto& ev : allEvents) { if (ev->isDeleted()) { kvstore->delSystemEvent(txnCtx, ev); } else { kvstore->setSystemEvent(txnCtx, ev); } } - } - - void applyEvents(TransactionContext& txnCtx, - const CollectionsManifest& cm) { - applyEvents(txnCtx, flush, cm); + allEvents.clear(); } void checkUid(const Collections::KVStore::Manifest& md, @@ -224,7 +252,8 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { VB::Commit commitData(manifest); auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); - applyEvents(*ctx, commitData, cm); + applyEvents(*ctx, commitData, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), commitData); auto [status, md] = kvstore->getCollectionsManifest(Vbid(0)); EXPECT_TRUE(status); @@ -240,6 +269,7 @@ class CollectionsKVStoreTestBase : public KVStoreBackend, public KVStoreTest { VBucketPtr vbucket; WriteCallback wc; DeleteCallback dc; + std::vector allEvents; }; class CollectionsKVStoreTest @@ -583,19 +613,21 @@ class CollectionRessurectionKVStoreTest auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); cm.add(targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add(target, targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); } // runs a flush batch that will leave the target collection in dropped state void dropScope() { openScopeOpenCollection(); - cm.remove(targetScope); auto ctx = kvstore->begin(vbucket->getId(), std::make_unique()); - applyEvents(*ctx, cm); + cm.remove(targetScope); + applyEvents(*ctx, cm, false); + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), 
flush); } @@ -709,9 +741,9 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() { std::make_unique()); if (!cm.exists(targetScope)) { cm.add(targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add(target, targetScope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } std::string expectedName = target.name; @@ -720,22 +752,23 @@ void CollectionRessurectionKVStoreTest::resurectionScopesTest() { // iterate cycles of remove/add for (int ii = 0; ii < getCycles(); ii++) { cm.remove(scope); - applyEvents(*ctx, cm); - + applyEvents(*ctx, cm, false); if (resurectWithNewName()) { expectedName = target.name + "_" + std::to_string(ii); scope.name = targetScope.name + "_" + std::to_string(ii); } cm.add(scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); cm.add({expectedName, target.uid}, scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } if (dropCollectionAtEnd()) { cm.remove(scope); - applyEvents(*ctx, cm); + applyEvents(*ctx, cm, false); } + + sortAndWriteAllEvents(*ctx); kvstore->commit(std::move(ctx), flush); // Now validate diff --git a/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc b/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc index ebc0756c62..4f0c6cd3b6 100644 --- a/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc +++ b/engines/ep/tests/module_tests/collections/collections_oso_dcp_test.cc @@ -45,11 +45,15 @@ class CollectionsOSODcpTest : public CollectionsDcpParameterizedTest { std::pair CollectionsDcpTest::setupTwoCollections(bool endOnTarget) { - VBucketPtr vb = store->getVBucket(vbid); CollectionsManifest cm(CollectionEntry::fruit); setCollections(cookie, cm.add(CollectionEntry::vegetable)); + flush_vbucket_to_disk(vbid, 2); + return {cm, writeTwoCollectios(endOnTarget)}; +} - // Interleave the writes to two collections and then OSO backfill one +uint64_t CollectionsDcpTest::writeTwoCollectios(bool endOnTarget) { + // 
Interleave the writes to two collections, this is linked to expectations + // in CollectionsOSODcpTest test harness store_item(vbid, makeStoredDocKey("b", CollectionEntry::fruit), "q"); store_item(vbid, makeStoredDocKey("b", CollectionEntry::vegetable), "q"); store_item(vbid, makeStoredDocKey("d", CollectionEntry::fruit), "a"); @@ -66,8 +70,8 @@ CollectionsDcpTest::setupTwoCollections(bool endOnTarget) { vbid, makeStoredDocKey("c", CollectionEntry::vegetable), "q"); store_item(vbid, makeStoredDocKey("c", CollectionEntry::fruit), "y"); } - flush_vbucket_to_disk(vbid, 10); // 8 keys + 2 events - return {cm, vb->getHighSeqno()}; + flush_vbucket_to_disk(vbid, 8); + return store->getVBucket(vbid)->getHighSeqno(); } // Run through how we expect OSO to work, this is a minimal test which will @@ -674,7 +678,6 @@ TEST_P(CollectionsOSODcpTest, MB_43700) { // snapshots class CollectionsOSOEphemeralTest : public CollectionsDcpParameterizedTest { public: - std::pair setupTwoCollections(); }; // Run through how we expect OSO to work, this is a minimal test which will diff --git a/engines/ep/tests/module_tests/history_scan_test.cc b/engines/ep/tests/module_tests/history_scan_test.cc index cbc345fa2d..1ae26a5487 100644 --- a/engines/ep/tests/module_tests/history_scan_test.cc +++ b/engines/ep/tests/module_tests/history_scan_test.cc @@ -12,11 +12,13 @@ #include "../mock/mock_synchronous_ep_engine.h" #include "checkpoint_manager.h" #include "collections/collections_dcp_test.h" +#include "dcp/backfill_by_seqno_disk.h" #include "kv_bucket.h" #include "tests/mock/mock_dcp.h" #include "tests/mock/mock_dcp_consumer.h" #include "tests/mock/mock_dcp_producer.h" #include "tests/mock/mock_magma_kvstore.h" +#include "tests/mock/mock_stream.h" #include "tests/module_tests/test_helpers.h" #include "vbucket.h" @@ -34,7 +36,11 @@ class HistoryScanTest : public CollectionsDcpParameterizedTest { CollectionsDcpParameterizedTest::SetUp(); // To allow tests to set where history begins, use 
MockMagmaKVStore replaceMagmaKVStore(); - // @todo: Setup to retain history using setHistoryRetentionBytes + // For all tests, use big history window - all tests here will use a + // combination of magma's history retention + setHistoryStartSeqno to + // configure the test. + store->getRWUnderlying(vbid)->setHistoryRetentionBytes(100 * 1024 * + 1024); } void setHistoryStartSeqno(uint64_t seqno) { @@ -109,9 +115,6 @@ void HistoryScanTest::validateSnapshot( // Validate that 1 disk snapshot is produced and that it is marked as history // and duplicates TEST_P(HistoryScanTest, basic_unique) { - // The entire disk is "history", from seqno 1 - setHistoryStartSeqno(1); - std::vector items; items.emplace_back(store_item( vbid, makeStoredDocKey("a", CollectionID::Default), "val-a")); @@ -143,23 +146,22 @@ TEST_P(HistoryScanTest, basic_unique) { items); } -// Following test cannot be enabled until magma history support exists -TEST_P(HistoryScanTest, DISABLED_basic_duplicates) { +TEST_P(HistoryScanTest, basic_duplicates) { CollectionsManifest cm; setCollections(cookie, cm.add(CollectionEntry::vegetable, {}, true)); std::vector items; // Create a "dummy" Item that marks where the system-event is expected - items.emplace_back(makeStoredDocKey("a", CollectionEntry::vegetable), + items.emplace_back(makeStoredDocKey("ignored", CollectionEntry::vegetable), vbid, queue_op::system_event, 0, 1); items.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v0")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v0")); // temp flush in two batches as dedup is still on in the flusher flush_vbucket_to_disk(vbid, 1 + 1); items.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v1")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v1")); flush_vbucket_to_disk(vbid, 1); ensureDcpWillBackfill(); @@ -196,26 +198,26 @@ TEST_P(HistoryScanTest, TwoSnapshots) { std::vector items1, items2; // items1 
represents the first snapshot, only the crate of vegetable will - // exist in this snapshot. The second history snapshot will have the 'a' + // exist in this snapshot. The second history snapshot will have the 'k0' // keys (both versions). items1.emplace_back(makeStoredDocKey("", CollectionEntry::vegetable), vbid, queue_op::system_event, 0, 1); - store_item(vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v0"); + store_item(vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v0"); flush_vbucket_to_disk(vbid, 1 + 1); - store_item(vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v1"); + store_item(vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v1"); flush_vbucket_to_disk(vbid, 1); // Now we must force history to begin from the next flush items2.emplace_back(store_item( - vbid, makeStoredDocKey("a", CollectionEntry::vegetable), "v2")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v2")); flush_vbucket_to_disk(vbid, 1); // @todo: switch to a using key 'a' when magma history support is available // then we can verify two versions of 'a' are returned items2.emplace_back(store_item( - vbid, makeStoredDocKey("b", CollectionEntry::vegetable), "v3")); + vbid, makeStoredDocKey("k0", CollectionEntry::vegetable), "v3")); flush_vbucket_to_disk(vbid, 1); ensureDcpWillBackfill(); @@ -272,10 +274,10 @@ TEST_P(HistoryScanTest, TwoSnapshots) { // Test OSO switches to history TEST_P(HistoryScanTest, OSOThenHistory) { - setHistoryStartSeqno(1); - - // This writes to fruit and vegetable - auto highSeqno = setupTwoCollections().second; + // Setup (which calls writeTwoCollections), then call writeTwoCollectios + // to generate some duplicates (history) + setupTwoCollections(); + auto highSeqno = writeTwoCollectios(true); ensureDcpWillBackfill(); @@ -304,7 +306,6 @@ TEST_P(HistoryScanTest, OSOThenHistory) { EXPECT_EQ(mcbp::systemevent::id::CreateCollection, producers->last_system_event); - uint64_t txHighSeqno = 0; std::array keys = 
{{"a", "b", "c", "d"}}; for (auto& k : keys) { @@ -314,7 +315,6 @@ TEST_P(HistoryScanTest, OSOThenHistory) { EXPECT_EQ(ClientOpcode::DcpMutation, producers->last_op); EXPECT_EQ(k, producers->last_key) << producers->last_byseqno; EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); - txHighSeqno = std::max(txHighSeqno, producers->last_byseqno.load()); } // Now we get the end message @@ -323,26 +323,95 @@ TEST_P(HistoryScanTest, OSOThenHistory) { EXPECT_EQ(uint32_t(DcpOsoSnapshotFlags::End), producers->last_oso_snapshot_flags); - // Now we get the second snapshot + auto vb = store->getVBucket(vbid); + // Now we get the second snapshot, which is history stepAndExpect(ClientOpcode::DcpSnapshotMarker); - + EXPECT_EQ(vbid, producers->last_vbucket); + EXPECT_EQ(0, producers->last_snap_start_seqno); + EXPECT_EQ(vb->getPersistenceSeqno(), producers->last_snap_end_seqno); EXPECT_EQ(MARKER_FLAG_DISK | MARKER_FLAG_CHK | MARKER_FLAG_HISTORY | MARKER_FLAG_MAY_CONTAIN_DUPLICATE_KEYS, producers->last_flags); + stepAndExpect(ClientOpcode::DcpSystemEvent); + EXPECT_EQ(mcbp::systemevent::id::CreateCollection, + producers->last_system_event); + EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); - // And all keys in seq order. Setup created in order b, d, a, c + // And all keys in seq order. writeTwoCollectios created in order b, d, a, c std::array keySeqnoOrder = {{"b", "d", "a", "c"}}; for (auto& k : keySeqnoOrder) { - // Now we get the mutations, they aren't guaranteed to be in seqno - // order, but we know that for now they will be in key order. stepAndExpect(ClientOpcode::DcpMutation); EXPECT_EQ(k, producers->last_key); EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); } + // twice.. 
as we wrote them twice + for (auto& k : keySeqnoOrder) { + stepAndExpect(ClientOpcode::DcpMutation); + EXPECT_EQ(k, producers->last_key); + EXPECT_EQ(CollectionUid::vegetable, producers->last_collection_id); + } +} + +// Tests which don't need executing in two eviction modes +class HistoryScanTestSingleEvictionMode : public HistoryScanTest {}; + +// Test covers state machine transitions when a history scanHistory gets false +// from markDiskSnapshot +TEST_P(HistoryScanTestSingleEvictionMode, HistoryScanFailMarkDiskSnapshot) { + // Store an item, create new checkpoint and flush so we have something to + // backfill from disk + setVBucketStateAndRunPersistTask(vbid, vbucket_state_active); + store_item(vbid, makeStoredDocKey("key1"), "value"); + flushAndRemoveCheckpoints(vbid); + + // Create producer now we have items only on disk. + auto producer = std::make_shared( + *engine, cookie, "test-producer", 0 /*flags*/, false /*startTask*/); + ASSERT_EQ(cb::engine_errc::success, + producer->control(0, DcpControlKeys::ChangeStreams, "true")); + + auto vb = engine->getVBucket(vbid); + ASSERT_TRUE(vb); + auto stream = + std::make_shared(engine.get(), + producer, + DCP_ADD_STREAM_FLAG_DISKONLY, + 0, + *vb, + 0, + 1, + 0, + 0, + 0, + IncludeValue::Yes, + IncludeXattrs::Yes, + IncludeDeletedUserXattrs::No, + std::string{}); + + ASSERT_TRUE(stream->areChangeStreamsEnabled()); + stream->setActive(); + + // Create our own backfill to test + auto backfill = std::make_unique( + *engine->getKVBucket(), stream, 1, vb->getPersistenceSeqno()); + EXPECT_EQ(backfill_state_init, backfill->getState()); + EXPECT_EQ(backfill_success, backfill->run()); + EXPECT_EQ(backfill_state_scanning_history_snapshot, backfill->getState()); + // stream will error markDiskSnapshot + stream->setDead(cb::mcbp::DcpStreamEndStatus::Ok); + EXPECT_EQ(backfill_finished, backfill->run()); } INSTANTIATE_TEST_SUITE_P(HistoryScanTests, HistoryScanTest, STParameterizedBucketTest::magmaConfigValues(), - 
STParameterizedBucketTest::PrintToStringParamName); \ No newline at end of file + STParameterizedBucketTest::PrintToStringParamName); + +INSTANTIATE_TEST_SUITE_P( + HistoryScanTests, + HistoryScanTestSingleEvictionMode, + ::testing::Values("bucket_type=persistent:" + "backend=magma:" + "item_eviction_policy=full_eviction"), + STParameterizedBucketTest::PrintToStringParamName); \ No newline at end of file diff --git a/engines/ep/tests/module_tests/magma-kvstore_test.cc b/engines/ep/tests/module_tests/magma-kvstore_test.cc index bbcd77c3bf..ff220c96e6 100644 --- a/engines/ep/tests/module_tests/magma-kvstore_test.cc +++ b/engines/ep/tests/module_tests/magma-kvstore_test.cc @@ -20,6 +20,7 @@ #include "test_helpers.h" #include "thread_gate.h" #include +#include using namespace std::string_literals; using namespace testing; @@ -94,7 +95,8 @@ TEST_F(MagmaKVStoreRollbackTest, Rollback) { auto ctx = kvstore->begin(vbid, std::make_unique()); for (int j = 0; j < 5; j++) { - auto key = makeStoredDocKey("key" + std::to_string(seqno)); + // pad the key so key09 < key10 + auto key = makeStoredDocKey(fmt::format("key_{:02}", seqno)); auto qi = makeCommittedItem(key, "value"); flush.proposedVBState.lastSnapStart = seqno; flush.proposedVBState.lastSnapEnd = seqno; @@ -104,22 +106,22 @@ TEST_F(MagmaKVStoreRollbackTest, Rollback) { kvstore->commit(std::move(ctx), flush); } - auto rv = kvstore->get(makeDiskDocKey("key5"), Vbid(0)); + auto rv = kvstore->get(makeDiskDocKey("key_05"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key6"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_06"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); auto rollbackResult = kvstore->rollback(Vbid(0), 5, std::make_unique()); ASSERT_TRUE(rollbackResult.success); - rv = kvstore->get(makeDiskDocKey("key1"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_01"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = 
kvstore->get(makeDiskDocKey("key5"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_05"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::success); - rv = kvstore->get(makeDiskDocKey("key6"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_06"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::no_such_key); - rv = kvstore->get(makeDiskDocKey("key10"), Vbid(0)); + rv = kvstore->get(makeDiskDocKey("key_10"), Vbid(0)); EXPECT_EQ(rv.getStatus(), cb::engine_errc::no_such_key); auto vbs = kvstore->getCachedVBucketState(Vbid(0)); @@ -139,7 +141,8 @@ TEST_F(MagmaKVStoreRollbackTest, RollbackNoValidCheckpoint) { auto ctx = kvstore->begin(vbid, std::make_unique()); for (int j = 0; j < 5; j++) { - auto key = makeStoredDocKey("key" + std::to_string(seqno)); + // pad the key so key09 < key10 + auto key = makeStoredDocKey(fmt::format("key_{:02}", seqno)); auto qi = makeCommittedItem(key, "value"); qi->setBySeqno(seqno++); kvstore->set(*ctx, qi); @@ -619,17 +622,6 @@ class MockDirectory : public magma::Directory { TEST_F(MagmaKVStoreTest, readOnlyMode) { initialize_kv_store(kvstore.get(), vbid); - auto doWrite = [this](uint64_t seqno, bool expected) { - auto ctx = - kvstore->begin(vbid, std::make_unique()); - auto qi = makeCommittedItem(makeStoredDocKey("key"), "value"); - qi->setBySeqno(seqno); - flush.proposedVBState.lastSnapStart = seqno; - flush.proposedVBState.lastSnapEnd = seqno; - kvstore->set(*ctx, qi); - EXPECT_EQ(expected, kvstore->commit(std::move(ctx), flush)); - }; - // Add an item to test that we can read it doWrite(1, true /*success*/); @@ -715,10 +707,11 @@ TEST_F(MagmaKVStoreTest, makeFileHandleSyncFailed) { EXPECT_FALSE(fileHandle); } -// @todo: This is a basic test that will be expanded to cover scanning history -// at the moment this test is equivalent to "scan" -TEST_F(MagmaKVStoreTest, scanAllVersions) { +// Test scanAllVersions returns the expected number of keys and the expected +// history start seqno. 
+TEST_F(MagmaKVStoreTest, scanAllVersions1) { initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); std::vector expectedItems; expectedItems.push_back(doWrite(1, true, "k1")); expectedItems.push_back(doWrite(2, true, "k2")); @@ -736,8 +729,7 @@ TEST_F(MagmaKVStoreTest, scanAllVersions) { ValueFilter::VALUES_COMPRESSED, SnapshotSource::Head); ASSERT_TRUE(bySeq); - // @todo: This must be the expected seqno where history begins - EXPECT_EQ(0, bySeq->historyStartSeqno); + EXPECT_EQ(1, bySeq->historyStartSeqno); EXPECT_EQ(scan_success, kvstore->scanAllVersions(*bySeq)); auto& cb = @@ -770,4 +762,76 @@ TEST_F(MagmaKVStoreTest, preparePendingRequests) { EXPECT_EQ(itr->second, req->getItem().getKey().c_str()); ++itr; } +} + +// Test scanAllVersions returns the expected number of keys and the expected +// history start seqno. This test uses the same key for all mutations. +TEST_F(MagmaKVStoreTest, scanAllVersions2) { + initialize_kv_store(kvstore.get(), vbid); + kvstore->setHistoryRetentionBytes(1024 * 1024); + std::vector expectedItems; + // doWrite writes the same key + expectedItems.push_back(doWrite(1, true)); + expectedItems.push_back(doWrite(2, true)); + auto validate = [&expectedItems](GetValue gv) { + ASSERT_TRUE(gv.item); + ASSERT_GE(expectedItems.size(), size_t(gv.item->getBySeqno())); + EXPECT_EQ(*expectedItems[gv.item->getBySeqno() - 1], *gv.item); + }; + auto bySeq = kvstore->initBySeqnoScanContext( + std::make_unique>(validate), + std::make_unique>(), + vbid, + 1, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED, + SnapshotSource::Head); + ASSERT_TRUE(bySeq); + EXPECT_EQ(1, bySeq->historyStartSeqno); + EXPECT_EQ(scan_success, kvstore->scanAllVersions(*bySeq)); + + auto& cb = + static_cast&>(bySeq->getValueCallback()); + EXPECT_EQ(2, cb.getProcessedCount()); +} + +// ScanContext now exposes historyStartSeqno which is tracked by magma provided +// it is retaining history. 
+TEST_F(MagmaKVStoreTest, historyStartSeqno) { + initialize_kv_store(kvstore.get(), vbid); + + auto validate = [this](uint64_t expectedSeqno) { + auto bySeq = kvstore->initBySeqnoScanContext( + std::make_unique(true /*expectcompressed*/), + std::make_unique(1, 5, Vbid(0)), + vbid, + 1, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED, + SnapshotSource::Head); + auto byId = kvstore->initByIdScanContext( + std::make_unique(true /*expectcompressed*/), + std::make_unique(1, 5, Vbid(0)), + vbid, + {}, + DocumentFilter::ALL_ITEMS, + ValueFilter::VALUES_COMPRESSED); + ASSERT_TRUE(bySeq); + ASSERT_TRUE(byId); + + EXPECT_EQ(expectedSeqno, bySeq->historyStartSeqno); + EXPECT_EQ(bySeq->historyStartSeqno, byId->historyStartSeqno); + }; + + kvstore->setHistoryRetentionBytes(0); + validate(0); + doWrite(2, true); // write seqno 2 + validate(0); + kvstore->setHistoryRetentionBytes(1024 * 1024); + doWrite(3, true); // write seqno 3 + validate(3); + doWrite(4, true); // write seqno 4 + validate(3); // history still starts at 3 + kvstore->setHistoryRetentionBytes(0); + validate(0); // back to no history } \ No newline at end of file