From 1d2b1a8ba3f1e093438ba8bca278dca9771ed033 Mon Sep 17 00:00:00 2001 From: marbar3778 Date: Mon, 26 Dec 2022 14:46:43 +0100 Subject: [PATCH 1/4] further decouple store --- baseapp/baseapp.go | 2 +- baseapp/options.go | 15 +++++++++------ simapp/app.go | 6 +++++- simapp/app_v2.go | 5 ++++- store/streaming/README.md | 2 +- store/streaming/constructor.go | 13 ++++--------- store/streaming/constructor_test.go | 7 ++----- store/streaming/file/README.md | 2 +- store/streaming/file/service.go | 3 +-- {baseapp => store/types}/streaming.go | 6 ++---- 10 files changed, 30 insertions(+), 31 deletions(-) rename {baseapp => store/types}/streaming.go (93%) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index 1e95dac964e..1115954500e 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -141,7 +141,7 @@ type BaseApp struct { //nolint: maligned // abciListeners for hooking into the ABCI message processing of the BaseApp // and exposing the requests and responses to external consumers - abciListeners []ABCIListener + abciListeners []storetypes.ABCIListener } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a diff --git a/baseapp/options.go b/baseapp/options.go index 90e58973fe8..39c40b6b24a 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -11,6 +11,7 @@ import ( pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types" "github.com/cosmos/cosmos-sdk/store/snapshots" snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types" + storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" ) @@ -232,14 +233,16 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) { } // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore -func (app *BaseApp) SetStreamingService(s StreamingService) { +func (app *BaseApp) SetStreamingService(s []storetypes.StreamingService) { // add the listeners for each StoreKey - for key, lis := range s.Listeners() { - app.cms.AddListeners(key, lis) + for _, streamer := range s { + for key, lis := range streamer.Listeners() { + app.cms.AddListeners(key, lis) + } + // register the StreamingService within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.abciListeners = append(app.abciListeners, streamer) } - // register the StreamingService within the BaseApp - // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context - app.abciListeners = append(app.abciListeners, s) } // SetTxDecoder sets the TxDecoder if it wasn't provided in the BaseApp constructor. diff --git a/simapp/app.go b/simapp/app.go index cc7dee35e80..3a250ee2523 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -263,11 +263,15 @@ func NewSimApp( memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") // load state streaming if enabled - if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, logger, keys); err != nil { + streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, logger, keys) + if err != nil { logger.Error("failed to load state streaming", "err", err) os.Exit(1) } + // register the streaming service with the BaseApp + bApp.SetStreamingService(streamers) + app := &SimApp{ BaseApp: bApp, legacyAmino: legacyAmino, diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 99f93d53163..41fe110909c 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -246,11 +246,14 @@ func NewSimApp( app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...) // load state streaming if enabled - if _, _, err := streaming.LoadStreamingServices(app.App.BaseApp, appOpts, app.appCodec, logger, app.kvStoreKeys()); err != nil { + streamers, _, err := streaming.LoadStreamingServices(appOpts, app.appCodec, logger, app.kvStoreKeys()) + if err != nil { logger.Error("failed to load state streaming", "err", err) os.Exit(1) } + app.App.BaseApp.SetStreamingService(streamers) + /**** Module Options ****/ app.ModuleManager.RegisterInvariants(app.CrisisKeeper) diff --git a/store/streaming/README.md b/store/streaming/README.md index 9eb962ac862..57fa3f6631e 100644 --- a/store/streaming/README.md +++ b/store/streaming/README.md @@ -3,7 +3,7 @@ This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a file or stream, as described in [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/main/docs/architecture/adr-038-state-listening.md) -and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/baseapp/streaming.go). +and defined in [types/streaming.go](https://github.com/cosmos/cosmos-sdk/blob/main/store/types/streaming.go). The child directories contain the implementations for specific output destinations. Currently, a `StreamingService` implementation that writes state changes out to diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index c756c61b0f3..b44018c9671 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -7,7 +7,6 @@ import ( "strings" "sync" - "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client/flags" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming/file" @@ -18,7 +17,7 @@ import ( ) // ServiceConstructor is used to construct a streaming service -type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (baseapp.StreamingService, error) +type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, types.Codec, log.Logger) (types.StreamingService, error) // ServiceType enum for specifying the type of StreamingService type ServiceType int @@ -90,7 +89,7 @@ func NewFileStreamingService( keys []types.StoreKey, marshaller types.Codec, logger log.Logger, -) (baseapp.StreamingService, error) { +) (types.StreamingService, error) { homePath := cast.ToString(opts.Get(flags.FlagHome)) filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix)) fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir)) @@ -118,18 +117,17 @@ func NewFileStreamingService( // WaitGroup and quit channel used to synchronize with the streaming services // and any error that occurs during the setup. func LoadStreamingServices( - bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec types.Codec, logger log.Logger, keys map[string]*types.KVStoreKey, -) ([]baseapp.StreamingService, *sync.WaitGroup, error) { +) ([]types.StreamingService, *sync.WaitGroup, error) { // waitgroup and quit channel for optional shutdown coordination of the streaming service(s) wg := new(sync.WaitGroup) // configure state listening capabilities using AppOptions streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers)) - activeStreamers := make([]baseapp.StreamingService, 0, len(streamers)) + activeStreamers := make([]types.StreamingService, 0, len(streamers)) for _, streamerName := range streamers { var exposeStoreKeys []types.StoreKey @@ -180,9 +178,6 @@ func LoadStreamingServices( return nil, nil, err } - // register the streaming service with the BaseApp - bApp.SetStreamingService(streamingService) - // kick off the background streaming service loop streamingService.Stream(wg) diff --git a/store/streaming/constructor_test.go b/store/streaming/constructor_test.go index 03a3574f047..28f462a268f 100644 --- a/store/streaming/constructor_test.go +++ b/store/streaming/constructor_test.go @@ -5,9 +5,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/libs/log" - dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/baseapp" serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming" "github.com/cosmos/cosmos-sdk/store/streaming/file" @@ -49,10 +47,9 @@ func TestStreamingServiceConstructor(t *testing.T) { } func TestLoadStreamingServices(t *testing.T) { - db := dbm.NewMemDB() + encCdc := types.NewTestCodec() keys := types.NewKVStoreKeys("mockKey1", "mockKey2") - bApp := baseapp.NewBaseApp("appName", log.NewNopLogger(), db, nil) testCases := map[string]struct { appOpts serverTypes.AppOptions @@ -76,7 +73,7 @@ func TestLoadStreamingServices(t *testing.T) { for name, tc := range testCases { t.Run(name, func(t *testing.T) { - activeStreamers, _, err := streaming.LoadStreamingServices(bApp, tc.appOpts, encCdc, log.NewNopLogger(), keys) + activeStreamers, _, err := streaming.LoadStreamingServices(tc.appOpts, encCdc, log.NewNopLogger(), keys) require.NoError(t, err) require.Equal(t, tc.activeStreamersLen, len(activeStreamers)) }) diff --git a/store/streaming/file/README.md b/store/streaming/file/README.md index 0c34de7f3be..d5ca3534d3e 100644 --- a/store/streaming/file/README.md +++ b/store/streaming/file/README.md @@ -1,6 +1,6 @@ # File Streaming Service -This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes +This pkg contains an implementation of the [StreamingService](../../types/streaming.go) that writes the data stream out to files on the local filesystem. This process is performed synchronously with the message processing of the state machine. diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index a5be0538d71..9e5928ac96c 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -14,11 +14,10 @@ import ( abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" - "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/store/types" ) -var _ baseapp.StreamingService = &StreamingService{} +var _ types.StreamingService = &StreamingService{} // StreamingService is a concrete implementation of StreamingService that writes // state changes out to files. diff --git a/baseapp/streaming.go b/store/types/streaming.go similarity index 93% rename from baseapp/streaming.go rename to store/types/streaming.go index b8b382ae05b..cf963bcb0b6 100644 --- a/baseapp/streaming.go +++ b/store/types/streaming.go @@ -1,4 +1,4 @@ -package baseapp +package types import ( "context" @@ -6,8 +6,6 @@ import ( "sync" abci "github.com/tendermint/tendermint/abci/types" - - store "github.com/cosmos/cosmos-sdk/store/types" ) // ABCIListener interface used to hook into the ABCI message processing of the BaseApp. @@ -29,7 +27,7 @@ type StreamingService interface { // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file Stream(wg *sync.WaitGroup) error // Listeners returns the streaming service's listeners for the BaseApp to register - Listeners() map[store.StoreKey][]store.WriteListener + Listeners() map[StoreKey][]WriteListener // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp ABCIListener // Closer interface From 0bae28d744c2af7cf4d9d41e778bb3bda706e57a Mon Sep 17 00:00:00 2001 From: marbar3778 Date: Mon, 26 Dec 2022 14:55:05 +0100 Subject: [PATCH 2/4] further decouple store --- store/cachekv/internal/btree_test.go | 5 ++- store/cachekv/store.go | 2 +- store/internal/conv/doc.go | 2 ++ store/internal/conv/string.go | 26 ++++++++++++++ store/internal/conv/string_test.go | 54 ++++++++++++++++++++++++++++ 5 files changed, 85 insertions(+), 4 deletions(-) create mode 100644 store/internal/conv/doc.go create mode 100644 store/internal/conv/string.go create mode 100644 store/internal/conv/string_test.go diff --git a/store/cachekv/internal/btree_test.go b/store/cachekv/internal/btree_test.go index b6aa22db8e0..903558070b0 100644 --- a/store/cachekv/internal/btree_test.go +++ b/store/cachekv/internal/btree_test.go @@ -4,7 +4,6 @@ import ( "testing" "github.com/cosmos/cosmos-sdk/store/types" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/stretchr/testify/require" ) @@ -195,9 +194,9 @@ func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg stri } func int642Bytes(i int64) []byte { - return sdk.Uint64ToBigEndian(uint64(i)) + return types.Uint64ToBigEndian(uint64(i)) } func bytes2Int64(buf []byte) int64 { - return int64(sdk.BigEndianToUint64(buf)) + return int64(types.BigEndianToUint64(buf)) } diff --git a/store/cachekv/store.go b/store/cachekv/store.go index 666a7125735..203223493d3 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -9,8 +9,8 @@ import ( "github.com/tendermint/tendermint/libs/math" dbm "github.com/tendermint/tm-db" - "github.com/cosmos/cosmos-sdk/internal/conv" "github.com/cosmos/cosmos-sdk/store/cachekv/internal" + "github.com/cosmos/cosmos-sdk/store/internal/conv" "github.com/cosmos/cosmos-sdk/store/internal/kv" "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" diff --git a/store/internal/conv/doc.go b/store/internal/conv/doc.go new file mode 100644 index 00000000000..1c86f5c1440 --- /dev/null +++ b/store/internal/conv/doc.go @@ -0,0 +1,2 @@ +// Package conv provides internal functions for convertions and data manipulation +package conv diff --git a/store/internal/conv/string.go b/store/internal/conv/string.go new file mode 100644 index 00000000000..ab2b7f44b38 --- /dev/null +++ b/store/internal/conv/string.go @@ -0,0 +1,26 @@ +package conv + +import ( + "reflect" + "unsafe" +) + +// UnsafeStrToBytes uses unsafe to convert string into byte array. Returned bytes +// must not be altered after this function is called as it will cause a segmentation fault. +func UnsafeStrToBytes(s string) []byte { + var buf []byte + sHdr := (*reflect.StringHeader)(unsafe.Pointer(&s)) + bufHdr := (*reflect.SliceHeader)(unsafe.Pointer(&buf)) + bufHdr.Data = sHdr.Data + bufHdr.Cap = sHdr.Len + bufHdr.Len = sHdr.Len + return buf +} + +// UnsafeBytesToStr is meant to make a zero allocation conversion +// from []byte -> string to speed up operations, it is not meant +// to be used generally, but for a specific pattern to delete keys +// from a map. +func UnsafeBytesToStr(b []byte) string { + return *(*string)(unsafe.Pointer(&b)) +} diff --git a/store/internal/conv/string_test.go b/store/internal/conv/string_test.go new file mode 100644 index 00000000000..3e051d37b90 --- /dev/null +++ b/store/internal/conv/string_test.go @@ -0,0 +1,54 @@ +package conv + +import ( + "runtime" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +func TestStringSuite(t *testing.T) { + suite.Run(t, new(StringSuite)) +} + +type StringSuite struct{ suite.Suite } + +func unsafeConvertStr() []byte { + return UnsafeStrToBytes("abc") +} + +func (s *StringSuite) TestUnsafeStrToBytes() { + // we convert in other function to trigger GC. We want to check that + // the underlying array in []bytes is accessible after GC will finish swapping. + for i := 0; i < 5; i++ { + b := unsafeConvertStr() + runtime.GC() + <-time.NewTimer(2 * time.Millisecond).C + b2 := append(b, 'd') //nolint:gocritic // append is fine here + s.Equal("abc", string(b)) + s.Equal("abcd", string(b2)) + } +} + +func unsafeConvertBytes() string { + return UnsafeBytesToStr([]byte("abc")) +} + +func (s *StringSuite) TestUnsafeBytesToStr() { + // we convert in other function to trigger GC. We want to check that + // the underlying array in []bytes is accessible after GC will finish swapping. + for i := 0; i < 5; i++ { + str := unsafeConvertBytes() + runtime.GC() + <-time.NewTimer(2 * time.Millisecond).C + s.Equal("abc", str) + } +} + +func BenchmarkUnsafeStrToBytes(b *testing.B) { + for i := 0; i < b.N; i++ { + UnsafeStrToBytes(strconv.Itoa(i)) + } +} From a81f0d34443bb4f201ca75e50b691b667b4c97e8 Mon Sep 17 00:00:00 2001 From: marbar3778 Date: Tue, 27 Dec 2022 13:33:52 +0100 Subject: [PATCH 3/4] cleanup --- CHANGELOG.md | 2 ++ baseapp/options.go | 15 +++++++++++++-- simapp/app.go | 10 +--------- simapp/app_v2.go | 10 +--------- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bee7ce9c04a..7d2e4e89cc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -108,6 +108,8 @@ Ref: https://keepachangelog.com/en/1.0.0/ * [#13881](https://github.com/cosmos/cosmos-sdk/pull/13881) Optimize iteration on nested cached KV stores and other operations in general. * (x/gov) [#14347](https://github.com/cosmos/cosmos-sdk/pull/14347) Support `v1.Proposal` message in `v1beta1.Proposal.Content`. * (x/gov) [#14390](https://github.com/cosmos/cosmos-sdk/pull/14390) Add title, proposer and summary to proposal struct +* (baseapp) [#14417](https://github.com/cosmos/cosmos-sdk/pull/14417) `SetStreamingService` accepts appOptions, AppCodec and Storekeys needed to set streamers. + * Store pacakge no longer has a dependency on baseapp. ### State Machine Breaking diff --git a/baseapp/options.go b/baseapp/options.go index 39c40b6b24a..361b5fa451f 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -3,14 +3,17 @@ package baseapp import ( "fmt" "io" + "os" dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/codec/types" + serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store" pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types" "github.com/cosmos/cosmos-sdk/store/snapshots" snapshottypes "github.com/cosmos/cosmos-sdk/store/snapshots/types" + "github.com/cosmos/cosmos-sdk/store/streaming" storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/mempool" @@ -233,9 +236,17 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) { } // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore -func (app *BaseApp) SetStreamingService(s []storetypes.StreamingService) { +func (app *BaseApp) SetStreamingService( + appOpts serverTypes.AppOptions, + appCodec storetypes.Codec, + keys map[string]*storetypes.KVStoreKey) { + streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, app.logger, keys) + if err != nil { + app.logger.Error("failed to load state streaming", "err", err) + os.Exit(1) + } // add the listeners for each StoreKey - for _, streamer := range s { + for _, streamer := range streamers { for key, lis := range streamer.Listeners() { app.cms.AddListeners(key, lis) } diff --git a/simapp/app.go b/simapp/app.go index 3a250ee2523..63fa955bf55 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -31,7 +31,6 @@ import ( "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/std" - "github.com/cosmos/cosmos-sdk/store/streaming" storetypes "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar" sdk "github.com/cosmos/cosmos-sdk/types" @@ -262,15 +261,8 @@ func NewSimApp( // not include this key. memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") - // load state streaming if enabled - streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, logger, keys) - if err != nil { - logger.Error("failed to load state streaming", "err", err) - os.Exit(1) - } - // register the streaming service with the BaseApp - bApp.SetStreamingService(streamers) + bApp.SetStreamingService(appOpts, appCodec, keys) app := &SimApp{ BaseApp: bApp, diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 41fe110909c..7598d99a6a0 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -22,7 +22,6 @@ import ( "github.com/cosmos/cosmos-sdk/server/api" "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" - "github.com/cosmos/cosmos-sdk/store/streaming" storetypes "github.com/cosmos/cosmos-sdk/store/types" "github.com/cosmos/cosmos-sdk/testutil/testdata_pulsar" "github.com/cosmos/cosmos-sdk/types/module" @@ -245,14 +244,7 @@ func NewSimApp( app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...) - // load state streaming if enabled - streamers, _, err := streaming.LoadStreamingServices(appOpts, app.appCodec, logger, app.kvStoreKeys()) - if err != nil { - logger.Error("failed to load state streaming", "err", err) - os.Exit(1) - } - - app.App.BaseApp.SetStreamingService(streamers) + app.App.BaseApp.SetStreamingService(appOpts, app.appCodec, app.kvStoreKeys()) /**** Module Options ****/ From 4eee62f084f6d78b7317995072314f6d1d184feb Mon Sep 17 00:00:00 2001 From: marbar3778 Date: Tue, 27 Dec 2022 22:04:25 +0100 Subject: [PATCH 4/4] address comments --- baseapp/options.go | 11 +++++------ simapp/app.go | 5 ++++- simapp/app_v2.go | 5 ++++- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/baseapp/options.go b/baseapp/options.go index 361b5fa451f..a98992398d7 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -3,12 +3,11 @@ package baseapp import ( "fmt" "io" - "os" dbm "github.com/tendermint/tm-db" "github.com/cosmos/cosmos-sdk/codec/types" - serverTypes "github.com/cosmos/cosmos-sdk/server/types" + servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store" pruningtypes "github.com/cosmos/cosmos-sdk/store/pruning/types" "github.com/cosmos/cosmos-sdk/store/snapshots" @@ -237,13 +236,12 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) { // SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore func (app *BaseApp) SetStreamingService( - appOpts serverTypes.AppOptions, + appOpts servertypes.AppOptions, appCodec storetypes.Codec, - keys map[string]*storetypes.KVStoreKey) { + keys map[string]*storetypes.KVStoreKey) error { streamers, _, err := streaming.LoadStreamingServices(appOpts, appCodec, app.logger, keys) if err != nil { - app.logger.Error("failed to load state streaming", "err", err) - os.Exit(1) + return err } // add the listeners for each StoreKey for _, streamer := range streamers { @@ -254,6 +252,7 @@ func (app *BaseApp) SetStreamingService( // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context app.abciListeners = append(app.abciListeners, streamer) } + return nil } // SetTxDecoder sets the TxDecoder if it wasn't provided in the BaseApp constructor. diff --git a/simapp/app.go b/simapp/app.go index 63fa955bf55..b5fa910b1bc 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -262,7 +262,10 @@ func NewSimApp( memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") // register the streaming service with the BaseApp - bApp.SetStreamingService(appOpts, appCodec, keys) + if err := bApp.SetStreamingService(appOpts, appCodec, keys); err != nil { + logger.Error("failed to load state streaming", "err", err) + os.Exit(1) + } app := &SimApp{ BaseApp: bApp, diff --git a/simapp/app_v2.go b/simapp/app_v2.go index 7598d99a6a0..9c5e117d6ef 100644 --- a/simapp/app_v2.go +++ b/simapp/app_v2.go @@ -244,7 +244,10 @@ func NewSimApp( app.App = appBuilder.Build(logger, db, traceStore, baseAppOptions...) - app.App.BaseApp.SetStreamingService(appOpts, app.appCodec, app.kvStoreKeys()) + if err := app.App.BaseApp.SetStreamingService(appOpts, app.appCodec, app.kvStoreKeys()); err != nil { + logger.Error("failed to load state streaming", "err", err) + os.Exit(1) + } /**** Module Options ****/