Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ADR-040: Implement DBConnection.Revert #10308

Merged
merged 7 commits into from
Oct 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* [\#9952](https://github.com/cosmos/cosmos-sdk/pull/9952) ADR 040: Implement in-memory DB backend
* [\#9848](https://github.com/cosmos/cosmos-sdk/pull/9848) ADR-040: Implement BadgerDB backend
* [\#9851](https://github.com/cosmos/cosmos-sdk/pull/9851) ADR-040: Implement RocksDB backend
* [\#10308](https://github.com/cosmos/cosmos-sdk/pull/10308) ADR-040: Implement DBConnection.Revert


### Client Breaking Changes
Expand Down
137 changes: 118 additions & 19 deletions db/badgerdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package badgerdb

import (
"bytes"
"context"
"encoding/csv"
"errors"
"math"
"os"
"path/filepath"
"strconv"
Expand All @@ -15,6 +15,8 @@ import (
dbutil "github.com/cosmos/cosmos-sdk/db/internal"

"github.com/dgraph-io/badger/v3"
bpb "github.com/dgraph-io/badger/v3/pb"
"github.com/dgraph-io/ristretto/z"
)

var (
Expand Down Expand Up @@ -57,10 +59,10 @@ type badgerIterator struct {

// Map our versions to Badger timestamps.
//
// A badger Txn's commit TS must be strictly greater than a record's "last-read"
// TS in order to detect conflicts, and a Txn must be read at a TS after last
// A badger Txn's commit timestamp must be strictly greater than a record's "last-read"
// timestamp in order to detect conflicts, and a Txn must be read at a timestamp after last
// commit to see current state. So we must use commit increments that are more
// granular than our version interval, and map versions to the corresponding TS.
// granular than our version interval, and map versions to the corresponding timestamp.
type versionManager struct {
*dbm.VersionManager
vmap map[uint64]uint64
Expand Down Expand Up @@ -111,7 +113,10 @@ func readVersionsFile(path string) (*versionManager, error) {
if err != nil {
return nil, err
}
var versions []uint64
var (
versions []uint64
lastTs uint64
)
vmap := map[uint64]uint64{}
for _, row := range rows {
version, err := strconv.ParseUint(row[0], 10, 64)
Expand All @@ -122,14 +127,17 @@ func readVersionsFile(path string) (*versionManager, error) {
if err != nil {
return nil, err
}
if version == 0 { // 0 maps to the latest timestamp
lastTs = ts
}
versions = append(versions, version)
vmap[version] = ts
}
vmgr := dbm.NewVersionManager(versions)
return &versionManager{
VersionManager: vmgr,
vmap: vmap,
lastTs: vmgr.Last(),
lastTs: lastTs,
}, nil
}

Expand All @@ -141,7 +149,9 @@ func writeVersionsFile(vm *versionManager, path string) error {
}
defer file.Close()
w := csv.NewWriter(file)
var rows [][]string
rows := [][]string{
[]string{"0", strconv.FormatUint(vm.lastTs, 10)},
}
for it := vm.Iterator(); it.Next(); {
version := it.Value()
ts, ok := vm.vmap[version]
Expand All @@ -157,16 +167,20 @@ func writeVersionsFile(vm *versionManager, path string) error {
}

func (b *BadgerDB) Reader() dbm.DBReader {
return &badgerTxn{txn: b.db.NewTransactionAt(math.MaxUint64, false), db: b}
b.mtx.RLock()
ts := b.vmgr.lastTs
b.mtx.RUnlock()
return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b}
}

func (b *BadgerDB) ReaderAt(version uint64) (dbm.DBReader, error) {
b.mtx.RLock()
defer b.mtx.RUnlock()
if !b.vmgr.Exists(version) {
ts, has := b.vmgr.versionTs(version)
if !has {
return nil, dbm.ErrVersionDoesNotExist
}
return &badgerTxn{txn: b.db.NewTransactionAt(b.vmgr.versionTs(version), false), db: b}, nil
return &badgerTxn{txn: b.db.NewTransactionAt(ts, false), db: b}, nil
}

func (b *BadgerDB) ReadWriter() dbm.DBReadWriter {
Expand Down Expand Up @@ -232,6 +246,83 @@ func (b *BadgerDB) DeleteVersion(target uint64) error {
return nil
}

func (b *BadgerDB) Revert() error {
b.mtx.RLock()
defer b.mtx.RUnlock()
if b.openWriters > 0 {
return dbm.ErrOpenTransactions
}

// Revert from latest commit timestamp to last "saved" timestamp
// if no versions exist, use 0 as it precedes any possible commit timestamp
var target uint64
last := b.vmgr.Last()
if last == 0 {
target = 0
} else {
var has bool
if target, has = b.vmgr.versionTs(last); !has {
return errors.New("bad version history")
}
}
lastTs := b.vmgr.lastTs
if target == lastTs {
return nil
}

// Badger provides no way to rollback committed data, so we undo all changes
// since the target version using the Stream API
stream := b.db.NewStreamAt(lastTs)
// Skips unchanged keys
stream.ChooseKey = func(item *badger.Item) bool { return item.Version() > target }
// Scans for value at target version
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
kv := bpb.KV{Key: key}
// advance down to <= target version
itr.Next() // we have at least one newer version
for itr.Valid() && bytes.Equal(key, itr.Item().Key()) && itr.Item().Version() > target {
itr.Next()
}
if itr.Valid() && bytes.Equal(key, itr.Item().Key()) && !itr.Item().IsDeletedOrExpired() {
roysc marked this conversation as resolved.
Show resolved Hide resolved
var err error
kv.Value, err = itr.Item().ValueCopy(nil)
if err != nil {
return nil, err
}
}
return &bpb.KVList{Kv: []*bpb.KV{&kv}}, nil
}
txn := b.db.NewTransactionAt(lastTs, true)
defer txn.Discard()
stream.Send = func(buf *z.Buffer) error {
kvl, err := badger.BufferToKVList(buf)
if err != nil {
return err
}
// nil Value indicates a deleted entry
for _, kv := range kvl.Kv {
if kv.Value == nil {
err = txn.Delete(kv.Key)
if err != nil {
return err
}
} else {
err = txn.Set(kv.Key, kv.Value)
if err != nil {
return err
}
}
}
return nil
}

err := stream.Orchestrate(context.Background())
if err != nil {
return err
}
return txn.CommitAt(lastTs, nil)
}

func (b *BadgerDB) Stats() map[string]string { return nil }

func (tx *badgerTxn) Get(key []byte) ([]byte, error) {
Expand Down Expand Up @@ -283,7 +374,7 @@ func (tx *badgerWriter) Commit() (err error) {
return errors.New("transaction has been discarded")
}
defer func() { err = dbutil.CombineErrors(err, tx.Discard(), "Discard also failed") }()
// Commit to the current commit TS, after ensuring it is > ReadTs
// Commit to the current commit timestamp, after ensuring it is > ReadTs
tx.db.mtx.RLock()
tx.db.vmgr.updateCommitTs(tx.txn.ReadTs())
ts := tx.db.vmgr.lastTs
Expand Down Expand Up @@ -385,14 +476,23 @@ func (i *badgerIterator) Value() []byte {
return val
}

func (vm *versionManager) versionTs(ver uint64) uint64 {
return vm.vmap[ver]
func (vm *versionManager) versionTs(ver uint64) (uint64, bool) {
ts, has := vm.vmap[ver]
return ts, has
}

// updateCommitTs increments the lastTs if equal to readts.
func (vm *versionManager) updateCommitTs(readts uint64) {
if vm.lastTs == readts {
vm.lastTs += 1
}
}

// Atomically accesses the last commit timestamp used as a version marker.
func (vm *versionManager) lastCommitTs() uint64 {
return atomic.LoadUint64(&vm.lastTs)
}

func (vm *versionManager) Copy() *versionManager {
vmap := map[uint64]uint64{}
for ver, ts := range vm.vmap {
Expand All @@ -405,12 +505,6 @@ func (vm *versionManager) Copy() *versionManager {
}
}

// updateCommitTs increments the lastTs if equal to readts.
func (vm *versionManager) updateCommitTs(readts uint64) {
if vm.lastTs == readts {
vm.lastTs += 1
}
}
func (vm *versionManager) Save(target uint64) (uint64, error) {
id, err := vm.VersionManager.Save(target)
if err != nil {
Expand All @@ -419,3 +513,8 @@ func (vm *versionManager) Save(target uint64) (uint64, error) {
vm.vmap[id] = vm.lastTs // non-atomic, already guarded by the vmgr mutex
return id, nil
}

func (vm *versionManager) Delete(target uint64) {
vm.VersionManager.Delete(target)
delete(vm.vmap, target)
}
5 changes: 5 additions & 0 deletions db/badgerdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func TestVersioning(t *testing.T) {
dbtest.DoTestVersioning(t, load)
}

func TestRevert(t *testing.T) {
dbtest.DoTestRevert(t, load, false)
dbtest.DoTestRevert(t, load, true)
}

func TestReloadDB(t *testing.T) {
dbtest.DoTestReloadDB(t, load)
}
36 changes: 23 additions & 13 deletions db/dbtest/testcases.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,27 +399,37 @@ func DoTestRevert(t *testing.T, load Loader, reload bool) {
db := load(t, dirname)
var txn dbm.DBWriter

txn = db.Writer()
require.NoError(t, txn.Set([]byte{2}, []byte{2}))
require.NoError(t, txn.Commit())
initContents := func() {
txn = db.Writer()
require.NoError(t, txn.Set([]byte{2}, []byte{2}))
require.NoError(t, txn.Commit())

txn = db.Writer()
for i := byte(6); i < 10; i++ {
require.NoError(t, txn.Set([]byte{i}, []byte{i}))
txn = db.Writer()
for i := byte(6); i < 10; i++ {
require.NoError(t, txn.Set([]byte{i}, []byte{i}))
}
require.NoError(t, txn.Delete([]byte{2}))
require.NoError(t, txn.Delete([]byte{3}))
require.NoError(t, txn.Commit())
}
require.NoError(t, txn.Delete([]byte{2}))
require.NoError(t, txn.Delete([]byte{3}))
require.NoError(t, txn.Commit())

require.Error(t, db.Revert()) // can't revert with no versions
initContents()
require.NoError(t, db.Revert())
view := db.Reader()
it, err := view.Iterator(nil, nil)
require.NoError(t, err)
require.False(t, it.Next()) // db is empty
require.NoError(t, it.Close())
require.NoError(t, view.Discard())

_, err := db.SaveNextVersion()
initContents()
_, err = db.SaveNextVersion()
require.NoError(t, err)

// get snapshot of db state
state := map[string][]byte{}
view := db.Reader()
it, err := view.Iterator(nil, nil)
view = db.Reader()
it, err = view.Iterator(nil, nil)
require.NoError(t, err)
for it.Next() {
state[string(it.Key())] = it.Value()
Expand Down
25 changes: 25 additions & 0 deletions db/memdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,31 @@ func (db *MemDB) DeleteVersion(target uint64) error {
return nil
}

func (db *MemDB) Revert() error {
db.mtx.RLock()
defer db.mtx.RUnlock()
if db.openWriters > 0 {
return dbm.ErrOpenTransactions
}

last := db.vmgr.Last()
if last == 0 {
db.btree = btree.New(bTreeDegree)
return nil
}
var has bool
db.btree, has = db.saved[last]
if !has {
return fmt.Errorf("bad version history: version %v not saved", last)
}
for ver, _ := range db.saved {
if ver > last {
delete(db.saved, ver)
}
}
return nil
}

// Get implements DBReader.
func (tx *dbTxn) Get(key []byte) ([]byte, error) {
if tx.btree == nil {
Expand Down
4 changes: 4 additions & 0 deletions db/memdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func TestVersioning(t *testing.T) {
dbtest.DoTestVersioning(t, load)
}

func TestRevert(t *testing.T) {
dbtest.DoTestRevert(t, load, false)
}

func TestTransactions(t *testing.T) {
dbtest.DoTestTransactions(t, load, false)
}
12 changes: 5 additions & 7 deletions db/rocksdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,20 +284,18 @@ func (mgr *dbManager) Revert() (err error) {
if mgr.openWriters > 0 {
return dbm.ErrOpenTransactions
}
last := mgr.vmgr.Last()
if last == 0 {
return dbm.ErrInvalidVersion
}
// Close current connection and replace it with a checkpoint (created from the last checkpoint)
mgr.current.Close()
dbPath := filepath.Join(mgr.dir, currentDBFileName)
err = os.RemoveAll(dbPath)
if err != nil {
return
}
err = mgr.restoreFromCheckpoint(last, dbPath)
if err != nil {
return
if last := mgr.vmgr.Last(); last != 0 {
err = mgr.restoreFromCheckpoint(last, dbPath)
if err != nil {
return
}
}
mgr.current, err = gorocksdb.OpenOptimisticTransactionDb(mgr.opts.dbo, dbPath)
return
Expand Down
Loading