Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Egaxhaj figure/adr 038 plugin system with release pio v0.45.x #64

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b5372cc
patch adr-038 doc
i-norden Nov 29, 2021
1533238
plugin interfaces
i-norden Nov 29, 2021
bf41043
plugin loader/preloader
i-norden Nov 29, 2021
a34d829
plugin documentation
i-norden Nov 29, 2021
6348618
file writing plugin
i-norden Nov 29, 2021
eb76c54
baseapp pkg updates
i-norden Nov 29, 2021
66c69e5
simapp integration
i-norden Nov 29, 2021
1fec7c4
update CHANGELOG
i-norden Nov 29, 2021
06d0584
fixes
i-norden Nov 29, 2021
d5d7973
add trace plugin
Feb 3, 2022
4b7e019
update readme
Feb 3, 2022
59c1acb
updates trace plugin
Feb 4, 2022
d5d8987
fix trace test
Feb 4, 2022
0712565
fix reference
Feb 18, 2022
036ab56
remove unused params
Feb 4, 2022
eb970d7
add kafka plugin
Feb 4, 2022
41412c6
setup non-deterministic testing
Feb 4, 2022
eb49d6a
opt-in approach to enabling plugins
Feb 4, 2022
29295b6
synchronize work between kafka and app.Commit()
Feb 4, 2022
d52d1c6
remove dependency on ack channel and use listener error response to act
Feb 5, 2022
5766c8f
fromatting
Feb 11, 2022
a381c9e
concurrent listener calls
Feb 18, 2022
f636ae4
pass loaded appOpts
Feb 18, 2022
8e21ee4
async fire-and-forget when halt_app_on_delivery_error = false
Mar 2, 2022
8c4d736
updated comments for HaltAppOnDeliveryError
Mar 3, 2022
bc1fa9a
improve non-determinism tests for state listening
Mar 4, 2022
c9ae384
continue with testing when docker-compose returns error
Mar 10, 2022
fc98110
add fallback timer to kill indefinite running listener goroutines
Mar 15, 2022
2b62472
update comment
Mar 16, 2022
ac8633a
fix typo
Mar 16, 2022
c68e2cd
disable delivery report when in fire-and-forget mode
Mar 23, 2022
30ba641
code improvement
Mar 23, 2022
4b580e3
fix config param
Mar 29, 2022
a8ef9d4
remove fallback timer, long upgrades may trigger it
Mar 29, 2022
5c88705
add missing dependency from prev cherry-pick
Apr 5, 2022
5bf457e
use golang enum type for message key
Mar 30, 2022
0f86bb8
default to fire-and-forget for sim testing
Mar 30, 2022
6e9bb25
serialize to protobuf binary
Apr 5, 2022
99eb30b
remove global wait timout
Apr 11, 2022
3915566
update plugin docs
Apr 12, 2022
f8587aa
update ADR to reflect latest proposal
Apr 12, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ vagrant
# IDE
.idea
*.iml
*.ipr
*.iws
.dir-locals.el
.vscode

Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Features

* [\10639](https://github.com/cosmos/cosmos-sdk/pull/10639) Plugin architecture for ADR-038 + FileStreamingService plugin.

# Provenance Specific releases

## [v0.45-pio-2](https://github.com/provenance-io/cosmos-sdk/releases/tag/v0.45-pio-2) - 2022-02-15
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ endif

.PHONY: run-tests test test-all $(TEST_TARGETS)

# Sim tests with state listening plugins enabled
include sim-state-listening.mk

test-sim-nondeterminism:
@echo "Running non-determinism test..."
@go test -mod=readonly $(SIMAPP) -run TestAppStateDeterminism -Enabled=true \
Expand Down
87 changes: 85 additions & 2 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"sort"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -196,6 +197,33 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
app.halt()
}
}()
} else {
go func() {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()

return res
}

Expand All @@ -216,6 +244,32 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
app.halt()
}
}()
} else {
go func() {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()

return res
}

Expand Down Expand Up @@ -266,26 +320,55 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
gInfo := sdk.GasInfo{}
resultStr := "successful"

var abciRes abci.ResponseDeliverTx
defer func() {
telemetry.IncrCounter(1, "tx", "count")
telemetry.IncrCounter(1, "tx", resultStr)
telemetry.SetGauge(float32(gInfo.GasUsed), "tx", "gas", "used")
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
// call the hooks with the BeginBlock messages
wg := new(sync.WaitGroup)
for _, streamingListener := range app.abciListeners {
streamingListener := streamingListener // https://go.dev/doc/faq#closures_and_goroutines
if streamingListener.HaltAppOnDeliveryError() {
// increment the wait group counter
wg.Add(1)
go func() {
// decrement the counter when the go routine completes
defer wg.Done()
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
app.halt()
}
}()
} else {
go func() {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, abciRes); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}()
}
}
// wait for all the listener calls to finish
wg.Wait()
}()

gInfo, result, anteEvents, _, err := app.runTx(runTxModeDeliver, req.Tx)
if err != nil {
resultStr = "failed"
return sdkerrors.ResponseDeliverTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, anteEvents, app.trace)
abciRes = sdkerrors.ResponseDeliverTxWithEvents(err, gInfo.GasWanted, gInfo.GasUsed, anteEvents, app.trace)
return abciRes
}

return abci.ResponseDeliverTx{
abciRes = abci.ResponseDeliverTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: sdk.MarkEventsToIndex(result.Events, app.indexEvents),
}

return abciRes
}

// Commit implements the ABCI interface. It will commit all state that exists in
Expand Down
17 changes: 10 additions & 7 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@ import (
"reflect"
"strings"

"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/store"
"github.com/cosmos/cosmos-sdk/store/rootmulti"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/x/auth/legacy/legacytx"
"github.com/gogo/protobuf/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
dbm "github.com/tendermint/tm-db"
)

const (
Expand Down Expand Up @@ -135,6 +134,10 @@ type BaseApp struct { // nolint: maligned
indexEvents map[string]struct{}

feeHandler sdk.FeeHandler

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
17 changes: 13 additions & 4 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package baseapp

import (
"fmt"
"io"

dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/snapshots"
"github.com/cosmos/cosmos-sdk/store"
sdk "github.com/cosmos/cosmos-sdk/types"
dbm "github.com/tendermint/tm-db"
"io"
)

// File for storing in-package BaseApp optional functions,
Expand Down Expand Up @@ -251,3 +249,14 @@ func (app *BaseApp) SetFeeHandler(feeHandler sdk.FeeHandler) {

app.feeHandler = feeHandler
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}
40 changes: 40 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package baseapp

import (
"io"
"sync"

abci "github.com/tendermint/tendermint/abci/types"

store "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
// HaltAppOnDeliveryError returns true if the application has been configured to halt when
// ListenBeginBlock, ListenEndBlock, ListenDeliverTx fail to process messages and false when
// the application has been configured to send messages to ListenBeginBlock, ListenEndBlock, ListenDeliverTx
// in fire-and-forget fashion.
//
// This behavior is controlled by a corresponding app config setting.
HaltAppOnDeliveryError() bool
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
Loading