Skip to content

Commit

Permalink
Backport the psql indexer into v0.34.x (#6906)
Browse files Browse the repository at this point in the history
This change backports the PostgreSQL indexing sink, addressing part of #6828.

Development on the main branch has diverged substantially since the v0.34.x
release. It includes package moves, breaking API and protobuf schema changes,
and new APIs, all of which together have a large footprint on the mapping
between the implementation at tip and the v0.34 release branch.

To avoid the need to retrofit all of those improvements, this change works by
injecting the new indexing sink into the existing (v0.34) indexing interfaces
by delegation. This means the backport does _not_ pull in all the newer APIs
for event handling, and thus has minimal impact on existing code written
against the v0.34 package structure.

This change includes the test for the `psql` implementation, and thus updates
some Go module dependencies. Because it does not interact with any other types,
however, I did not add any unit tests to other packages in this change.

Related changes:
 * Update module dependencies for psql backport.
 * Update test data to be type-compatible with the old protobuf types.
 * Add config settings for the PostgreSQL indexer.
 * Clean up some linter settings.
 * Hook up the psql indexer in the node main.

Co-authored-by: M. J. Fromberger <fromberger@interchain.io>
  • Loading branch information
tnasu and M. J. Fromberger committed Jan 21, 2022
1 parent baba9e0 commit afff36b
Show file tree
Hide file tree
Showing 10 changed files with 852 additions and 923 deletions.
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ linters:
- dogsled
- dupl
- errcheck
- exportloopref
# - funlen
# - gochecknoglobals
# - gochecknoinits
Expand All @@ -26,7 +27,7 @@ linters:
# - maligned
- nakedret
- prealloc
- scopelint
# - scopelint
- staticcheck
- structcheck
- stylecheck
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,12 @@ type TxIndexConfig struct {
// 1) "null"
// 2) "kv" (default) - the simplest possible indexer,
// backed by key-value storage (defaults to levelDB; see DBBackend).
// 3) "psql" - the indexer services backed by PostgreSQL.
Indexer string `mapstructure:"indexer"`

// The PostgreSQL connection configuration, the connection format:
// postgresql://<user>:<password>@<host>:<port>/<db>?<opts>
PsqlConn string `mapstructure:"psql-conn"`
}

// DefaultTxIndexConfig returns a default configuration for the transaction indexer.
Expand Down
20 changes: 12 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,45 +3,49 @@ module github.com/line/ostracon
go 1.15

require (
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/BurntSushi/toml v0.3.1
github.com/ChainSafe/go-schnorrkel v0.0.0-20200405005733-88cbf1b4c40d
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/Workiva/go-datastructures v1.0.52
github.com/adlio/schema v1.1.13
github.com/btcsuite/btcd v0.21.0-beta
github.com/btcsuite/btcutil v1.0.2
github.com/confio/ics23/go v0.6.3
github.com/coniks-sys/coniks-go v0.0.0-20180722014011-11acf4819b71
github.com/containerd/continuity v0.1.0 // indirect
github.com/fortytw2/leaktest v1.3.0
github.com/go-kit/kit v0.10.0
github.com/go-logfmt/logfmt v0.5.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.4.3
github.com/golang/protobuf v1.5.0
github.com/google/orderedcode v0.0.1
github.com/gorilla/websocket v1.4.2
github.com/gtank/merlin v0.1.1
github.com/herumi/bls-eth-go-binary v0.0.0-20200923072303-32b29e5d8cbf
github.com/lib/pq v1.2.0
github.com/libp2p/go-buffer-pool v0.0.2
github.com/line/tm-db/v2 v2.0.0-init.1.0.20210824011847-fcfa67dd3c70
github.com/minio/highwayhash v1.0.1
github.com/pelletier/go-toml v1.6.0 // indirect
github.com/opencontainers/runc v1.0.2 // indirect
github.com/ory/dockertest v3.3.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.8.0
github.com/r2ishiguro/vrf v0.0.0-20180716233122-192de52975eb
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0
github.com/rs/cors v1.7.0
github.com/sasha-s/go-deadlock v0.2.1-0.20190427202633-1595213edefa
github.com/snikch/goodman v0.0.0-20171125024755-10e37e294daa
github.com/spf13/afero v1.2.2 // indirect
github.com/spf13/cobra v1.1.1
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.7.1
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.7.0
github.com/tendermint/go-amino v0.16.0
github.com/yahoo/coname v0.0.0-20170609175141-84592ddf8673 // indirect
golang.org/x/crypto v0.0.0-20201117144127-c1f2f97bffc9
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
gonum.org/v1/gonum v0.9.3
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f
golang.org/x/sys v0.0.0-20210903071746-97244b99971b // indirect
gonum.org/v1/gonum v0.8.2
google.golang.org/grpc v1.37.0
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
952 changes: 39 additions & 913 deletions go.sum

Large diffs are not rendered by default.

17 changes: 16 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/line/ostracon/state/indexer"
blockidxkv "github.com/line/ostracon/state/indexer/block/kv"
blockidxnull "github.com/line/ostracon/state/indexer/block/null"
"github.com/line/ostracon/state/indexer/sink/psql"
"github.com/line/ostracon/state/txindex"
"github.com/line/ostracon/state/txindex/kv"
"github.com/line/ostracon/state/txindex/null"
Expand Down Expand Up @@ -270,6 +271,7 @@ func createAndStartEventBus(logger log.Logger) (*types.EventBus, error) {

func createAndStartIndexerService(
config *cfg.Config,
chainID string,
dbProvider DBProvider,
eventBus *types.EventBus,
logger log.Logger,
Expand All @@ -289,6 +291,18 @@ func createAndStartIndexerService(

txIndexer = kv.NewTxIndex(store)
blockIndexer = blockidxkv.New(pdbm.NewDB(store, []byte("block_events")))

case "psql":
if config.TxIndex.PsqlConn == "" {
return nil, nil, nil, errors.New(`no psql-conn is set for the "psql" indexer`)
}
es, err := psql.NewEventSink(config.TxIndex.PsqlConn, chainID)
if err != nil {
return nil, nil, nil, fmt.Errorf("creating psql indexer: %w", err)
}
txIndexer = es.TxIndexer()
blockIndexer = es.BlockIndexer()

default:
txIndexer = &null.TxIndex{}
blockIndexer = &blockidxnull.BlockerIndexer{}
Expand Down Expand Up @@ -703,7 +717,8 @@ func NewNode(config *cfg.Config,
return nil, err
}

indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config, dbProvider, eventBus, logger)
indexerService, txIndexer, blockIndexer, err := createAndStartIndexerService(config,
genDoc.ChainID, dbProvider, eventBus, logger)
if err != nil {
return nil, err
}
Expand Down
88 changes: 88 additions & 0 deletions state/indexer/sink/psql/backport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package psql

// This file adds code to the psql package that is needed for integration with
// v0.34, but which is not part of the original implementation.
//
// In v0.35, ADR 65 was implemented in which the TxIndexer and BlockIndexer
// interfaces were merged into a hybrid EventSink interface. The Backport*
// types defined here bridge the psql EventSink (which was built in terms of
// the v0.35 interface) to the old interfaces.
//
// We took this narrower approach to backporting to avoid pulling in a much
// wider-reaching set of changes in v0.35 that would have broken several of the
// v0.34.x APIs. The result is sufficient to work with the node plumbing as it
// exists in the v0.34 branch.

import (
"context"
"errors"

abci "github.com/line/ostracon/abci/types"
"github.com/line/ostracon/libs/pubsub/query"
"github.com/line/ostracon/state/txindex"
"github.com/line/ostracon/types"
)

const (
eventTypeBeginBlock = "begin_block"
eventTypeEndBlock = "end_block"
)

// TxIndexer returns a bridge from es to the Tendermint v0.34 transaction indexer.
func (es *EventSink) TxIndexer() BackportTxIndexer {
return BackportTxIndexer{psql: es}
}

// BackportTxIndexer implements the txindex.TxIndexer interface by delegating
// indexing operations to an underlying PostgreSQL event sink.
type BackportTxIndexer struct{ psql *EventSink }

// AddBatch indexes a batch of transactions in Postgres, as part of TxIndexer.
func (b BackportTxIndexer) AddBatch(batch *txindex.Batch) error {
return b.psql.IndexTxEvents(batch.Ops)
}

// Index indexes a single transaction result in Postgres, as part of TxIndexer.
func (b BackportTxIndexer) Index(txr *abci.TxResult) error {
return b.psql.IndexTxEvents([]*abci.TxResult{txr})
}

// Get is implemented to satisfy the TxIndexer interface, but is not supported
// by the psql event sink and reports an error for all inputs.
func (BackportTxIndexer) Get([]byte) (*abci.TxResult, error) {
return nil, errors.New("the TxIndexer.Get method is not supported")
}

// Search is implemented to satisfy the TxIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportTxIndexer) Search(context.Context, *query.Query) ([]*abci.TxResult, error) {
return nil, errors.New("the TxIndexer.Search method is not supported")
}

// BlockIndexer returns a bridge that implements the Tendermint v0.34 block
// indexer interface, using the Postgres event sink as a backing store.
func (es *EventSink) BlockIndexer() BackportBlockIndexer {
return BackportBlockIndexer{psql: es}
}

// BackportBlockIndexer implements the indexer.BlockIndexer interface by
// delegating indexing operations to an underlying PostgreSQL event sink.
type BackportBlockIndexer struct{ psql *EventSink }

// Has is implemented to satisfy the BlockIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportBlockIndexer) Has(height int64) (bool, error) {
return false, errors.New("the BlockIndexer.Has method is not supported")
}

// Index indexes block begin and end events for the specified block. It is
// part of the BlockIndexer interface.
func (b BackportBlockIndexer) Index(block types.EventDataNewBlockHeader) error {
return b.psql.IndexBlockEvents(block)
}

// Search is implemented to satisfy the BlockIndexer interface, but it is not
// supported by the psql event sink and reports an error for all inputs.
func (BackportBlockIndexer) Search(context.Context, *query.Query) ([]int64, error) {
return nil, errors.New("the BlockIndexer.Search method is not supported")
}
11 changes: 11 additions & 0 deletions state/indexer/sink/psql/backport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package psql

import (
"github.com/line/ostracon/state/indexer"
"github.com/line/ostracon/state/txindex"
)

var (
_ indexer.BlockIndexer = BackportBlockIndexer{}
_ txindex.TxIndexer = BackportTxIndexer{}
)
Loading

0 comments on commit afff36b

Please sign in to comment.