From 939aa77843dbbcde18966a3627161c87cac28d5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Zdyba=C5=82?= Date: Mon, 31 Jan 2022 20:30:29 +0100 Subject: [PATCH] Replace tm-db dependency with store package (#268) * replace tm-db with store in indexer * add prefix iteration functionality to store --- CHANGELOG-PENDING.md | 1 + go.mod | 2 +- node/node.go | 59 +++++++++------------------ state/indexer/block/kv/kv.go | 49 ++++++++++------------ state/indexer/block/kv/kv_test.go | 9 ++-- state/txindex/indexer_service_test.go | 14 +++---- state/txindex/kv/kv.go | 49 +++++++++------------- state/txindex/kv/kv_bench_test.go | 6 +-- state/txindex/kv/kv_test.go | 22 +++++----- store/badger.go | 51 +++++++++++++++++++++++ store/kv.go | 29 +++++++++---- store/prefix.go | 4 ++ 12 files changed, 162 insertions(+), 133 deletions(-) diff --git a/CHANGELOG-PENDING.md b/CHANGELOG-PENDING.md index 33f4916c556..b293e4aba66 100644 --- a/CHANGELOG-PENDING.md +++ b/CHANGELOG-PENDING.md @@ -20,6 +20,7 @@ Month, DD, YYYY - [rpc] [Implement NumUnconfirmedTxs #255](https://github.com/celestiaorg/optimint/pull/255) [@tzdybal](https://github.com/tzdybal/) - [rpc] [Implement BlockByHash #256](https://github.com/celestiaorg/optimint/pull/256) [@mauriceLC92](https://github.com/mauriceLC92) - [rpc] [Implement BlockResults #263](https://github.com/celestiaorg/optimint/pull/263) [@tzdybal](https://github.com/tzdybal/) +- [store,indexer] [Replace tm-db dependency with store package #268](https://github.com/celestiaorg/optimint/pull/268) [@tzdybal](https://github.com/tzdybal/) ### BUG FIXES diff --git a/go.mod b/go.mod index edb55c0b247..8bc0b855864 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/spf13/viper v1.10.1 github.com/stretchr/testify v1.7.0 github.com/tendermint/tendermint v0.34.14 - github.com/tendermint/tm-db v0.6.6 go.uber.org/multierr v1.7.0 golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b google.golang.org/grpc v1.43.0 @@ -156,6 +155,7 @@ require ( github.com/subosito/gotenv v1.2.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca // indirect github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect + github.com/tendermint/tm-db v0.6.6 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect diff --git a/node/node.go b/node/node.go index 959abe7820e..a477c82f436 100644 --- a/node/node.go +++ b/node/node.go @@ -5,13 +5,9 @@ import ( "errors" "fmt" - "github.com/celestiaorg/optimint/block" - - "github.com/celestiaorg/optimint/state/indexer" - blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv" - "github.com/celestiaorg/optimint/state/txindex" - "github.com/celestiaorg/optimint/state/txindex/kv" "github.com/libp2p/go-libp2p-core/crypto" + "go.uber.org/multierr" + abci "github.com/tendermint/tendermint/abci/types" llcfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/libs/log" @@ -19,30 +15,26 @@ import ( corep2p "github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/proxy" tmtypes "github.com/tendermint/tendermint/types" - "go.uber.org/multierr" - - dbm "github.com/tendermint/tm-db" + "github.com/celestiaorg/optimint/block" "github.com/celestiaorg/optimint/config" "github.com/celestiaorg/optimint/da" "github.com/celestiaorg/optimint/da/registry" "github.com/celestiaorg/optimint/mempool" "github.com/celestiaorg/optimint/p2p" + "github.com/celestiaorg/optimint/state/indexer" + blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv" + "github.com/celestiaorg/optimint/state/txindex" + "github.com/celestiaorg/optimint/state/txindex/kv" "github.com/celestiaorg/optimint/store" "github.com/celestiaorg/optimint/types" ) -type DBContext struct { - ID string - Config *config.NodeConfig -} - -type DBProvider func(*DBContext) (dbm.DB, error) - // prefixes used in KV store to separate main node data from DALC data var ( - mainPrefix = []byte{0} - dalcPrefix = []byte{1} + mainPrefix = []byte{0} + dalcPrefix = []byte{1} + indexerPrefix = []byte{2} ) // Node represents a client node in Optimint network. @@ -89,11 +81,6 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey return nil, err } - indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(conf, DefaultDBProvider, eventBus, logger) - if err != nil { - return nil, err - } - client, err := p2p.NewClient(conf.P2P, nodeKey, genesis.ChainID, logger.With("module", "p2p")) if err != nil { return nil, err @@ -108,6 +95,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey } mainKV := store.NewPrefixKV(baseKV, mainPrefix) dalcKV := store.NewPrefixKV(baseKV, dalcPrefix) + indexerKV := store.NewPrefixKV(baseKV, indexerPrefix) s := store.New(mainKV) @@ -120,6 +108,11 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey return nil, fmt.Errorf("data availability layer client initialization error: %w", err) } + indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(conf, indexerKV, eventBus, logger) + if err != nil { + return nil, err + } + mp := mempool.NewCListMempool(llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0) mpIDs := newMempoolIDs() @@ -283,7 +276,7 @@ func (n *Node) newHeaderValidator() p2p.GossipValidator { func createAndStartIndexerService( conf config.NodeConfig, - dbProvider DBProvider, + kvStore store.KVStore, eventBus *tmtypes.EventBus, logger log.Logger, ) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) { @@ -293,13 +286,8 @@ func createAndStartIndexerService( blockIndexer indexer.BlockIndexer ) - store, err := dbProvider(&DBContext{"tx_index", &conf}) - if err != nil { - return nil, nil, nil, err - } - - txIndexer = kv.NewTxIndex(store) - blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events"))) + txIndexer = kv.NewTxIndex(kvStore) + blockIndexer = blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events"))) indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) indexerService.SetLogger(logger.With("module", "txindex")) @@ -310,12 +298,3 @@ func createAndStartIndexerService( return indexerService, txIndexer, blockIndexer, nil } - -func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) { - if ctx.Config.RootDir == "" && ctx.Config.DBPath == "" { // this is used for testing - dbType := dbm.MemDBBackend - return dbm.NewDB(ctx.ID, dbType, "memdb") - } - dbType := dbm.BadgerDBBackend - return dbm.NewDB(ctx.ID, dbType, ctx.Config.RootDir+ctx.Config.DBPath+"index") -} diff --git a/state/indexer/block/kv/kv.go b/state/indexer/block/kv/kv.go index e50f3cc47d9..74f59c882af 100644 --- a/state/indexer/block/kv/kv.go +++ b/state/indexer/block/kv/kv.go @@ -9,12 +9,13 @@ import ( "strings" "github.com/google/orderedcode" - dbm "github.com/tendermint/tm-db" - "github.com/celestiaorg/optimint/state/indexer" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/optimint/state/indexer" + "github.com/celestiaorg/optimint/store" ) var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) @@ -23,10 +24,10 @@ var _ indexer.BlockIndexer = (*BlockerIndexer)(nil) // events with an underlying KV store. Block events are indexed by their height, // such that matching search criteria returns the respective block height(s). type BlockerIndexer struct { - store dbm.DB + store store.KVStore } -func New(store dbm.DB) *BlockerIndexer { +func New(store store.KVStore) *BlockerIndexer { return &BlockerIndexer{ store: store, } @@ -40,7 +41,11 @@ func (idx *BlockerIndexer) Has(height int64) (bool, error) { return false, fmt.Errorf("failed to create block height index key: %w", err) } - return idx.store.Has(key) + _, err = idx.store.Get(key) + if err == store.ErrKeyNotFound { + return false, nil + } + return err == nil, err } // Index indexes BeginBlock and EndBlock events for a given block by its height. @@ -51,7 +56,7 @@ func (idx *BlockerIndexer) Has(height int64) (bool, error) { // EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height) func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error { batch := idx.store.NewBatch() - defer batch.Close() + defer batch.Discard() height := bh.Header.Height @@ -74,7 +79,7 @@ func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error { return fmt.Errorf("failed to index EndBlock events: %w", err) } - return batch.WriteSync() + return batch.Commit() } // Search performs a query for block heights that match a given BeginBlock @@ -239,11 +244,8 @@ func (idx *BlockerIndexer) matchRange( lowerBound := qr.LowerBoundValue() upperBound := qr.UpperBoundValue() - it, err := dbm.IteratePrefix(idx.store, startKey) - if err != nil { - return nil, fmt.Errorf("failed to create prefix iterator: %w", err) - } - defer it.Close() + it := idx.store.PrefixIterator(startKey) + defer it.Discard() LOOP: for ; it.Valid(); it.Next() { @@ -359,11 +361,8 @@ func (idx *BlockerIndexer) match( switch { case c.Op == query.OpEqual: - it, err := dbm.IteratePrefix(idx.store, startKeyBz) - if err != nil { - return nil, fmt.Errorf("failed to create prefix iterator: %w", err) - } - defer it.Close() + it := idx.store.PrefixIterator(startKeyBz) + defer it.Discard() for ; it.Valid(); it.Next() { tmpHeights[string(it.Value())] = it.Value() @@ -383,11 +382,8 @@ func (idx *BlockerIndexer) match( return nil, err } - it, err := dbm.IteratePrefix(idx.store, prefix) - if err != nil { - return nil, fmt.Errorf("failed to create prefix iterator: %w", err) - } - defer it.Close() + it := idx.store.PrefixIterator(prefix) + defer it.Discard() for ; it.Valid(); it.Next() { cont := true @@ -416,11 +412,8 @@ func (idx *BlockerIndexer) match( return nil, err } - it, err := dbm.IteratePrefix(idx.store, prefix) - if err != nil { - return nil, fmt.Errorf("failed to create prefix iterator: %w", err) - } - defer it.Close() + it := idx.store.PrefixIterator(prefix) + defer it.Discard() for ; it.Valid(); it.Next() { cont := true @@ -488,7 +481,7 @@ func (idx *BlockerIndexer) match( return filteredHeights, nil } -func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error { +func (idx *BlockerIndexer) indexEvents(batch store.Batch, events []abci.Event, typ string, height int64) error { heightBz := int64ToBytes(height) for _, event := range events { diff --git a/state/indexer/block/kv/kv_test.go b/state/indexer/block/kv/kv_test.go index 8dcaceecdba..44c83a1ee33 100644 --- a/state/indexer/block/kv/kv_test.go +++ b/state/indexer/block/kv/kv_test.go @@ -5,17 +5,18 @@ import ( "fmt" "testing" - blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv" "github.com/stretchr/testify/require" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" - db "github.com/tendermint/tm-db" + + blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv" + "github.com/celestiaorg/optimint/store" ) func TestBlockIndexer(t *testing.T) { - store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events")) - indexer := blockidxkv.New(store) + prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events")) + indexer := blockidxkv.New(prefixStore) require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{ Header: types.Header{Height: 1}, diff --git a/state/txindex/indexer_service_test.go b/state/txindex/indexer_service_test.go index 52a6a17f9a1..b1908846d39 100644 --- a/state/txindex/indexer_service_test.go +++ b/state/txindex/indexer_service_test.go @@ -5,14 +5,14 @@ import ( "time" "github.com/stretchr/testify/require" - db "github.com/tendermint/tm-db" + abci "github.com/tendermint/tendermint/abci/types" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/types" blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv" "github.com/celestiaorg/optimint/state/txindex" "github.com/celestiaorg/optimint/state/txindex/kv" - abci "github.com/tendermint/tendermint/abci/types" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" + "github.com/celestiaorg/optimint/store" ) func TestIndexerServiceIndexesBlocks(t *testing.T) { @@ -28,9 +28,9 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { }) // tx indexer - store := db.NewMemDB() - txIndexer := kv.NewTxIndex(store) - blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events"))) + kvStore := store.NewDefaultInMemoryKVStore() + txIndexer := kv.NewTxIndex(kvStore) + blockIndexer := blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events"))) service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus) service.SetLogger(log.TestingLogger()) diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index f162bd6ce2d..12a36cdd2db 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -9,13 +9,14 @@ import ( "strings" "github.com/gogo/protobuf/proto" - dbm "github.com/tendermint/tm-db" - "github.com/celestiaorg/optimint/state/indexer" - "github.com/celestiaorg/optimint/state/txindex" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/optimint/state/indexer" + "github.com/celestiaorg/optimint/state/txindex" + "github.com/celestiaorg/optimint/store" ) const ( @@ -26,11 +27,11 @@ var _ txindex.TxIndexer = (*TxIndex)(nil) // TxIndex is the simplest possible indexer, backed by key-value storage (levelDB). type TxIndex struct { - store dbm.DB + store store.KVStore } // NewTxIndex creates new KV indexer. -func NewTxIndex(store dbm.DB) *TxIndex { +func NewTxIndex(store store.KVStore) *TxIndex { return &TxIndex{ store: store, } @@ -66,7 +67,7 @@ func (txi *TxIndex) Get(hash []byte) (*abci.TxResult, error) { // Any event with an empty type is not indexed. func (txi *TxIndex) AddBatch(b *txindex.Batch) error { storeBatch := txi.store.NewBatch() - defer storeBatch.Close() + defer storeBatch.Discard() for _, result := range b.Ops { hash := types.Tx(result.Tx).Hash() @@ -94,7 +95,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { } } - return storeBatch.WriteSync() + return storeBatch.Commit() } // Index indexes a single transaction using the given list of events. Each key @@ -103,7 +104,7 @@ func (txi *TxIndex) AddBatch(b *txindex.Batch) error { // Any event with an empty type is not indexed. func (txi *TxIndex) Index(result *abci.TxResult) error { b := txi.store.NewBatch() - defer b.Close() + defer b.Discard() hash := types.Tx(result.Tx).Hash() @@ -129,10 +130,10 @@ func (txi *TxIndex) Index(result *abci.TxResult) error { return err } - return b.WriteSync() + return b.Commit() } -func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store dbm.Batch) error { +func (txi *TxIndex) indexEvents(result *abci.TxResult, hash []byte, store store.Batch) error { for _, event := range result.Result.Events { // only index events with a non-empty type if len(event.Type) == 0 { @@ -318,11 +319,8 @@ func (txi *TxIndex) match( switch { case c.Op == query.OpEqual: - it, err := dbm.IteratePrefix(txi.store, startKeyBz) - if err != nil { - panic(err) - } - defer it.Close() + it := txi.store.PrefixIterator(startKeyBz) + defer it.Discard() for ; it.Valid(); it.Next() { cont := true @@ -347,11 +345,8 @@ func (txi *TxIndex) match( case c.Op == query.OpExists: // XXX: can't use startKeyBz here because c.Operand is nil // (e.g. "account.owner//" won't match w/ a single row) - it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey)) - if err != nil { - panic(err) - } - defer it.Close() + it := txi.store.PrefixIterator(startKey(c.CompositeKey)) + defer it.Discard() for ; it.Valid(); it.Next() { cont := true @@ -377,11 +372,8 @@ func (txi *TxIndex) match( // XXX: startKey does not apply here. // For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an" // we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/" - it, err := dbm.IteratePrefix(txi.store, startKey(c.CompositeKey)) - if err != nil { - panic(err) - } - defer it.Close() + it := txi.store.PrefixIterator(startKey(c.CompositeKey)) + defer it.Discard() for ; it.Valid(); it.Next() { cont := true @@ -469,11 +461,8 @@ func (txi *TxIndex) matchRange( lowerBound := qr.LowerBoundValue() upperBound := qr.UpperBoundValue() - it, err := dbm.IteratePrefix(txi.store, startKey) - if err != nil { - panic(err) - } - defer it.Close() + it := txi.store.PrefixIterator(startKey) + defer it.Discard() LOOP: for ; it.Valid(); it.Next() { diff --git a/state/txindex/kv/kv_bench_test.go b/state/txindex/kv/kv_bench_test.go index fdfe550f392..f757c0281be 100644 --- a/state/txindex/kv/kv_bench_test.go +++ b/state/txindex/kv/kv_bench_test.go @@ -7,11 +7,11 @@ import ( "io/ioutil" "testing" - dbm "github.com/tendermint/tm-db" - abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/optimint/store" ) func BenchmarkTxSearch(b *testing.B) { @@ -20,7 +20,7 @@ func BenchmarkTxSearch(b *testing.B) { b.Errorf("failed to create temporary directory: %s", err) } - db, err := dbm.NewGoLevelDB("benchmark_tx_search_test", dbDir) + db := store.NewDefaultKVStore(dbDir, "db", "benchmark_tx_search_test") if err != nil { b.Errorf("failed to create database: %s", err) } diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index 3cbe2ca3512..5578d2fea8e 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -11,17 +11,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - db "github.com/tendermint/tm-db" - - "github.com/celestiaorg/optimint/state/txindex" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/pubsub/query" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/tendermint/tendermint/types" + + "github.com/celestiaorg/optimint/state/txindex" + "github.com/celestiaorg/optimint/store" ) func TestTxIndex(t *testing.T) { - indexer := NewTxIndex(db.NewMemDB()) + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) tx := types.Tx("HELLO WORLD") txResult := &abci.TxResult{ @@ -67,7 +67,7 @@ func TestTxIndex(t *testing.T) { } func TestTxSearch(t *testing.T) { - indexer := NewTxIndex(db.NewMemDB()) + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}}}, @@ -141,7 +141,7 @@ func TestTxSearch(t *testing.T) { } func TestTxSearchWithCancelation(t *testing.T) { - indexer := NewTxIndex(db.NewMemDB()) + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}}}, @@ -159,7 +159,7 @@ func TestTxSearchWithCancelation(t *testing.T) { } func TestTxSearchDeprecatedIndexing(t *testing.T) { - indexer := NewTxIndex(db.NewMemDB()) + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) // index tx using events indexing (composite key) txResult1 := txResultWithEvents([]abci.Event{ @@ -193,7 +193,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { require.NoError(t, err) err = b.Set(hash2, rawBytes) require.NoError(t, err) - err = b.Write() + err = b.Commit() require.NoError(t, err) testCases := []struct { @@ -238,7 +238,7 @@ func TestTxSearchDeprecatedIndexing(t *testing.T) { } func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { - indexer := NewTxIndex(db.NewMemDB()) + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) txResult := txResultWithEvents([]abci.Event{ {Type: "account", Attributes: []abci.EventAttribute{{Key: []byte("number"), Value: []byte("1"), Index: true}}}, @@ -260,7 +260,7 @@ func TestTxSearchOneTxWithMultipleSameTagsButDifferentValues(t *testing.T) { } func TestTxSearchMultipleTxs(t *testing.T) { - indexer := NewTxIndex(db.NewMemDB()) + indexer := NewTxIndex(store.NewDefaultInMemoryKVStore()) // indexed first, but bigger height (to test the order of transactions) txResult := txResultWithEvents([]abci.Event{ @@ -333,7 +333,7 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - store, err := db.NewDB("tx_index", "goleveldb", dir) + store := store.NewDefaultKVStore(dir, "db", "tx_index") require.NoError(b, err) indexer := NewTxIndex(store) diff --git a/store/badger.go b/store/badger.go index 519eaa87d73..50ac8a7a78a 100644 --- a/store/badger.go +++ b/store/badger.go @@ -91,3 +91,54 @@ func (bb *BadgerBatch) Commit() error { func (bb *BadgerBatch) Discard() { bb.txn.Discard() } + +var _ Iterator = &BadgerIterator{} + +func (b *BadgerKV) PrefixIterator(prefix []byte) Iterator { + txn := b.db.NewTransaction(false) + iter := txn.NewIterator(badger.DefaultIteratorOptions) + iter.Seek(prefix) + return &BadgerIterator{ + txn: txn, + iter: iter, + prefix: prefix, + lastError: nil, + } +} + +// BadgerIterator encapsulates prefix iterator for badger kv store. +type BadgerIterator struct { + txn *badger.Txn + iter *badger.Iterator + prefix []byte + lastError error +} + +func (i *BadgerIterator) Valid() bool { + return i.iter.ValidForPrefix(i.prefix) +} + +func (i *BadgerIterator) Next() { + i.iter.Next() +} + +func (i *BadgerIterator) Key() []byte { + return i.iter.Item().Key() +} + +func (i *BadgerIterator) Value() []byte { + val, err := i.iter.Item().ValueCopy(nil) + if err != nil { + i.lastError = err + } + return val +} + +func (i *BadgerIterator) Error() error { + return i.lastError +} + +func (i *BadgerIterator) Discard() { + i.iter.Close() + i.txn.Discard() +} diff --git a/store/kv.go b/store/kv.go index 35931f1e722..f813d51b39c 100644 --- a/store/kv.go +++ b/store/kv.go @@ -10,18 +10,29 @@ import ( // // KVStore MUST be thread safe. type KVStore interface { - Get(key []byte) ([]byte, error) // Get gets the value for a key. - Set(key []byte, value []byte) error // Set updates the value for a key. - Delete(key []byte) error // Delete deletes a key. - NewBatch() Batch + Get(key []byte) ([]byte, error) // Get gets the value for a key. + Set(key []byte, value []byte) error // Set updates the value for a key. + Delete(key []byte) error // Delete deletes a key. + NewBatch() Batch // NewBatch creates a new batch. + PrefixIterator(prefix []byte) Iterator // PrefixIterator creates iterator to traverse given prefix. } -// Batch enables batching of transactions +// Batch enables batching of transactions. type Batch interface { - Set(key, value []byte) error // Accumulates KV entries in a transaction - Delete(key []byte) error // Deletes the given key - Commit() error // Commits the transaction - Discard() // Discards the transaction + Set(key, value []byte) error // Accumulates KV entries in a transaction. + Delete(key []byte) error // Deletes the given key. + Commit() error // Commits the transaction. + Discard() // Discards the transaction. +} + +// Iterator enables traversal over a given prefix. +type Iterator interface { + Valid() bool + Next() + Key() []byte + Value() []byte + Error() error + Discard() } // NewInMemoryKVStore builds KVStore that works in-memory (without accessing disk). diff --git a/store/prefix.go b/store/prefix.go index 829ddefcf5e..978dae6f215 100644 --- a/store/prefix.go +++ b/store/prefix.go @@ -33,6 +33,10 @@ func (p *PrefixKV) NewBatch() Batch { } } +func (p *PrefixKV) PrefixIterator(prefix []byte) Iterator { + return p.kv.PrefixIterator(append(p.prefix, prefix...)) +} + type PrefixKVBatch struct { b Batch prefix []byte