Skip to content

Commit

Permalink
Problem: state streamers are not integrated (crypto-org-chain#702)
Browse files Browse the repository at this point in the history
* Problem: state streaming is integrated

Solution:
- integration the basic file streamer

* add integration test

* changelog

* fix build

* fix lint

* fix deliver tx event in cosmos-sdk

* fix integration test

* Update integration_tests/test_streamer.py

Signed-off-by: yihuang <huang@crypto.com>

* update ethermint and fix build

* add a small cli utility into test_streamer.py

* fix integration test

* update sdk to upstream

Signed-off-by: yihuang <huang@crypto.com>
  • Loading branch information
yihuang committed Sep 26, 2022
1 parent af996ab commit e18da07
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- [cronos#719](https://github.com/crypto-org-chain/cronos/pull/719) Fix `eth_call` for legacy blocks (backport #713).

### Improvements

- [cronos#702](https://github.com/crypto-org-chain/cronos/pull/702) Integrate the file state streamer (backport #702).

*Sep 20, 2022*

## v0.9.0-beta3
Expand Down
32 changes: 32 additions & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"

"github.com/crypto-org-chain/cronos/x/cronos/middleware"

Expand All @@ -19,11 +20,13 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -118,6 +121,7 @@ import (
gravitytypes "github.com/peggyjv/gravity-bridge/module/v2/x/gravity/types"

// this line is used by starport scaffolding # stargate/app/moduleImport
cronosappclient "github.com/crypto-org-chain/cronos/client"
"github.com/crypto-org-chain/cronos/x/cronos"
cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client"
cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper"
Expand All @@ -142,6 +146,8 @@ const (
//
// NOTE: In the SDK, the default value is 255.
AddrLen = 20

FileStreamerDirectory = "file_streamer"
)

// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
Expand Down Expand Up @@ -341,6 +347,32 @@ func New(
tkeys := sdk.NewTransientStoreKeys(paramstypes.TStoreKey, evmtypes.TransientKey, feemarkettypes.TransientKey)
memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey)

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
// Only support file streamer right now.
if cast.ToString(appOpts.Get(cronosappclient.FlagStreamers)) == "file" {
streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory)
if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil {
panic(err)
}

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec)
if err != nil {
panic(err)
}
bApp.SetStreamingService(service)

wg := new(sync.WaitGroup)
if err := service.Stream(wg); err != nil {
panic(err)
}
}

app := &App{
BaseApp: bApp,
cdc: cdc,
Expand Down
3 changes: 3 additions & 0 deletions client/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package client

const FlagStreamers = "streamers"
2 changes: 2 additions & 0 deletions cmd/cronosd/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
ethermint "github.com/evmos/ethermint/types"

"github.com/crypto-org-chain/cronos/app"
cronosclient "github.com/crypto-org-chain/cronos/client"
// this line is used by starport scaffolding # stargate/root/import
)

Expand Down Expand Up @@ -147,6 +148,7 @@ func initRootCmd(rootCmd *cobra.Command, encodingConfig params.EncodingConfig) {
func addModuleInitFlags(startCmd *cobra.Command) {
crisis.AddModuleInitFlags(startCmd)
cronos.AddModuleInitFlags(startCmd)
startCmd.Flags().String(cronosclient.FlagStreamers, "", "Enable streamers, only file streamer is supported right now")
// this line is used by starport scaffolding # stargate/root/initFlags
}

Expand Down
2 changes: 1 addition & 1 deletion integration_tests/configs/default.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
dotenv: '../../scripts/.env',
'cronos_777-1': {
cmd: 'cronosd',
'start-flags': '--trace',
'start-flags': '--trace --streamers file',
config: {
mempool: {
version: 'v1',
Expand Down
7 changes: 4 additions & 3 deletions integration_tests/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ def node_rpc(self, i):
return "tcp://127.0.0.1:%d" % ports.rpc_port(self.base_port(i))

def cosmos_cli(self, i=0):
return CosmosCLI(
self.base_dir / f"node{i}", self.node_rpc(i), self.chain_binary
)
return CosmosCLI(self.node_home(i), self.node_rpc(i), self.chain_binary)

def node_home(self, i=0):
return self.base_dir / f"node{i}"

def use_websocket(self, use=True):
self._w3 = None
Expand Down
19 changes: 17 additions & 2 deletions integration_tests/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ toml = "^0.10.2"
pysha3 = "^1.0.2"
jsonnet = "^0.18.0"
eth-account = { git = "https://github.com/mmsqe/eth-account.git", branch = "v0.5.8-rc0" }
cprotobuf = { git = "https://github.com/yihuang/cprotobuf.git" }

[tool.poetry.dev-dependencies]

Expand Down
80 changes: 80 additions & 0 deletions integration_tests/test_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from cprotobuf import Field, ProtoEntity, decode_primitive
from hexbytes import HexBytes

from .utils import ADDRS


class StoreKVPairs(ProtoEntity):
# the store key for the KVStore this pair originates from
store_key = Field("string", 1)
# true indicates a delete operation
delete = Field("bool", 2)
key = Field("bytes", 3)
value = Field("bytes", 4)


def decode_stream_file(data, body_cls=StoreKVPairs, header_cls=None, footer_cls=None):
"""
header, body*, footer
"""
header = footer = None
body = []
offset = 0
size, n = decode_primitive(data, "uint64")
offset += n

# header
if header_cls is not None:
header = header_cls()
header.ParseFromString(data[offset : offset + size])
offset += size

while True:
size, n = decode_primitive(data[offset:], "uint64")
offset += n
if offset + size == len(data):
# footer
if footer_cls is not None:
footer = footer_cls()
footer.ParseFromString(data[offset : offset + size])
offset += size
break
else:
# body
if body_cls is not None:
item = body_cls()
item.ParseFromString(data[offset : offset + size])
body.append(item)
offset += size
return header, body, footer


def test_streamers(cronos):
"""
- check the streaming files are created
- try to parse the state change sets
"""
# inspect the first state change of the first tx in genesis
path = cronos.node_home(0) / "data/file_streamer/block-0-tx-0"
_, body, _ = decode_stream_file(open(path, "rb").read())
# creation of the validator account
assert body[0].store_key == "acc"
# the order in gen_txs is undeterministic, could be either one.
assert body[0].key in (
b"\x01" + HexBytes(ADDRS["validator"]),
b"\x01" + HexBytes(ADDRS["validator2"]),
)


if __name__ == "__main__":
import binascii
import sys

_, body, _ = decode_stream_file(open(sys.argv[1], "rb").read())
for item in body:
print(
item.store_key,
item.delete,
binascii.hexlify(item.key).decode(),
binascii.hexlify(item.value).decode(),
)
6 changes: 6 additions & 0 deletions nix/testenv.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,11 @@ pkgs.poetry2nix.mkPoetryEnv {
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.poetry ];
}
);

cprotobuf = super.cprotobuf.overridePythonAttrs (
old: {
nativeBuildInputs = (old.nativeBuildInputs or [ ]) ++ [ self.cython ];
}
);
});
}

0 comments on commit e18da07

Please sign in to comment.