From e632de38fb59bcac9f103e2e35a4b542d6c25a58 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 5 Oct 2021 00:15:36 +0800 Subject: [PATCH 1/6] memdb, badger: implement DBConnection.Revert --- db/badgerdb/db.go | 122 ++++++++++++++++++++++++++++++++++++----- db/badgerdb/db_test.go | 5 ++ db/memdb/db.go | 24 ++++++++ db/memdb/db_test.go | 4 ++ 4 files changed, 142 insertions(+), 13 deletions(-) diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index c52c6172bba..2791d56af51 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -2,9 +2,9 @@ package badgerdb import ( "bytes" + "context" "encoding/csv" "errors" - "math" "os" "path/filepath" "strconv" @@ -15,6 +15,8 @@ import ( dbutil "github.com/cosmos/cosmos-sdk/db/internal" "github.com/dgraph-io/badger/v3" + bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/ristretto/z" ) var ( @@ -112,6 +114,7 @@ func readVersionsFile(path string) (*versionManager, error) { return nil, err } var versions []uint64 + var lastTs uint64 vmap := map[uint64]uint64{} for _, row := range rows { version, err := strconv.ParseUint(row[0], 10, 64) @@ -122,6 +125,9 @@ func readVersionsFile(path string) (*versionManager, error) { if err != nil { return nil, err } + if version == 0 { // 0 maps to the latest TS + lastTs = ts + } versions = append(versions, version) vmap[version] = ts } @@ -129,7 +135,7 @@ func readVersionsFile(path string) (*versionManager, error) { return &versionManager{ VersionManager: vmgr, vmap: vmap, - lastTs: vmgr.Last(), + lastTs: lastTs, }, nil } @@ -142,6 +148,10 @@ func writeVersionsFile(vm *versionManager, path string) error { defer file.Close() w := csv.NewWriter(file) var rows [][]string + rows = append(rows, []string{ + strconv.FormatUint(0, 10), + strconv.FormatUint(vm.lastTs, 10), + }) for it := vm.Iterator(); it.Next(); { version := it.Value() ts, ok := vm.vmap[version] @@ -157,16 +167,20 @@ func writeVersionsFile(vm *versionManager, path string) error { } func (b *BadgerDB) Reader() dbm.DBReader { - return &badgerTxn{txn: b.db.NewTransactionAt(math.MaxUint64, false), db: b} + b.mtx.RLock() + ts := b.vmgr.lastTs + b.mtx.RUnlock() + return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b} } func (b *BadgerDB) ReaderAt(version uint64) (dbm.DBReader, error) { b.mtx.RLock() defer b.mtx.RUnlock() - if !b.vmgr.Exists(version) { + ts, has := b.vmgr.versionTs(version) + if !has { return nil, dbm.ErrVersionDoesNotExist } - return &badgerTxn{txn: b.db.NewTransactionAt(b.vmgr.versionTs(version), false), db: b}, nil + return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b}, nil } func (b *BadgerDB) ReadWriter() dbm.DBReadWriter { @@ -232,6 +246,80 @@ func (b *BadgerDB) DeleteVersion(target uint64) error { return nil } +func (b *BadgerDB) Revert() error { + b.mtx.RLock() + defer b.mtx.RUnlock() + if b.openWriters > 0 { + return dbm.ErrOpenTransactions + } + + last := b.vmgr.Last() + if last == 0 { + return dbm.ErrInvalidVersion + } + // Revert from latest commit TS to last "saved" TS + target, has := b.vmgr.versionTs(last) + if !has { + return errors.New("bad version history") + } + lastTs := b.vmgr.lastTs + if target == lastTs { + return nil + } + + // Badger provides no way to rollback committed data, so we undo all changes + // since the target version using the Stream API + stream := b.db.NewStreamAt(lastTs) + // Skips unchanged keys + stream.ChooseKey = func(item *badger.Item) bool { return item.Version() > target } + // Scans for value at target version + stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { + kv := bpb.KV{Key: key} + // advance down to <= target version + itr.Next() // we have at least one newer version + for itr.Valid() && bytes.Equal(key, itr.Item().Key()) && itr.Item().Version() > target { + itr.Next() + } + if itr.Valid() && bytes.Equal(key, itr.Item().Key()) && !itr.Item().IsDeletedOrExpired() { + var err error + kv.Value, err = itr.Item().ValueCopy(nil) + if err != nil { + return nil, err + } + } + return &bpb.KVList{Kv: []*bpb.KV{&kv}}, nil + } + txn := b.db.NewTransactionAt(lastTs, true) + defer txn.Discard() + stream.Send = func(buf *z.Buffer) error { + kvl, err := badger.BufferToKVList(buf) + if err != nil { + return err + } + // nil Value indicates a deleted entry + for _, kv := range kvl.Kv { + if kv.Value == nil { + err = txn.Delete(kv.Key) + if err != nil { + return err + } + } else { + err = txn.Set(kv.Key, kv.Value) + if err != nil { + return err + } + } + } + return nil + } + + err := stream.Orchestrate(context.Background()) + if err != nil { + return err + } + return txn.CommitAt(lastTs, nil) +} + func (b *BadgerDB) Stats() map[string]string { return nil } func (tx *badgerTxn) Get(key []byte) ([]byte, error) { @@ -385,14 +473,23 @@ func (i *badgerIterator) Value() []byte { return val } -func (vm *versionManager) versionTs(ver uint64) uint64 { - return vm.vmap[ver] +func (vm *versionManager) versionTs(ver uint64) (uint64, bool) { + ts, has := vm.vmap[ver] + return ts, has +} + +// updateCommitTs increments the lastTs if equal to readts. +func (vm *versionManager) updateCommitTs(readts uint64) { + if vm.lastTs == readts { + vm.lastTs += 1 + } } // Atomically accesses the last commit timestamp used as a version marker. func (vm *versionManager) lastCommitTs() uint64 { return atomic.LoadUint64(&vm.lastTs) } + func (vm *versionManager) Copy() *versionManager { vmap := map[uint64]uint64{} for ver, ts := range vm.vmap { @@ -405,12 +502,6 @@ func (vm *versionManager) Copy() *versionManager { } } -// updateCommitTs increments the lastTs if equal to readts. -func (vm *versionManager) updateCommitTs(readts uint64) { - if vm.lastTs == readts { - vm.lastTs += 1 - } -} func (vm *versionManager) Save(target uint64) (uint64, error) { id, err := vm.VersionManager.Save(target) if err != nil { @@ -419,3 +510,8 @@ func (vm *versionManager) Save(target uint64) (uint64, error) { vm.vmap[id] = vm.lastTs // non-atomic, already guarded by the vmgr mutex return id, nil } + +func (vm *versionManager) Delete(target uint64) { + vm.VersionManager.Delete(target) + delete(vm.vmap, target) +} diff --git a/db/badgerdb/db_test.go b/db/badgerdb/db_test.go index d451a63f433..419f595d60c 100644 --- a/db/badgerdb/db_test.go +++ b/db/badgerdb/db_test.go @@ -31,6 +31,11 @@ func TestVersioning(t *testing.T) { dbtest.DoTestVersioning(t, load) } +func TestRevert(t *testing.T) { + dbtest.DoTestRevert(t, load, false) + dbtest.DoTestRevert(t, load, true) +} + func TestReloadDB(t *testing.T) { dbtest.DoTestReloadDB(t, load) } diff --git a/db/memdb/db.go b/db/memdb/db.go index e80688eff7f..86a2e01cdd5 100644 --- a/db/memdb/db.go +++ b/db/memdb/db.go @@ -159,6 +159,30 @@ func (db *MemDB) DeleteVersion(target uint64) error { return nil } +func (db *MemDB) Revert() error { + db.mtx.RLock() + defer db.mtx.RUnlock() + if db.openWriters > 0 { + return dbm.ErrOpenTransactions + } + + last := db.vmgr.Last() + if last == 0 { + return dbm.ErrInvalidVersion // revert needs at least one version + } + var has bool + db.btree, has = db.saved[last] + if !has { + return fmt.Errorf("bad version history: version %v not saved", last) + } + for ver, _ := range db.saved { + if ver > last { + delete(db.saved, ver) + } + } + return nil +} + // Get implements DBReader. func (tx *dbTxn) Get(key []byte) ([]byte, error) { if tx.btree == nil { diff --git a/db/memdb/db_test.go b/db/memdb/db_test.go index a3ac242eac1..cd5bd890816 100644 --- a/db/memdb/db_test.go +++ b/db/memdb/db_test.go @@ -44,6 +44,10 @@ func TestVersioning(t *testing.T) { dbtest.DoTestVersioning(t, load) } +func TestRevert(t *testing.T) { + dbtest.DoTestRevert(t, load, false) +} + func TestTransactions(t *testing.T) { dbtest.DoTestTransactions(t, load, false) } From b7023ed862279a0d47fade0933a4a60a27c46a84 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 6 Oct 2021 15:44:21 +0800 Subject: [PATCH 2/6] group vars --- db/badgerdb/db.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index 2791d56af51..5a49ed7388d 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -113,8 +113,10 @@ func readVersionsFile(path string) (*versionManager, error) { if err != nil { return nil, err } - var versions []uint64 - var lastTs uint64 + var ( + versions []uint64 + lastTs uint64 + ) vmap := map[uint64]uint64{} for _, row := range rows { version, err := strconv.ParseUint(row[0], 10, 64) From 6296555b32c65c0e1182fb527964d04ba49498cb Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Wed, 6 Oct 2021 17:59:04 +0800 Subject: [PATCH 3/6] allow reverting to empty db --- db/badgerdb/db.go | 15 +++++++++------ db/dbtest/testcases.go | 36 +++++++++++++++++++++++------------- db/memdb/db.go | 3 ++- db/rocksdb/db.go | 12 +++++------- db/types.go | 3 +-- 5 files changed, 40 insertions(+), 29 deletions(-) diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index 5a49ed7388d..224e7f6d7f7 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -255,14 +255,17 @@ func (b *BadgerDB) Revert() error { return dbm.ErrOpenTransactions } + // Revert from latest commit TS to last "saved" TS + // if no versions exist, use 0 as it precedes any possible commit TS + var target uint64 last := b.vmgr.Last() if last == 0 { - return dbm.ErrInvalidVersion - } - // Revert from latest commit TS to last "saved" TS - target, has := b.vmgr.versionTs(last) - if !has { - return errors.New("bad version history") + target = 0 + } else { + var has bool + if target, has = b.vmgr.versionTs(last); !has { + return errors.New("bad version history") + } } lastTs := b.vmgr.lastTs if target == lastTs { diff --git a/db/dbtest/testcases.go b/db/dbtest/testcases.go index 304fde5582b..dde135fa36e 100644 --- a/db/dbtest/testcases.go +++ b/db/dbtest/testcases.go @@ -399,27 +399,37 @@ func DoTestRevert(t *testing.T, load Loader, reload bool) { db := load(t, dirname) var txn dbm.DBWriter - txn = db.Writer() - require.NoError(t, txn.Set([]byte{2}, []byte{2})) - require.NoError(t, txn.Commit()) + initContents := func() { + txn = db.Writer() + require.NoError(t, txn.Set([]byte{2}, []byte{2})) + require.NoError(t, txn.Commit()) - txn = db.Writer() - for i := byte(6); i < 10; i++ { - require.NoError(t, txn.Set([]byte{i}, []byte{i})) + txn = db.Writer() + for i := byte(6); i < 10; i++ { + require.NoError(t, txn.Set([]byte{i}, []byte{i})) + } + require.NoError(t, txn.Delete([]byte{2})) + require.NoError(t, txn.Delete([]byte{3})) + require.NoError(t, txn.Commit()) } - require.NoError(t, txn.Delete([]byte{2})) - require.NoError(t, txn.Delete([]byte{3})) - require.NoError(t, txn.Commit()) - require.Error(t, db.Revert()) // can't revert with no versions + initContents() + require.NoError(t, db.Revert()) + view := db.Reader() + it, err := view.Iterator(nil, nil) + require.NoError(t, err) + require.False(t, it.Next()) // db is empty + require.NoError(t, it.Close()) + require.NoError(t, view.Discard()) - _, err := db.SaveNextVersion() + initContents() + _, err = db.SaveNextVersion() require.NoError(t, err) // get snapshot of db state state := map[string][]byte{} - view := db.Reader() - it, err := view.Iterator(nil, nil) + view = db.Reader() + it, err = view.Iterator(nil, nil) require.NoError(t, err) for it.Next() { state[string(it.Key())] = it.Value() diff --git a/db/memdb/db.go b/db/memdb/db.go index 86a2e01cdd5..b656d60edad 100644 --- a/db/memdb/db.go +++ b/db/memdb/db.go @@ -168,7 +168,8 @@ func (db *MemDB) Revert() error { last := db.vmgr.Last() if last == 0 { - return dbm.ErrInvalidVersion // revert needs at least one version + db.btree = btree.New(bTreeDegree) + return nil } var has bool db.btree, has = db.saved[last] diff --git a/db/rocksdb/db.go b/db/rocksdb/db.go index f906ea5316b..4b69172b517 100644 --- a/db/rocksdb/db.go +++ b/db/rocksdb/db.go @@ -284,10 +284,6 @@ func (mgr *dbManager) Revert() (err error) { if mgr.openWriters > 0 { return dbm.ErrOpenTransactions } - last := mgr.vmgr.Last() - if last == 0 { - return dbm.ErrInvalidVersion - } // Close current connection and replace it with a checkpoint (created from the last checkpoint) mgr.current.Close() dbPath := filepath.Join(mgr.dir, currentDBFileName) @@ -295,9 +291,11 @@ func (mgr *dbManager) Revert() (err error) { if err != nil { return } - err = mgr.restoreFromCheckpoint(last, dbPath) - if err != nil { - return + if last := mgr.vmgr.Last(); last != 0 { + err = mgr.restoreFromCheckpoint(last, dbPath) + if err != nil { + return + } } mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath) return diff --git a/db/types.go b/db/types.go index d360f57f4a7..137cd7d5575 100644 --- a/db/types.go +++ b/db/types.go @@ -61,8 +61,7 @@ type DBConnection interface { // Deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions. DeleteVersion(uint64) error - // Reverts the DB state to the last saved version. - // Returns an error if no saved versions exist. + // Reverts the DB state to the last saved version; if none exist, this clears the DB. // Returns an error if any open DBWriter transactions exist. Revert() error From 73bdf055254c033914e06a22c5e3afc6fdbd75a2 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 7 Oct 2021 16:46:58 +0800 Subject: [PATCH 4/6] PR revisions --- db/badgerdb/db.go | 8 +++----- db/types.go | 28 ++++++++++++++-------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index 224e7f6d7f7..9b785fe414c 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -149,11 +149,9 @@ func writeVersionsFile(vm *versionManager, path string) error { } defer file.Close() w := csv.NewWriter(file) - var rows [][]string - rows = append(rows, []string{ - strconv.FormatUint(0, 10), - strconv.FormatUint(vm.lastTs, 10), - }) + rows := [][]string{ + []string{"0", strconv.FormatUint(vm.lastTs, 10)}, + } for it := vm.Iterator(); it.Next(); { version := it.Value() ts, ok := vm.vmap[version] diff --git a/db/types.go b/db/types.go index 137cd7d5575..bab74bf8c5e 100644 --- a/db/types.go +++ b/db/types.go @@ -31,37 +31,37 @@ var ( // and read and write access. // Past versions are only accessible read-only. type DBConnection interface { - // Opens a read-only transaction at the current working version. + // Reader opens a read-only transaction at the current working version. Reader() DBReader - // Opens a read-only transaction at a specified version. + // ReaderAt opens a read-only transaction at a specified version. // Returns ErrVersionDoesNotExist for invalid versions. ReaderAt(uint64) (DBReader, error) - // Opens a read-write transaction at the current version. + // ReadWriter opens a read-write transaction at the current version. ReadWriter() DBReadWriter - // Opens a write-only transaction at the current version. + // Writer opens a write-only transaction at the current version. Writer() DBWriter - // Returns all saved versions as an immutable set which is safe for concurrent access. + // Versions returns all saved versions as an immutable set which is safe for concurrent access. Versions() (VersionSet, error) - // Saves the current contents of the database and returns the next version ID, which will be - // `Versions().Last()+1`. + // SaveNextVersion saves the current contents of the database and returns the next version ID, + // which will be `Versions().Last()+1`. // Returns an error if any open DBWriter transactions exist. // TODO: rename to something more descriptive? SaveNextVersion() (uint64, error) - // Attempts to save database at a specific version ID, which must be greater than or equal to - // what would be returned by `SaveNextVersion`. + // SaveVersion attempts to save database at a specific version ID, which must be greater than or + // equal to what would be returned by `SaveNextVersion`. // Returns an error if any open DBWriter transactions exist. SaveVersion(uint64) error - // Deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions. + // DeleteVersion deletes a saved version. Returns ErrVersionDoesNotExist for invalid versions. DeleteVersion(uint64) error - // Reverts the DB state to the last saved version; if none exist, this clears the DB. + // Revert reverts the DB state to the last saved version; if none exist, this clears the DB. // Returns an error if any open DBWriter transactions exist. Revert() error @@ -100,7 +100,7 @@ type DBReader interface { // TODO: replace with an extra argument to Iterator()? ReverseIterator(start, end []byte) (Iterator, error) - // Discards the transaction, invalidating any future operations on it. + // Discard discards the transaction, invalidating any future operations on it. Discard() error } @@ -117,10 +117,10 @@ type DBWriter interface { // CONTRACT: key readonly []byte Delete([]byte) error - // Flushes pending writes and discards the transaction. + // Commit flushes pending writes and discards the transaction. Commit() error - // Discards the transaction, invalidating any future operations on it. + // Discard discards the transaction, invalidating any future operations on it. Discard() error } From 1ca0957f6b017d5fdd84d4aae247bfeca502b5ce Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Thu, 7 Oct 2021 18:05:33 +0800 Subject: [PATCH 5/6] changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4dd5611d6f6..6e666876719 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -189,6 +189,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [\#9952](https://github.com/cosmos/cosmos-sdk/pull/9952) ADR 040: Implement in-memory DB backend * [\#9848](https://github.com/cosmos/cosmos-sdk/pull/9848) ADR-040: Implement BadgerDB backend * [\#9851](https://github.com/cosmos/cosmos-sdk/pull/9851) ADR-040: Implement RocksDB backend +* [\#10308](https://github.com/cosmos/cosmos-sdk/pull/10308) ADR-040: Implement DBConnection.Revert ### Client Breaking Changes From 7352f471de054ca06cb6ecbec0db43bdb40ab928 Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Fri, 8 Oct 2021 14:11:51 +0800 Subject: [PATCH 6/6] don't abbreviate timestamp --- db/badgerdb/db.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/db/badgerdb/db.go b/db/badgerdb/db.go index 9b785fe414c..894537a57cf 100644 --- a/db/badgerdb/db.go +++ b/db/badgerdb/db.go @@ -59,10 +59,10 @@ type badgerIterator struct { // Map our versions to Badger timestamps. // -// A badger Txn's commit TS must be strictly greater than a record's "last-read" -// TS in order to detect conflicts, and a Txn must be read at a TS after last +// A badger Txn's commit timestamp must be strictly greater than a record's "last-read" +// timestamp in order to detect conflicts, and a Txn must be read at a timestamp after last // commit to see current state. So we must use commit increments that are more -// granular than our version interval, and map versions to the corresponding TS. +// granular than our version interval, and map versions to the corresponding timestamp. type versionManager struct { *dbm.VersionManager vmap map[uint64]uint64 @@ -127,7 +127,7 @@ func readVersionsFile(path string) (*versionManager, error) { if err != nil { return nil, err } - if version == 0 { // 0 maps to the latest TS + if version == 0 { // 0 maps to the latest timestamp lastTs = ts } versions = append(versions, version) @@ -253,8 +253,8 @@ func (b *BadgerDB) Revert() error { return dbm.ErrOpenTransactions } - // Revert from latest commit TS to last "saved" TS - // if no versions exist, use 0 as it precedes any possible commit TS + // Revert from latest commit timestamp to last "saved" timestamp + // if no versions exist, use 0 as it precedes any possible commit timestamp var target uint64 last := b.vmgr.Last() if last == 0 { @@ -374,7 +374,7 @@ func (tx *badgerWriter) Commit() (err error) { return errors.New("transaction has been discarded") } defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }() - // Commit to the current commit TS, after ensuring it is > ReadTs + // Commit to the current commit timestamp, after ensuring it is > ReadTs tx.db.mtx.RLock() tx.db.vmgr.updateCommitTs(tx.txn.ReadTs()) ts := tx.db.vmgr.lastTs