Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add api support for abci streaming to external systems #952

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/sims.yml
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,8 @@ jobs:
strategy:
fail-fast: false
matrix:
# test-sim-nondeterminism-state-listening-file is the nondeterminism test with state listening to a file.
# test-sim-nondeterminism-state-listening-trace is the nondeterminism test with state listening to trace.
# test-sim-nondeterminism-state-listening-kafka is the nondeterminism test with state listening to kafka.
test: ["file", "trace", "kafka"]
test: ["trace"]
os: ["ubuntu-latest", "macos-latest"]
runs-on: ${{ matrix.os }}
steps:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* Bump Cosmos-SDK to `v0.45.4-pio-4` (from `v0.45.4-pio-2`) to utilize the new `CountAuthorization` authz grant type. [#807](https://github.com/provenance-io/provenance/issues/807)
* Update metadata module authz handling to properly call `Accept` and delete/update authorizations as they're used [#905](https://github.com/provenance-io/provenance/issues/905)
* Read the `custom.toml` config file if it exists. This is read before the other config files, and isn't managed by the `config` commands [#989](https://github.com/provenance-io/provenance/issues/989)
* ABCI Request Response Streaming [#940](https://github.com/provenance-io/provenance/issues/940)

### Bug Fixes

Expand Down
45 changes: 43 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"encoding/json"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -98,11 +99,13 @@ import (

// PROVENANCE
appparams "github.com/provenance-io/provenance/app/params"
appstreaming "github.com/provenance-io/provenance/app/streaming"
_ "github.com/provenance-io/provenance/client/docs/statik" // registers swagger-ui files with statik
"github.com/provenance-io/provenance/internal/antewrapper"
piohandlers "github.com/provenance-io/provenance/internal/handlers"
"github.com/provenance-io/provenance/internal/provwasm"
"github.com/provenance-io/provenance/internal/statesync"
"github.com/provenance-io/provenance/internal/streaming"
"github.com/provenance-io/provenance/x/attribute"
attributekeeper "github.com/provenance-io/provenance/x/attribute/keeper"
attributetypes "github.com/provenance-io/provenance/x/attribute/types"
Expand Down Expand Up @@ -279,6 +282,9 @@ type App struct {

// module configurator
configurator module.Configurator

// ABCI streaming service
streamingService streaming.StreamService
}

func init() {
Expand Down Expand Up @@ -798,6 +804,9 @@ func New(
app.ScopedIBCKeeper = scopedIBCKeeper
app.ScopedTransferKeeper = scopedTransferKeeper

// register streaming service
SpicyLemon marked this conversation as resolved.
Show resolved Hide resolved
app.registerStreamingService(appOpts)

return app
}

Expand All @@ -806,12 +815,20 @@ func (app *App) Name() string { return app.BaseApp.Name() }

// BeginBlocker application updates every begin block
func (app *App) BeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock) abci.ResponseBeginBlock {
return app.mm.BeginBlock(ctx, req)
res := app.mm.BeginBlock(ctx, req)
if app.streamingService != nil {
app.streamingService.StreamBeginBlocker(ctx, req, res)
}
return res
}

// EndBlocker application updates every end block
func (app *App) EndBlocker(ctx sdk.Context, req abci.RequestEndBlock) abci.ResponseEndBlock {
return app.mm.EndBlock(ctx, req)
res := app.mm.EndBlock(ctx, req)
if app.streamingService != nil {
app.streamingService.StreamEndBlocker(ctx, req, res)
}
return res
}

// InitChainer application update at chain initialization
Expand Down Expand Up @@ -926,6 +943,30 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) {
tmservice.RegisterTendermintService(app.BaseApp.GRPCQueryRouter(), clientCtx, app.interfaceRegistry)
}

// RegisterStreamingService is used to register a streaming service into the App
func (app *App) registerStreamingService(appOpts servertypes.AppOptions) {
app.Logger().Info(fmt.Sprintf("Streaming Service Config options: "+
"\n%s/config/app.toml"+
"\n----------------------------------"+
"\nset streaming.enable true"+
"\nset streaming.service to one of %s\n",
os.Getenv("PIO_HOME"), appstreaming.ConfigOptions()))
egaxhaj marked this conversation as resolved.
Show resolved Hide resolved
enableStreaming := cast.ToBool(appOpts.Get(fmt.Sprintf("%s.%s", streaming.TomlKey, streaming.EnableParam)))
if enableStreaming {
serviceTomlKey := fmt.Sprintf("%s.%s", streaming.TomlKey, streaming.ServiceParam)
service := cast.ToString(appOpts.Get(serviceTomlKey))
ssi, found := appstreaming.StreamServiceInitializers[service]
if found {
// App will pass BeginBlocker and EndBlocker requests and responses to the streaming services
app.streamingService = ssi.Init(appOpts, app.AppCodec())
app.Logger().Info("Starting streaming", "service", service)
} else {
panic(fmt.Sprintf("could not create %s service: unknown streaming.service %s, expected one of %s",
service, service, appstreaming.ConfigOptions()))
}
}
}

// RegisterSwaggerAPI registers swagger route with API Server
func RegisterSwaggerAPI(ctx client.Context, rtr *mux.Router) {
statikFS, err := fs.New()
Expand Down
89 changes: 89 additions & 0 deletions app/sim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"testing"

"github.com/spf13/viper"
"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -379,6 +380,94 @@ func TestAppStateDeterminism(t *testing.T) {
}
}

func TestAppStateDeterminismWithStateListeningTrace(t *testing.T) {
if !sdksim.FlagEnabledValue {
t.Skip("skipping application simulation")
}

config := sdksim.NewConfigFromFlags()
config.InitialBlockHeight = 1
config.ExportParamsPath = ""
config.OnOperation = false
config.AllInvariants = false
config.ChainID = helpers.SimAppChainID
config.DBBackend = "memdb"

numSeeds := 3
numTimesToRunPerSeed := 5
appHashList := make([]json.RawMessage, numTimesToRunPerSeed)

// load trace streaming service config options
m := make(map[string]interface{})
m["streaming.enabled"] = true
m["streaming.service"] = []string{"trace"}
// example service params
m["streaming.trace.print-data-to-stdout"] = true

appOpts := viper.New()
for key, value := range m {
appOpts.SetDefault(key, value)
}

for i := 0; i < numSeeds; i++ {
config.Seed = rand.Int63()
PrintConfig(config)

for j := 0; j < numTimesToRunPerSeed; j++ {
var logger log.Logger
if sdksim.FlagVerboseValue {
logger = log.TestingLogger()
} else {
logger = log.NewNopLogger()
}

db := dbm.NewMemDB()
app := New(logger,
db,
nil,
true, map[int64]bool{},
DefaultNodeHome,
sdksim.FlagPeriodValue,
MakeEncodingConfig(),
appOpts,
interBlockCacheOpt(),
)

fmt.Printf(
"running provenance non-determinism simulation; seed %d: %d/%d, attempt: %d/%d\n",
config.Seed, i+1, numSeeds, j+1, numTimesToRunPerSeed,
)

_, _, err := simulation.SimulateFromSeed(
t,
os.Stdout,
app.BaseApp,
sdksim.AppStateFn(app.AppCodec(), app.SimulationManager()),
simtypes.RandomAccounts, // Replace with own random account function if using keys other than secp256k1
sdksim.SimulationOperations(app, app.AppCodec(), config),
app.ModuleAccountAddrs(),
config,
app.AppCodec(),
)
require.NoError(t, err)

if config.Commit {
PrintStats(config, db)
}

appHash := app.LastCommitID().Hash
appHashList[j] = appHash

if j != 0 {
require.Equal(
t, string(appHashList[0]), string(appHashList[j]),
"non-determinism in seed %d: %d/%d, attempt: %d/%d\n", config.Seed, i+1, numSeeds, j+1, numTimesToRunPerSeed,
)
}
}
}
}

// fauxMerkleModeOpt returns a BaseApp option to use a dbStoreAdapter instead of
// an IAVLStore for faster simulation speed.
func fauxMerkleModeOpt(bapp *baseapp.BaseApp) {
Expand Down
92 changes: 92 additions & 0 deletions app/streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Provenance Streaming Services

This package contains a map of supported streaming services that push data out to external systems.
In addition, this package contains service implementations that build on top of `StreamService` and `StreamServiceInitializer` interfaces.
These two interfaces are defined in `internal/streaming/streaming.go`.

## `StreamServiceInitializer`

This interface is defined as:

```go
// internal/streaming/streaming.go

// StreamServiceInitializer interface initializes StreamService
// implementations which are then registered with the App
type StreamServiceInitializer interface {
// Init configures and initializes the streaming service
Init(opts servertypes.AppOptions, marshaller codec.BinaryCodec) StreamService
}
```

Stream Services need to extend this interface to enable them to be initialized and loaded by the `App`.
In addition, the service must be defined in the `StreamServiceIntializers` map in `app/streaming/streaming.go`
to allow the App to load a service through `config` properties.
Take a look at [app/streaming/trace/trace.go](./trace/trace.go) for an implementation example of this interface.

Implementations must be added `app/streaming/streaming.go` for the App to be able to load the service when enabled.
See the [configuration](#configuration) section for how to configure a service.

```go
// app/streaming/streaming.go

// StreamServiceInitializers contains a map of supported StreamServiceInitializer implementations
var StreamServiceInitializers = map[string]streaming.StreamServiceInitializer{
"trace": trace.StreamServiceInitializer,
}
```

## `StreamService`

This interface is defined as:

```go
// internal/streaming/streaming.go

// StreamService interface used to hook into the ABCI message processing of the BaseApp
type StreamService interface {
// StreamBeginBlocker updates the streaming service with the latest BeginBlock messages
StreamBeginBlocker(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock)
// StreamEndBlocker updates the steaming service with the latest EndBlock messages
StreamEndBlocker(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock)
}
```

Take a look at [app/streaming/trace/service/service.go](./trace/service/service.go) for an implementation example of this interface.

## Configuration

Streaming services are configured in the `streaming` TOML mapping which can be placed in the App's `custom.toml` file. There are two parameters
for configuring a service: `streaming.enable` and `streaming.service`. `streaming.enable` is bool that turns on or off a streaming service.
`streaming.service` specifies the service name that is registered with the App.

```toml
[streaming]
enable = true
# The streaming service name that ABCI BeginBlocker and EndBlocker request and response will be sent to.
# Supported services are: trace
service = "service name to stream ABCI data"
```

This provides node operates with the ability to `opt-in` and enable streaming to external systems.

At this time, the only pre-defined service is the [trace](./trace) streaming service.
AS mentioned above, service can be added by adding the service to the `StreamServiceInitializers`
in [app/streaming/streaming.go](./streaming.go) and adding configuration properties in `app.toml`.
egaxhaj marked this conversation as resolved.
Show resolved Hide resolved
See [Trace streaming service configuration](#trace-streaming-service-configuration) for an example.

### Trace streaming service configuration

The configuration for the `trace` service is defined as:

```toml
[streaming]
enable = true
# The streaming service name that ABCI BeginBlocker and EndBlocker request and response will be sent to.
# Supported services are: trace
service = "trace"

[streaming.trace]
# When true, it will print ABCI BeginBlocker and EndBlocker request and response to stdout.
print-data-to-stdout = false
```
23 changes: 23 additions & 0 deletions app/streaming/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package streaming

import (
"sort"
"strings"

"github.com/provenance-io/provenance/app/streaming/trace"
"github.com/provenance-io/provenance/internal/streaming"
)

// StreamServiceInitializers contains a map of supported StreamServiceInitializer implementations
var StreamServiceInitializers = map[string]streaming.StreamServiceInitializer{
"trace": trace.StreamServiceInitializer,
}

func ConfigOptions() string {
SpicyLemon marked this conversation as resolved.
Show resolved Hide resolved
keys := make([]string, 0, len(StreamServiceInitializers))
for k := range StreamServiceInitializers {
keys = append(keys, k)
}
sort.Strings(keys)
return strings.Join(keys, ",")
egaxhaj marked this conversation as resolved.
Show resolved Hide resolved
}
Loading