Skip to content

Commit

Permalink
Replace tm-db dependency with store package (cosmos#268)
Browse files Browse the repository at this point in the history
* replace tm-db with store in indexer
* add prefix iteration functionality to store
  • Loading branch information
tzdybal authored Jan 31, 2022
1 parent 48d3f30 commit 939aa77
Show file tree
Hide file tree
Showing 12 changed files with 162 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Month, DD, YYYY
- [rpc] [Implement NumUnconfirmedTxs #255](https://github.com/celestiaorg/optimint/pull/255) [@tzdybal](https://github.com/tzdybal/)
- [rpc] [Implement BlockByHash #256](https://github.com/celestiaorg/optimint/pull/256) [@mauriceLC92](https://github.com/mauriceLC92)
- [rpc] [Implement BlockResults #263](https://github.com/celestiaorg/optimint/pull/263) [@tzdybal](https://github.com/tzdybal/)
- [store,indexer] [Replace tm-db dependency with store package #268](https://github.com/celestiaorg/optimint/pull/268) [@tzdybal](https://github.com/tzdybal/)

### BUG FIXES

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ require (
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/tendermint/tendermint v0.34.14
github.com/tendermint/tm-db v0.6.6
go.uber.org/multierr v1.7.0
golang.org/x/net v0.0.0-20211005001312-d4b1ae081e3b
google.golang.org/grpc v1.43.0
Expand Down Expand Up @@ -156,6 +155,7 @@ require (
github.com/subosito/gotenv v1.2.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca // indirect
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect
github.com/tendermint/tm-db v0.6.6 // indirect
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 // indirect
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect
Expand Down
59 changes: 19 additions & 40 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,36 @@ import (
"errors"
"fmt"

"github.com/celestiaorg/optimint/block"

"github.com/celestiaorg/optimint/state/indexer"
blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv"
"github.com/celestiaorg/optimint/state/txindex"
"github.com/celestiaorg/optimint/state/txindex/kv"
"github.com/libp2p/go-libp2p-core/crypto"
"go.uber.org/multierr"

abci "github.com/tendermint/tendermint/abci/types"
llcfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
corep2p "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
tmtypes "github.com/tendermint/tendermint/types"
"go.uber.org/multierr"

dbm "github.com/tendermint/tm-db"

"github.com/celestiaorg/optimint/block"
"github.com/celestiaorg/optimint/config"
"github.com/celestiaorg/optimint/da"
"github.com/celestiaorg/optimint/da/registry"
"github.com/celestiaorg/optimint/mempool"
"github.com/celestiaorg/optimint/p2p"
"github.com/celestiaorg/optimint/state/indexer"
blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv"
"github.com/celestiaorg/optimint/state/txindex"
"github.com/celestiaorg/optimint/state/txindex/kv"
"github.com/celestiaorg/optimint/store"
"github.com/celestiaorg/optimint/types"
)

type DBContext struct {
ID string
Config *config.NodeConfig
}

type DBProvider func(*DBContext) (dbm.DB, error)

// prefixes used in KV store to separate main node data from DALC data
var (
mainPrefix = []byte{0}
dalcPrefix = []byte{1}
mainPrefix = []byte{0}
dalcPrefix = []byte{1}
indexerPrefix = []byte{2}
)

// Node represents a client node in Optimint network.
Expand Down Expand Up @@ -89,11 +81,6 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
return nil, err
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(conf, DefaultDBProvider, eventBus, logger)
if err != nil {
return nil, err
}

client, err := p2p.NewClient(conf.P2P, nodeKey, genesis.ChainID, logger.With("module", "p2p"))
if err != nil {
return nil, err
Expand All @@ -108,6 +95,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
}
mainKV := store.NewPrefixKV(baseKV, mainPrefix)
dalcKV := store.NewPrefixKV(baseKV, dalcPrefix)
indexerKV := store.NewPrefixKV(baseKV, indexerPrefix)

s := store.New(mainKV)

Expand All @@ -120,6 +108,11 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
return nil, fmt.Errorf("data availability layer client initialization error: %w", err)
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(conf, indexerKV, eventBus, logger)
if err != nil {
return nil, err
}

mp := mempool.NewCListMempool(llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
mpIDs := newMempoolIDs()

Expand Down Expand Up @@ -283,7 +276,7 @@ func (n *Node) newHeaderValidator() p2p.GossipValidator {

func createAndStartIndexerService(
conf config.NodeConfig,
dbProvider DBProvider,
kvStore store.KVStore,
eventBus *tmtypes.EventBus,
logger log.Logger,
) (*txindex.IndexerService, txindex.TxIndexer, indexer.BlockIndexer, error) {
Expand All @@ -293,13 +286,8 @@ func createAndStartIndexerService(
blockIndexer indexer.BlockIndexer
)

store, err := dbProvider(&DBContext{"tx_index", &conf})
if err != nil {
return nil, nil, nil, err
}

txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(dbm.NewPrefixDB(store, []byte("block_events")))
txIndexer = kv.NewTxIndex(kvStore)
blockIndexer = blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events")))

indexerService := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
Expand All @@ -310,12 +298,3 @@ func createAndStartIndexerService(

return indexerService, txIndexer, blockIndexer, nil
}

func DefaultDBProvider(ctx *DBContext) (dbm.DB, error) {
if ctx.Config.RootDir == "" && ctx.Config.DBPath == "" { // this is used for testing
dbType := dbm.MemDBBackend
return dbm.NewDB(ctx.ID, dbType, "memdb")
}
dbType := dbm.BadgerDBBackend
return dbm.NewDB(ctx.ID, dbType, ctx.Config.RootDir+ctx.Config.DBPath+"index")
}
49 changes: 21 additions & 28 deletions state/indexer/block/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ import (
"strings"

"github.com/google/orderedcode"
dbm "github.com/tendermint/tm-db"

"github.com/celestiaorg/optimint/state/indexer"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/optimint/state/indexer"
"github.com/celestiaorg/optimint/store"
)

var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
Expand All @@ -23,10 +24,10 @@ var _ indexer.BlockIndexer = (*BlockerIndexer)(nil)
// events with an underlying KV store. Block events are indexed by their height,
// such that matching search criteria returns the respective block height(s).
type BlockerIndexer struct {
store dbm.DB
store store.KVStore
}

func New(store dbm.DB) *BlockerIndexer {
func New(store store.KVStore) *BlockerIndexer {
return &BlockerIndexer{
store: store,
}
Expand All @@ -40,7 +41,11 @@ func (idx *BlockerIndexer) Has(height int64) (bool, error) {
return false, fmt.Errorf("failed to create block height index key: %w", err)
}

return idx.store.Has(key)
_, err = idx.store.Get(key)
if err == store.ErrKeyNotFound {
return false, nil
}
return err == nil, err
}

// Index indexes BeginBlock and EndBlock events for a given block by its height.
Expand All @@ -51,7 +56,7 @@ func (idx *BlockerIndexer) Has(height int64) (bool, error) {
// EndBlock events: encode(eventType.eventAttr|eventValue|height|end_block) => encode(height)
func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error {
batch := idx.store.NewBatch()
defer batch.Close()
defer batch.Discard()

height := bh.Header.Height

Expand All @@ -74,7 +79,7 @@ func (idx *BlockerIndexer) Index(bh types.EventDataNewBlockHeader) error {
return fmt.Errorf("failed to index EndBlock events: %w", err)
}

return batch.WriteSync()
return batch.Commit()
}

// Search performs a query for block heights that match a given BeginBlock
Expand Down Expand Up @@ -239,11 +244,8 @@ func (idx *BlockerIndexer) matchRange(
lowerBound := qr.LowerBoundValue()
upperBound := qr.UpperBoundValue()

it, err := dbm.IteratePrefix(idx.store, startKey)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
it := idx.store.PrefixIterator(startKey)
defer it.Discard()

LOOP:
for ; it.Valid(); it.Next() {
Expand Down Expand Up @@ -359,11 +361,8 @@ func (idx *BlockerIndexer) match(

switch {
case c.Op == query.OpEqual:
it, err := dbm.IteratePrefix(idx.store, startKeyBz)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
it := idx.store.PrefixIterator(startKeyBz)
defer it.Discard()

for ; it.Valid(); it.Next() {
tmpHeights[string(it.Value())] = it.Value()
Expand All @@ -383,11 +382,8 @@ func (idx *BlockerIndexer) match(
return nil, err
}

it, err := dbm.IteratePrefix(idx.store, prefix)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
it := idx.store.PrefixIterator(prefix)
defer it.Discard()

for ; it.Valid(); it.Next() {
cont := true
Expand Down Expand Up @@ -416,11 +412,8 @@ func (idx *BlockerIndexer) match(
return nil, err
}

it, err := dbm.IteratePrefix(idx.store, prefix)
if err != nil {
return nil, fmt.Errorf("failed to create prefix iterator: %w", err)
}
defer it.Close()
it := idx.store.PrefixIterator(prefix)
defer it.Discard()

for ; it.Valid(); it.Next() {
cont := true
Expand Down Expand Up @@ -488,7 +481,7 @@ func (idx *BlockerIndexer) match(
return filteredHeights, nil
}

func (idx *BlockerIndexer) indexEvents(batch dbm.Batch, events []abci.Event, typ string, height int64) error {
func (idx *BlockerIndexer) indexEvents(batch store.Batch, events []abci.Event, typ string, height int64) error {
heightBz := int64ToBytes(height)

for _, event := range events {
Expand Down
9 changes: 5 additions & 4 deletions state/indexer/block/kv/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import (
"fmt"
"testing"

blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv"
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/types"
db "github.com/tendermint/tm-db"

blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv"
"github.com/celestiaorg/optimint/store"
)

func TestBlockIndexer(t *testing.T) {
store := db.NewPrefixDB(db.NewMemDB(), []byte("block_events"))
indexer := blockidxkv.New(store)
prefixStore := store.NewPrefixKV(store.NewDefaultInMemoryKVStore(), []byte("block_events"))
indexer := blockidxkv.New(prefixStore)

require.NoError(t, indexer.Index(types.EventDataNewBlockHeader{
Header: types.Header{Height: 1},
Expand Down
14 changes: 7 additions & 7 deletions state/txindex/indexer_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"time"

"github.com/stretchr/testify/require"
db "github.com/tendermint/tm-db"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"

blockidxkv "github.com/celestiaorg/optimint/state/indexer/block/kv"
"github.com/celestiaorg/optimint/state/txindex"
"github.com/celestiaorg/optimint/state/txindex/kv"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
"github.com/celestiaorg/optimint/store"
)

func TestIndexerServiceIndexesBlocks(t *testing.T) {
Expand All @@ -28,9 +28,9 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
})

// tx indexer
store := db.NewMemDB()
txIndexer := kv.NewTxIndex(store)
blockIndexer := blockidxkv.New(db.NewPrefixDB(store, []byte("block_events")))
kvStore := store.NewDefaultInMemoryKVStore()
txIndexer := kv.NewTxIndex(kvStore)
blockIndexer := blockidxkv.New(store.NewPrefixKV(kvStore, []byte("block_events")))

service := txindex.NewIndexerService(txIndexer, blockIndexer, eventBus)
service.SetLogger(log.TestingLogger())
Expand Down
Loading

0 comments on commit 939aa77

Please sign in to comment.