Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: state streamers are not integrated #702

Merged
merged 15 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
### State Machine Breaking
- [cronos#695](https://github.com/crypto-org-chain/cronos/pull/695) Implement ADR-007, generic events format with indexed params.

### Improvements

- [cronos#702](https://github.com/crypto-org-chain/cronos/pull/702) Integrate the file state streamer.

*September 13, 2022*

## v0.9.0
Expand Down
32 changes: 32 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"

"github.com/crypto-org-chain/cronos/x/cronos/middleware"

Expand All @@ -19,11 +20,13 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -117,6 +120,7 @@ import (
gravitytypes "github.com/peggyjv/gravity-bridge/module/v2/x/gravity/types"

// this line is used by starport scaffolding # stargate/app/moduleImport
cronosappclient "github.com/crypto-org-chain/cronos/client"
"github.com/crypto-org-chain/cronos/x/cronos"
cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client"
cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper"
Expand All @@ -141,6 +145,8 @@ const (
//
// NOTE: In the SDK, the default value is 255.
AddrLen = 20

FileStreamerDirectory = "file_streamer"
)

// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
Expand Down Expand Up @@ -340,6 +346,32 @@ func New(
tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey, evmtypes.TransientKey, feemarkettypes.TransientKey)
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey)

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// Only support file streamer right now.
if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" {
streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory)
if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil {
panic(err)
}

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
yihuang marked this conversation as resolved.
Show resolved Hide resolved
service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec)
if err != nil {
panic(err)
}
bApp.SetStreamingService(service)

wg := new(sync.WaitGroup)
if err := service.Stream(wg); err != nil {
panic(err)
}
}

app := &App{
BaseApp: bApp,
cdc: cdc,
Expand Down
3 changes: 3 additions & 0 deletions client/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package client

const FlagStreamers = "streamers"
2 changes: 2 additions & 0 deletions cmd/cronosd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
ethermint "github.com/evmos/ethermint/types"

"github.com/crypto-org-chain/cronos/app"
cronosclient "github.com/crypto-org-chain/cronos/client"
// this line is used by starport scaffolding # stargate/root/import
)

Expand Down Expand Up @@ -147,6 +148,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
func addModuleInitFlags(startCmd *cobra.Command) {
crisis.AddModuleInitFlags(startCmd)
cronos.AddModuleInitFlags(startCmd)
startCmd.Flags().String(cronosclient.FlagStreamers, "", "Enable streamers, only file streamer is supported right now")
// this line is used by starport scaffolding # stargate/root/initFlags
}

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/configs/default.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
dotenv: '../../scripts/.env',
'cronos_777-1': {
cmd: 'cronosd',
'start-flags': '--trace',
'start-flags': '--trace --streamers file',
config: {
mempool: {
version: 'v1',
Expand Down
7 changes: 4 additions & 3 deletions integration_tests/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ def node_rpc(self, i):
return "tcp://127.0.0.1:%d" % ports.rpc_port(self.base_port(i))

def cosmos_cli(self, i=0):
return CosmosCLI(
self.base_dir / f"node{i}", self.node_rpc(i), self.chain_binary
)
return CosmosCLI(self.node_home(i), self.node_rpc(i), self.chain_binary)

def node_home(self, i=0):
return self.base_dir / f"node{i}"

def use_websocket(self, use=True):
self._w3 = None
Expand Down
19 changes: 17 additions & 2 deletions integration_tests/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ toml = "^0.10.2"
pysha3 = "^1.0.2"
jsonnet = "^0.18.0"
eth-account = { git = "https://github.com/mmsqe/eth-account.git", branch = "v0.5.8-rc0" }
cprotobuf = { git = "https://github.com/yihuang/cprotobuf.git" }

[tool.poetry.dev-dependencies]

Expand Down
62 changes: 62 additions & 0 deletions integration_tests/test_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from cprotobuf import Field, ProtoEntity, decode_primitive
from hexbytes import HexBytes

from .utils import ADDRS


class StoreKVPairs(ProtoEntity):
# the store key for the KVStore this pair originates from
store_key = Field("string", 1)
# true indicates a set operation, false indicates a delete operation
yihuang marked this conversation as resolved.
Show resolved Hide resolved
delete = Field("bool", 2)
key = Field("bytes", 3)
value = Field("bytes", 4)


def decode_stream_file(data, body_cls=StoreKVPairs, header_cls=None, footer_cls=None):
"""
header, body*, footer
"""
header = footer = None
body = []
offset = 0
size, n = decode_primitive(data, "uint64")
offset += n

# header
if header_cls is not None:
header = header_cls()
header.ParseFromString(data[offset : offset + size])
offset += size

while True:
size, n = decode_primitive(data[offset:], "uint64")
offset += n
if offset + size == len(data):
# footer
if footer_cls is not None:
footer = footer_cls()
footer.ParseFromString(data[offset : offset + size])
offset += size
break
else:
# body
if body_cls is not None:
item = body_cls()
item.ParseFromString(data[offset : offset + size])
body.append(item)
offset += size
return header, body, footer


def test_streamers(cronos):
"""
- check the streaming files are created
- try to parse the state change sets
"""
# inspect the first state change of the first tx in genesis
path = cronos.node_home(0) / "data/file_streamer/block-0-tx-0"
_, body, _ = decode_stream_file(open(path, "rb").read())
yihuang marked this conversation as resolved.
Show resolved Hide resolved
# creation of the validator account
assert body[0]["store_key"] == "acc"
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
assert body[0]["key"] == b"\x01" + HexBytes(ADDRS["validator"])
6 changes: 6 additions & 0 deletions nix/testenv.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,11 @@ pkgs.poetry2nix.mkPoetryEnv {
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.poetry ];
}
);

cprotobuf = super.cprotobuf.overridePythonAttrs (
old: {
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.cython ];
}
);
});
}