Skip to content

Commit

Permalink
fix(server/v2/cometbft): fix mock store (#22293)
Browse files Browse the repository at this point in the history
(cherry picked from commit f01baf3)

# Conflicts:
#	store/v2/mock/types.go
#	store/v2/storage/README.md
#	store/v2/storage/store.go
  • Loading branch information
julienrbrt authored and mergify[bot] committed Oct 17, 2024
1 parent 0eac5e1 commit a597600
Show file tree
Hide file tree
Showing 4 changed files with 291 additions and 4 deletions.
8 changes: 4 additions & 4 deletions server/v2/cometbft/internal/mock/mock_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
)

type MockStore struct {
Storage storev2.VersionedDatabase
Storage storev2.VersionedWriter
Committer storev2.Committer
}

func NewMockStorage(logger log.Logger, dir string) storev2.VersionedDatabase {
func NewMockStorage(logger log.Logger, dir string) storev2.VersionedWriter {
storageDB, _ := sqlite.New(dir)
ss := storage.NewStorageStore(storageDB, logger)
return ss
Expand All @@ -36,7 +36,7 @@ func NewMockCommiter(logger log.Logger, actors ...string) storev2.Committer {
return sc
}

func NewMockStore(ss storev2.VersionedDatabase, sc storev2.Committer) *MockStore {
func NewMockStore(ss storev2.VersionedWriter, sc storev2.Committer) *MockStore {
return &MockStore{Storage: ss, Committer: sc}
}

Expand Down Expand Up @@ -83,7 +83,7 @@ func (s *MockStore) StateAt(version uint64) (corestore.ReaderMap, error) {
return NewMockReaderMap(version, s), nil
}

func (s *MockStore) GetStateStorage() storev2.VersionedDatabase {
func (s *MockStore) GetStateStorage() storev2.VersionedWriter {
return s.Storage
}

Expand Down
19 changes: 19 additions & 0 deletions store/v2/mock/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package mock

import "cosmossdk.io/store/v2"

Check failure on line 3 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/store/v2; to add it:

Check failure on line 3 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/store/v2; to add it:

Check failure on line 3 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/store/v2; to add it:

Check failure on line 3 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

could not import cosmossdk.io/store/v2 (invalid package name: "")

// StateCommitter is a mock of store.Committer
type StateCommitter interface {
store.Committer

Check failure on line 7 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 7 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
store.Pruner

Check failure on line 8 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 8 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
store.PausablePruner

Check failure on line 9 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 9 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
store.UpgradeableStore

Check failure on line 10 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 10 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
}

// StateStorage is a mock of store.VersionedWriter
type StateStorage interface {
store.VersionedWriter

Check failure on line 15 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 15 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
store.UpgradableDatabase

Check failure on line 16 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 16 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
store.Pruner

Check failure on line 17 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 17 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
store.PausablePruner

Check failure on line 18 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / dependency-review

undefined: store

Check failure on line 18 in store/v2/mock/types.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
}
107 changes: 107 additions & 0 deletions store/v2/storage/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# State Storage (SS)

The `storage` package contains the state storage (SS) implementation. Specifically,
it contains RocksDB, PebbleDB, and SQLite (Btree) backend implementations of the
`VersionedWriter` interface.

The goal of SS is to provide a modular storage backend, i.e. multiple implementations,
to facilitate storing versioned raw key/value pairs in a fast embedded database,
although an embedded database is not required, i.e. you could use a replicated
RDBMS system.

The responsibility and functions of SS include the following:

* Provide fast and efficient queries for versioned raw key/value pairs
* Provide versioned CRUD operations
* Provide versioned batching functionality
* Provide versioned iteration (forward and reverse) functionality
* Provide pruning functionality

All of the functionality provided by an SS backend should work under a versioned
scheme, i.e. a user should be able to get, store, and iterate over keys for the
latest and historical versions efficiently.

## Backends

### RocksDB

The RocksDB implementation is a CGO-based SS implementation. It fully supports
the `VersionedWriter` API and is arguably the most efficient implementation. It
also supports versioning out-of-the-box using User-defined Timestamps in
ColumnFamilies (CF). However, it requires the CGO dependency which can complicate
an app’s build process.

### PebbleDB

The PebbleDB implementation is a native Go SS implementation that is primarily an
alternative to RocksDB. Since it does not support CF, results in the fact that we
need to implement versioning (MVCC) ourselves. This comes with added implementation
complexity and potential performance overhead. However, it is a pure Go implementation
and does not require CGO.

### SQLite (Btree)

The SQLite implementation is another CGO-based SS implementation. It fully supports
the `VersionedWriter` API. The implementation is relatively straightforward and
easy to understand as it’s entirely SQL-based. However, benchmarks show that this
options is least performant, even for reads. This SS backend has a lot of promise,
but needs more benchmarking and potential SQL optimizations, like dedicated tables
for certain aspects of state, e.g. latest state, to be extremely performant.

## Benchmarks

Benchmarks for basic operations on all supported native SS implementations can
be found in `store/storage/storage_bench_test.go`.

At the time of writing, the following benchmarks were performed:

```shell
name time/op
Get/backend_rocksdb_versiondb_opts-10 7.41µs ± 0%
Get/backend_pebbledb_default_opts-10 6.17µs ± 0%
Get/backend_btree_sqlite-10 29.1µs ± 0%
ApplyChangeset/backend_pebbledb_default_opts-10 5.73ms ± 0%
ApplyChangeset/backend_btree_sqlite-10 56.9ms ± 0%
ApplyChangeset/backend_rocksdb_versiondb_opts-10 4.07ms ± 0%
Iterate/backend_pebbledb_default_opts-10 1.04s ± 0%
Iterate/backend_btree_sqlite-10 1.59s ± 0%
Iterate/backend_rocksdb_versiondb_opts-10 778ms ± 0%
```

## Pruning

Pruning is the process of efficiently managing and removing outdated or redundant
data from the State Storage (SS). To facilitate this, the SS backend must implement
the `Pruner` interface, allowing the `PruningManager` to execute data pruning operations
according to the specified `PruningOption`.

## State Sync

State storage (SS) does not have a direct notion of state sync. Rather, `snapshots.Manager`
is responsible for creating and restoring snapshots of the entire state. The
`snapshots.Manager` has a `StorageSnapshotter` field which is fulfilled by the
`StorageStore` type, specifically it implements the `Restore` method. The `Restore`
method reads off of a provided channel and writes key/value pairs directly to a
batch object which is committed to the underlying SS engine.

## Non-Consensus Data

<!-- TODO -->

## Usage

An SS backend is meant to be used within a broader store implementation, as it
only stores data for direct and historical query purposes. We define a `Database`
interface in the `storage` package which is mean to be represent a `VersionedWriter`
with only the necessary methods. The `StorageStore` interface is meant to wrap or
accept this `Database` type, e.g. RocksDB.

The `StorageStore` interface is an abstraction or wrapper around the backing SS
engine can be seen as the main entry point to using SS.

Higher up the stack, there should exist a `root.Store` implementation. The `root.Store`
is meant to encapsulate both an SS backend and an SC backend. The SS backend is
defined by this `StorageStore` implementation.

In short, initialize your SS engine of choice and then provide that to `NewStorageStore`
which will further be provided to `root.Store` as the SS backend.
161 changes: 161 additions & 0 deletions store/v2/storage/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package storage

import (
"errors"
"fmt"

"cosmossdk.io/core/log"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/snapshots"

Check failure on line 10 in store/v2/storage/store.go

View workflow job for this annotation

GitHub Actions / split-test-files

no required module provides package cosmossdk.io/store/v2/snapshots; to add it:

Check failure on line 10 in store/v2/storage/store.go

View workflow job for this annotation

GitHub Actions / dependency-review

no required module provides package cosmossdk.io/store/v2/snapshots; to add it:
)

const (
// TODO: it is a random number, need to be tuned
defaultBatchBufferSize = 100000
)

var (
_ store.VersionedWriter = (*StorageStore)(nil)

Check failure on line 19 in store/v2/storage/store.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: store (typecheck)
_ snapshots.StorageSnapshotter = (*StorageStore)(nil)
_ store.Pruner = (*StorageStore)(nil)
_ store.UpgradableDatabase = (*StorageStore)(nil)
)

// StorageStore is a wrapper around the store.VersionedWriter interface.
type StorageStore struct {
logger log.Logger
db Database

Check failure on line 28 in store/v2/storage/store.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: Database (typecheck)
}

// NewStorageStore returns a reference to a new StorageStore.
func NewStorageStore(db Database, logger log.Logger) *StorageStore {
return &StorageStore{
logger: logger,
db: db,
}
}

// Has returns true if the key exists in the store.
func (ss *StorageStore) Has(storeKey []byte, version uint64, key []byte) (bool, error) {
return ss.db.Has(storeKey, version, key)
}

// Get returns the value associated with the given key.
func (ss *StorageStore) Get(storeKey []byte, version uint64, key []byte) ([]byte, error) {
return ss.db.Get(storeKey, version, key)
}

// ApplyChangeset applies the given changeset to the storage.
func (ss *StorageStore) ApplyChangeset(version uint64, cs *corestore.Changeset) error {
b, err := ss.db.NewBatch(version)
if err != nil {
return err
}

for _, pairs := range cs.Changes {
for _, kvPair := range pairs.StateChanges {
if kvPair.Remove {
if err := b.Delete(pairs.Actor, kvPair.Key); err != nil {
return err
}
} else {
if err := b.Set(pairs.Actor, kvPair.Key, kvPair.Value); err != nil {
return err
}
}
}
}

if err := b.Write(); err != nil {
return err
}

return nil
}

// GetLatestVersion returns the latest version of the store.
func (ss *StorageStore) GetLatestVersion() (uint64, error) {
return ss.db.GetLatestVersion()
}

// SetLatestVersion sets the latest version of the store.
func (ss *StorageStore) SetLatestVersion(version uint64) error {
return ss.db.SetLatestVersion(version)
}

// VersionExists returns true if the given version exists in the store.
func (ss *StorageStore) VersionExists(version uint64) (bool, error) {
return ss.db.VersionExists(version)
}

// Iterator returns an iterator over the specified domain and prefix.
func (ss *StorageStore) Iterator(storeKey []byte, version uint64, start, end []byte) (corestore.Iterator, error) {
return ss.db.Iterator(storeKey, version, start, end)
}

// ReverseIterator returns an iterator over the specified domain and prefix in reverse.
func (ss *StorageStore) ReverseIterator(storeKey []byte, version uint64, start, end []byte) (corestore.Iterator, error) {
return ss.db.ReverseIterator(storeKey, version, start, end)
}

// Prune prunes the store up to the given version.
func (ss *StorageStore) Prune(version uint64) error {
return ss.db.Prune(version)
}

// Restore restores the store from the given channel.
func (ss *StorageStore) Restore(version uint64, chStorage <-chan *corestore.StateChanges) error {
latestVersion, err := ss.db.GetLatestVersion()
if err != nil {
return fmt.Errorf("failed to get latest version: %w", err)
}
if version <= latestVersion {
return fmt.Errorf("the snapshot version %d is not greater than latest version %d", version, latestVersion)
}

b, err := ss.db.NewBatch(version)
if err != nil {
return err
}

for kvPair := range chStorage {
for _, kv := range kvPair.StateChanges {
if err := b.Set(kvPair.Actor, kv.Key, kv.Value); err != nil {
return err
}
if b.Size() > defaultBatchBufferSize {
if err := b.Write(); err != nil {
return err
}
if err := b.Reset(); err != nil {
return err
}
}
}
}

if b.Size() > 0 {
if err := b.Write(); err != nil {
return err
}
}

return nil
}

// PruneStoreKeys prunes the store keys which implements the store.UpgradableDatabase
// interface.
func (ss *StorageStore) PruneStoreKeys(storeKeys []string, version uint64) error {
gdb, ok := ss.db.(store.UpgradableDatabase)
if !ok {
return errors.New("db does not implement UpgradableDatabase interface")
}

return gdb.PruneStoreKeys(storeKeys, version)
}

// Close closes the store.
func (ss *StorageStore) Close() error {
return ss.db.Close()
}

0 comments on commit a597600

Please sign in to comment.