Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store/v2): snapshot manager #18458

Merged
merged 18 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 149 additions & 2 deletions store/commitment/iavl/tree.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
package iavl

import (
"errors"
"fmt"
"io"
"math"

dbm "github.com/cosmos/cosmos-db"
protoio "github.com/cosmos/gogoproto/io"
"github.com/cosmos/iavl"
ics23 "github.com/cosmos/ics23/go"

log "cosmossdk.io/log"
"cosmossdk.io/store/v2"
snapshottypes "cosmossdk.io/store/v2/snapshots/types"
)

var _ store.Committer = (*IavlTree)(nil)

var _ snapshottypes.CommitSnapshotter = (*IavlTree)(nil)

// IavlTree is a wrapper around iavl.MutableTree.
type IavlTree struct {
tree *iavl.MutableTree

// storeKey is the identifier of the store.
storeKey string
}

// NewIavlTree creates a new IavlTree instance.
func NewIavlTree(db dbm.DB, logger log.Logger, cfg *Config) *IavlTree {
func NewIavlTree(db dbm.DB, logger log.Logger, storeKey string, cfg *Config) *IavlTree {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought there was discussion around abstracting storekey away? does this change the direction of that conversation?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the expectation is that dbm.DB is a PrefixDb (for iavl v0 & v1) so the store key prefix is already baked into that. I guess storekey is required for the snapshot API? (still reading this PR).

Copy link
Member

@kocubinski kocubinski Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

db dbm.DB is not required or used in iavl v2 (since it's an interface to an underlying generic KV store). Maybe we should fold this into Config if we want to support multiple iavl versions in store v2?

tree := iavl.NewMutableTree(db, cfg.CacheSize, cfg.SkipFastStorageUpgrade, logger)
return &IavlTree{
tree: tree,
tree: tree,
storeKey: storeKey,
}
tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -83,6 +94,142 @@ func (t *IavlTree) Prune(version uint64) error {
return t.tree.DeleteVersionsTo(int64(version))
}

// Snapshot implements snapshottypes.CommitSnapshotter.
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
func (t *IavlTree) Snapshot(version uint64, protoWriter protoio.Writer) error {
if version == 0 {
return fmt.Errorf("the snapshot version must be greater than 0")
}

latestVersion := t.GetLatestVersion()
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
if version > latestVersion {
return fmt.Errorf("the snapshot version %d is greater than the latest version %d", version, latestVersion)
}

tree, err := t.tree.GetImmutable(int64(version))
if err != nil {
return fmt.Errorf("failed to get immutable tree for version %d: %w", version, err)
}

exporter, err := tree.Export()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we please call this itr or iter instead of exporter? exporter makes it seem like its purpose is to write out content, but really the purpose is to iterate over the exported content.

tac0turtle marked this conversation as resolved.
Show resolved Hide resolved
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("failed to export tree for version %d: %w", version, err)
}

cool-develope marked this conversation as resolved.
Show resolved Hide resolved
defer exporter.Close()

err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Store{
Store: &snapshottypes.SnapshotStoreItem{
Name: t.storeKey,
},
},
})
if err != nil {
return fmt.Errorf("failed to write store name: %w", err)
}

for {
node, err := exporter.Next()
if errors.Is(err, iavl.ErrorExportDone) {
break
} else if err != nil {
return fmt.Errorf("failed to get the next export node: %w", err)
}

if err = protoWriter.WriteMsg(&snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_IAVL{
IAVL: &snapshottypes.SnapshotIAVLItem{
Key: node.Key,
Value: node.Value,
Height: int32(node.Height),
Version: node.Version,
},
},
}); err != nil {
return fmt.Errorf("failed to write iavl node: %w", err)
}
}

return nil
}

// Restore implements snapshottypes.CommitSnapshotter.
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
func (t *IavlTree) Restore(version uint64, format uint32, protoReader protoio.Reader, chStorage chan<- *store.KVPair) (snapshottypes.SnapshotItem, error) {
var (
importer *iavl.Importer
snapshotItem snapshottypes.SnapshotItem
)

loop:
for {
snapshotItem = snapshottypes.SnapshotItem{}
err := protoReader.ReadMsg(&snapshotItem)
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return snapshottypes.SnapshotItem{}, fmt.Errorf("invalid protobuf message: %w", err)
}

switch item := snapshotItem.Item.(type) {
case *snapshottypes.SnapshotItem_Store:
t.storeKey = item.Store.Name
importer, err = t.tree.Import(int64(version))
if err != nil {
return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to import tree for version %d: %w", version, err)
}
defer importer.Close()
cool-develope marked this conversation as resolved.
Show resolved Hide resolved

case *snapshottypes.SnapshotItem_IAVL:
if importer == nil {
return snapshottypes.SnapshotItem{}, fmt.Errorf("received IAVL node item before store item")
}
if item.IAVL.Height > int32(math.MaxInt8) {
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
return snapshottypes.SnapshotItem{}, fmt.Errorf("node height %v cannot exceed %v",
item.IAVL.Height, math.MaxInt8)
}
node := &iavl.ExportNode{
Key: item.IAVL.Key,
Value: item.IAVL.Value,
Height: int8(item.IAVL.Height),
Version: item.IAVL.Version,
}
// Protobuf does not differentiate between []byte{} and nil, but fortunately IAVL does
// not allow nil keys nor nil values for leaf nodes, so we can always set them to empty.
if node.Key == nil {
node.Key = []byte{}
}
if node.Height == 0 {
if node.Value == nil {
node.Value = []byte{}
}
// If the node is a leaf node, it will be written to the storage.
chStorage <- &store.KVPair{
StoreKey: t.storeKey,
Key: node.Key,
Value: node.Value,
}
}
err := importer.Add(node)
if err != nil {
return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to add node to importer: %w", err)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if err := import.Add(node); err != nil {
   return si, fmt.Errorf("failed to add node to impoorter: %w", err)
}

default:
break loop
}
}

if importer != nil {
err := importer.Commit()
if err != nil {
return snapshottypes.SnapshotItem{}, fmt.Errorf("failed to commit importer: %w", err)
}
cool-develope marked this conversation as resolved.
Show resolved Hide resolved
}

_, err := t.tree.LoadVersion(int64(version))

return snapshotItem, err
}

// Close closes the iavl tree.
func (t *IavlTree) Close() error {
return nil
Expand Down
68 changes: 66 additions & 2 deletions store/commitment/iavl/tree_test.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package iavl

import (
"fmt"
"io"
"testing"

dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/require"

"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"
snapshottypes "cosmossdk.io/store/v2/snapshots/types"
)

func generateTree(treeType string) *IavlTree {
func generateTree(storeKey string) *IavlTree {
cfg := DefaultConfig()
db := dbm.NewMemDB()
return NewIavlTree(db, log.NewNopLogger(), cfg)
return NewIavlTree(db, log.NewNopLogger(), storeKey, cfg)
}

func TestIavlTree(t *testing.T) {
Expand Down Expand Up @@ -90,3 +94,63 @@ func TestIavlTree(t *testing.T) {
// close the db
require.NoError(t, tree.Close())
}

func TestSnapshotter(t *testing.T) {
// generate a new tree
storeKey := "store"
tree := generateTree(storeKey)
require.NotNil(t, tree)

latestVersion := uint64(10)
kvCount := 10
for i := uint64(1); i <= latestVersion; i++ {
cs := store.NewChangeset()
for j := 0; j < kvCount; j++ {
key := []byte(fmt.Sprintf("key-%d-%d", i, j))
value := []byte(fmt.Sprintf("value-%d-%d", i, j))
cs.Add(key, value)
}
err := tree.WriteBatch(cs)
require.NoError(t, err)

_, err = tree.Commit()
require.NoError(t, err)
}

latestHash := tree.WorkingHash()

// create a snapshot
dummyExtensionItem := snapshottypes.SnapshotItem{
Item: &snapshottypes.SnapshotItem_Extension{
Extension: &snapshottypes.SnapshotExtensionMeta{
Name: "test",
Format: 1,
},
},
}
target := generateTree("")
chunks := make(chan io.ReadCloser, kvCount*int(latestVersion))
go func() {
streamWriter := snapshots.NewStreamWriter(chunks)
require.NotNil(t, streamWriter)
defer streamWriter.Close()
err := tree.Snapshot(latestVersion, streamWriter)
require.NoError(t, err)
// write an extension metadata
err = streamWriter.WriteMsg(&dummyExtensionItem)
require.NoError(t, err)
}()

streamReader, err := snapshots.NewStreamReader(chunks)
chStorage := make(chan *store.KVPair, 100)
require.NoError(t, err)
nextItem, err := target.Restore(latestVersion, snapshottypes.CurrentFormat, streamReader, chStorage)
require.NoError(t, err)
require.Equal(t, *dummyExtensionItem.GetExtension(), *nextItem.GetExtension())

// check the store key
require.Equal(t, storeKey, target.storeKey)

// check the restored tree hash
require.Equal(t, latestHash, target.WorkingHash())
}
6 changes: 2 additions & 4 deletions store/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ require (
github.com/cosmos/gogoproto v1.4.11
github.com/cosmos/iavl v1.0.0
github.com/cosmos/ics23/go v0.10.0
github.com/mattn/go-sqlite3 v1.14.17
github.com/linxGnu/grocksdb v1.8.5
github.com/mattn/go-sqlite3 v1.14.17
github.com/stretchr/testify v1.8.4
github.com/tidwall/btree v1.7.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
Expand Down Expand Up @@ -56,13 +56,11 @@ require (
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/spf13/cast v1.5.1 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.4.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion store/pruning/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *PruningTestSuite) SetupTest() {
ss, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)

sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, "", iavl.DefaultConfig())

s.manager = NewManager(noopLog, ss, sc)
s.ss = ss
Expand Down
2 changes: 1 addition & 1 deletion store/root/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (s *RootStoreTestSuite) SetupTest() {
ss, err := sqlite.New(s.T().TempDir())
s.Require().NoError(err)

sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, iavl.DefaultConfig())
sc := iavl.NewIavlTree(dbm.NewMemDB(), noopLog, defaultStoreKey, iavl.DefaultConfig())

rs, err := New(noopLog, 1, ss, sc)
s.Require().NoError(err)
Expand Down
Loading
Loading