Skip to content

Commit

Permalink
feat: ADR-038 Part 2: StreamingService interface, file writing implem…
Browse files Browse the repository at this point in the history
…entation, and configuration (#8664) (#13326)

* feat: ADR-038 Part 2: StreamingService interface, file writing implementation, and configuration (#8664)

<!-- < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < < ☺
v                               ✰  Thanks for creating a PR! ✰
v    Before smashing the submit button please review the checkboxes.
v    If a checkbox is n/a - please still include it but + a little note why
☺ > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >  -->

<!-- Add a description of the changes that this PR introduces and the files that
are the most critical to review.
-->

Hello 👋 this PR introduces the second stage of changes to support [ADR-038](cosmos/cosmos-sdk#8012) state listening. This is rebased on top of the [first segment](cosmos/cosmos-sdk#8551), which introduces the low level changes to the MultiStore and KVStore interfaces and implementations, the new WriteListener types, and the new listen.KVStore type.

In this segment we introduce the StreamingService interface, an implementation that writes out to files, and it's integration and configuration at the BaseApp level.

The idea was to have the first segment reviewed independently first but if people think it is easier/more time efficient to review both at the same time then we could start here.

Thanks!

This work is towards satisfying [ADR-038](https://github.com/cosmos/cosmos-sdk/blob/master/docs/architecture/adr-038-state-listening.md)

---

Before we can merge this PR, please make sure that all the following items have been
checked off. If any of the checklist items are not applicable, please leave them but
write a little note why.

- [x] Targeted PR against correct branch (see [CONTRIBUTING.md](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#pr-targeting))
- [x] Linked to Github issue with discussion and accepted design OR link to spec that describes this work.
- [x] Code follows the [module structure standards](https://github.com/cosmos/cosmos-sdk/blob/master/docs/building-modules/structure.md).
- [x] Wrote unit and integration [tests](https://github.com/cosmos/cosmos-sdk/blob/master/CONTRIBUTING.md#testing)
- [x] Updated relevant documentation (`docs/`) or specification (`x/<module>/spec/`)
- [x] Added relevant `godoc` [comments](https://blog.golang.org/godoc-documenting-go-code).
- [x] Added a relevant changelog entry to the `Unreleased` section in `CHANGELOG.md`
- [x] Re-reviewed `Files changed` in the Github PR explorer
- [x] Review `Codecov Report` in the comment section below once CI passes

* fix lint

* Update CHANGELOG.md

Co-authored-by: Ian Norden <iansnorden@gmail.com>
  • Loading branch information
2 people authored and dudong2 committed Nov 1, 2022
1 parent 49cb9f4 commit c0e06d0
Show file tree
Hide file tree
Showing 13 changed files with 1,085 additions and 8 deletions.
26 changes: 25 additions & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg
res = app.beginBlocker(app.deliverState.ctx, req)
res.Events = sdk.MarkEventsToIndex(res.Events, app.indexEvents)
}
// set the signed validators for addition to context in deliverTx
app.voteInfos = req.LastCommitInfo.GetVotes()

// call the hooks with the BeginBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err)
}
}

return res
}
Expand All @@ -220,6 +229,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc
res.ConsensusParamUpdates = cp
}

// call the streaming service hooks with the EndBlock messages
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err)
}
}

return res
}

Expand Down Expand Up @@ -291,9 +307,17 @@ func (app *BaseApp) EndRecheckTx(req abci.RequestEndRecheckTx) abci.ResponseEndR
// Otherwise, the ResponseDeliverTx will contain releveant error information.
// Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant
// gas execution context.
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx {
func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) {
defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx")

defer func() {
for _, streamingListener := range app.abciListeners {
if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil {
app.logger.Error("DeliverTx listening hook failed", "err", err)
}
}
}()

gInfo := sdk.GasInfo{}
resultStr := "successful"

Expand Down
4 changes: 4 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ type BaseApp struct { //nolint: maligned
// indexEvents defines the set of events in the form {eventType}.{attributeKey},
// which informs Tendermint what to index. If empty, all events will be indexed.
indexEvents map[string]struct{}

// abciListeners for hooking into the ABCI message processing of the BaseApp
// and exposing the requests and responses to external consumers
abciListeners []ABCIListener
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
11 changes: 11 additions & 0 deletions baseapp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,14 @@ func MetricsProvider(prometheus bool) cache.MetricsProvider {
}
return cache.NopMetricsProvider()
}

// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore
func (app *BaseApp) SetStreamingService(s StreamingService) {
// add the listeners for each StoreKey
for key, lis := range s.Listeners() {
app.cms.AddListeners(key, lis)
}
// register the StreamingService within the BaseApp
// BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context
app.abciListeners = append(app.abciListeners, s)
}
33 changes: 33 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package baseapp

import (
"io"
"sync"

abci "github.com/line/ostracon/abci/types"

store "github.com/line/lbm-sdk/store/types"
"github.com/line/lbm-sdk/types"
)

// ABCIListener interface used to hook into the ABCI message processing of the BaseApp
type ABCIListener interface {
// ListenBeginBlock updates the streaming service with the latest BeginBlock messages
ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error
// ListenEndBlock updates the steaming service with the latest EndBlock messages
ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error
// ListenDeliverTx updates the steaming service with the latest DeliverTx messages
ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error
}

// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks
type StreamingService interface {
// Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file
Stream(wg *sync.WaitGroup) error
// Listeners returns the streaming service's listeners for the BaseApp to register
Listeners() map[store.StoreKey][]store.WriteListener
// ABCIListener interface for hooking into the ABCI messages from inside the BaseApp
ABCIListener
// Closer interface
io.Closer
}
7 changes: 7 additions & 0 deletions simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/line/lbm-sdk/server/config"
servertypes "github.com/line/lbm-sdk/server/types"
simappparams "github.com/line/lbm-sdk/simapp/params"
"github.com/line/lbm-sdk/store/streaming"
"github.com/line/lbm-sdk/testutil/testdata"
sdk "github.com/line/lbm-sdk/types"
"github.com/line/lbm-sdk/types/module"
Expand Down Expand Up @@ -315,6 +316,12 @@ func NewSimApp(
// not include this key.
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey")

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil {
ostos.Exit(err.Error())
}

app := &SimApp{
BaseApp: bApp,
legacyAmino: legacyAmino,
Expand Down
12 changes: 5 additions & 7 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/line/lbm-sdk/store/cachekv"
"github.com/line/lbm-sdk/store/dbadapter"
"github.com/line/lbm-sdk/store/listenkv"
"github.com/line/lbm-sdk/store/tracekv"
"github.com/line/lbm-sdk/store/types"
)

Expand Down Expand Up @@ -49,17 +51,13 @@ func NewFromKVStore(
}

for key, store := range stores {
var cacheWrapped types.CacheWrap
if cms.TracingEnabled() {
cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext)
} else {
cacheWrapped = store.CacheWrap()
store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, cms.traceContext)
}
if cms.ListeningEnabled(key) {
cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key])
} else {
cms.stores[key] = cacheWrapped
store = listenkv.NewStore(store.(types.KVStore), key, listeners[key])
}
cms.stores[key] = cachekv.NewStore(store.(types.KVStore))
}

return cms
Expand Down
67 changes: 67 additions & 0 deletions store/streaming/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# State Streaming Service
This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a
file or stream, as described in [ADR-038](../../docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](../../baseapp/streaming.go).
The child directories contain the implementations for specific output destinations.

Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional
output destinations can be added.

The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file:

```toml
[store]
streamers = [ # if len(streamers) > 0 we are streaming
"file", # name of the streaming service, used by constructor
]

[streamers]
[streamers.file]
keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"]
write_dir = "path to the write directory"
prefix = "optional prefix to prepend to the generated file names"
```

`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString`
to return the `ServiceConstructor` for that particular implementation:


```go
listeners := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, listenerName := range listeners {
constructor, err := ServiceTypeFromString(listenerName)
if err != nil {
// handle error
}
}
```

`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service.
`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`.
In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off.

Additional configuration parameters are optional and specific to the implementation.
In the case of the file streaming service, `streamers.file.write_dir` contains the path to the
directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions
with other App `StreamingService` output files.

The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and
returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options,
e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`.

```go
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// handler error
}
```

The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method.
The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process
may be synchronous or asynchronous with the message processing of the state machine.

```go
bApp.SetStreamingService(streamingService)
wg := new(sync.WaitGroup)
quitChan := make(chan struct{})
streamingService.Stream(wg, quitChan)
```
137 changes: 137 additions & 0 deletions store/streaming/constructor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package streaming

import (
"fmt"
"strings"
"sync"

"github.com/spf13/cast"

"github.com/line/lbm-sdk/baseapp"
"github.com/line/lbm-sdk/codec"
serverTypes "github.com/line/lbm-sdk/server/types"
"github.com/line/lbm-sdk/store/streaming/file"
"github.com/line/lbm-sdk/store/types"
)

// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error)

// ServiceType enum for specifying the type of StreamingService
type ServiceType int

const (
Unknown ServiceType = iota
File
// add more in the future
)

// ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name
func ServiceTypeFromString(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}

// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
default:
return "unknown"
}
}

// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: NewFileStreamingService,
}

// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := ServiceTypeFromString(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService
func NewFileStreamingService(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.write_dir"))
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
}

// LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys
// It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup
func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec codec.BinaryCodec, keys map[string]*types.KVStoreKey) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)
// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get("store.streamers"))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))
for _, streamerName := range streamers {
// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
var exposeStoreKeys []types.StoreKey
if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys
exposeStoreKeys = make([]types.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
} else {
exposeStoreKeys = make([]types.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
}
}
if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything
continue
}
// get the constructor for this streamer name
constructor, err := NewServiceConstructor(streamerName)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}
return nil, nil, err
}
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}
return nil, nil, err
}
// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)
// kick off the background streaming service loop
streamingService.Stream(wg)
// add to the list of active streamers
activeStreamers = append(activeStreamers, streamingService)
}
// if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything
return activeStreamers, wg, nil
}

func exposeAll(list []string) bool {
for _, ele := range list {
if ele == "*" {
return true
}
}
return false
}
Loading

0 comments on commit c0e06d0

Please sign in to comment.