Skip to content

Commit

Permalink
e2e: integrate light clients (bp #6196)
Browse files Browse the repository at this point in the history
integrate light clients (#6196)
fix e2e app test (#6223)
fix light client generator (#6236)
  • Loading branch information
tnasu committed Dec 23, 2021
1 parent 647a2e0 commit d56369c
Show file tree
Hide file tree
Showing 16 changed files with 249 additions and 57 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ jobs:
**/**.go
go.mod
go.sum
- uses: golangci/golangci-lint-action@v2.2.1
- uses: golangci/golangci-lint-action@v2.5.1
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.31
args: --timeout 10m --skip-files "_test\.go" # for skip unused linter (https://github.com/golangci/golangci-lint/issues/791)
version: v1.38
args: --timeout 10m
github-token: ${{ secrets.github_token }}
if: env.GIT_DIFF
31 changes: 5 additions & 26 deletions cmd/ostracon/commands/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"context"
"errors"
"fmt"
rpchttp "github.com/line/ostracon/rpc/client/http"
"net/http"
"os"
"path/filepath"
"regexp"
"strings"
"time"

Expand All @@ -17,15 +17,13 @@ import (

dbm "github.com/line/tm-db/v2"

"github.com/line/ostracon/crypto/merkle"
"github.com/line/ostracon/libs/log"
tmmath "github.com/line/ostracon/libs/math"
tmos "github.com/line/ostracon/libs/os"
"github.com/line/ostracon/light"
lproxy "github.com/line/ostracon/light/proxy"
lrpc "github.com/line/ostracon/light/rpc"
dbs "github.com/line/ostracon/light/store/db"
rpchttp "github.com/line/ostracon/rpc/client/http"
rpcserver "github.com/line/ostracon/rpc/jsonrpc/server"
)

Expand Down Expand Up @@ -233,12 +231,11 @@ func runProxy(cmd *cobra.Command, args []string) error {
cfg.WriteTimeout = config.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}

p := lproxy.Proxy{
Addr: listenAddr,
Config: cfg,
Client: lrpc.NewClient(rpcClient, c, lrpc.KeyPathFn(defaultMerkleKeyPathFn())),
Logger: logger,
p, err := lproxy.NewProxy(c, listenAddr, primaryAddr, cfg, logger, lrpc.KeyPathFn(lrpc.DefaultMerkleKeyPathFn()))
if err != nil {
return err
}

// Stop upon receiving SIGTERM or CTRL-C.
tmos.TrapSignal(logger, func() {
p.Listener.Close()
Expand Down Expand Up @@ -277,21 +274,3 @@ func saveProviders(db dbm.DB, primaryAddr, witnessesAddrs string) error {
}
return nil
}

func defaultMerkleKeyPathFn() lrpc.KeyPathFunc {
// regexp for extracting store name from /abci_query path
storeNameRegexp := regexp.MustCompile(`\/store\/(.+)\/key`)

return func(path string, key []byte) (merkle.KeyPath, error) {
matches := storeNameRegexp.FindStringSubmatch(path)
if len(matches) != 2 {
return nil, fmt.Errorf("can't find store name in %s using %s", path, storeNameRegexp)
}
storeName := matches[1]

kp := merkle.KeyPath{}
kp = kp.AppendKey([]byte(storeName), merkle.KeyEncodingURL)
kp = kp.AppendKey(key, merkle.KeyEncodingURL)
return kp, nil
}
}
6 changes: 5 additions & 1 deletion light/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,8 @@ func (c *Client) cleanupAfter(height int64) error {
}

func (c *Client) updateTrustedLightBlock(l *types.LightBlock) error {
c.logger.Debug("updating trusted light block", "light_block", l)

if err := c.trustedStore.SaveLightBlock(l); err != nil {
return fmt.Errorf("failed to save trusted header: %w", err)
}
Expand Down Expand Up @@ -1055,10 +1057,12 @@ and remove witness. Otherwise, use the different primary`, e.WitnessIndex), "wit
// respond or couldn't find the block, then we ignore it and move on to
// the next witness.
if _, ok := e.Reason.(provider.ErrBadLightBlock); ok {
c.logger.Info("Witness sent us invalid header / vals -> removing it", "witness", c.witnesses[e.WitnessIndex])
c.logger.Info("Witness sent us invalid header / vals -> removing it",
"witness", c.witnesses[e.WitnessIndex], "err", err)
witnessesToRemove = append(witnessesToRemove, e.WitnessIndex)
}
}

}

// we need to make sure that we remove witnesses by index in the reverse
Expand Down
24 changes: 24 additions & 0 deletions light/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (

"github.com/line/ostracon/libs/log"
tmpubsub "github.com/line/ostracon/libs/pubsub"
"github.com/line/ostracon/light"
lrpc "github.com/line/ostracon/light/rpc"
rpchttp "github.com/line/ostracon/rpc/client/http"
rpcserver "github.com/line/ostracon/rpc/jsonrpc/server"
)

Expand All @@ -21,6 +23,28 @@ type Proxy struct {
Listener net.Listener
}

// NewProxy creates the struct used to run an HTTP server for serving light
// client rpc requests.
func NewProxy(
lightClient *light.Client,
listenAddr, providerAddr string,
config *rpcserver.Config,
logger log.Logger,
opts ...lrpc.Option,
) (*Proxy, error) {
rpcClient, err := rpchttp.NewWithTimeout(providerAddr, "/websocket", uint(config.WriteTimeout.Seconds()))
if err != nil {
return nil, fmt.Errorf("failed to create http client for %s: %w", providerAddr, err)
}

return &Proxy{
Addr: listenAddr,
Config: config,
Client: lrpc.NewClient(rpcClient, lightClient, opts...),
Logger: logger,
}, nil
}

// ListenAndServe configures the rpcserver.WebsocketManager, sets up the RPC
// routes to proxy via Client, and starts up an HTTP server on the TCP network
// address p.Addr.
Expand Down
22 changes: 22 additions & 0 deletions light/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"regexp"
"time"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -61,6 +62,27 @@ func KeyPathFn(fn KeyPathFunc) Option {
}
}

// DefaultMerkleKeyPathFn creates a function used to generate merkle key paths
// from a path string and a key. This is the default used by the cosmos SDK.
// This merkle key paths are required when verifying /abci_query calls
func DefaultMerkleKeyPathFn() KeyPathFunc {
// regexp for extracting store name from /abci_query path
storeNameRegexp := regexp.MustCompile(`\/store\/(.+)\/key`)

return func(path string, key []byte) (merkle.KeyPath, error) {
matches := storeNameRegexp.FindStringSubmatch(path)
if len(matches) != 2 {
return nil, fmt.Errorf("can't find store name in %s using %s", path, storeNameRegexp)
}
storeName := matches[1]

kp := merkle.KeyPath{}
kp = kp.AppendKey([]byte(storeName), merkle.KeyEncodingURL)
kp = kp.AppendKey(key, merkle.KeyEncodingURL)
return kp, nil
}
}

// NewClient returns a new client.
func NewClient(next rpcclient.Client, lc LightClient, opts ...Option) *Client {
c := &Client{
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (app *Application) Commit() abci.ResponseCommit {
if err != nil {
panic(err)
}
logger.Info("Created state sync snapshot", "height", snapshot.Height)
app.logger.Info("Created state sync snapshot", "height", snapshot.Height)
}
retainHeight := int64(0)
if app.cfg.RetainBlocks > 0 {
Expand Down
1 change: 1 addition & 0 deletions test/e2e/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Config struct {
Listen string
Protocol string
Dir string
Mode string `toml:"mode"`
PersistInterval uint64 `toml:"persist_interval"`
SnapshotInterval uint64 `toml:"snapshot_interval"`
RetainBlocks uint64 `toml:"retain_blocks"`
Expand Down
95 changes: 92 additions & 3 deletions test/e2e/app/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package main

import (
"context"
"errors"
"fmt"
"github.com/line/ostracon/types"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/spf13/viper"
Expand All @@ -16,10 +20,16 @@ import (
tmflags "github.com/line/ostracon/libs/cli/flags"
"github.com/line/ostracon/libs/log"
tmnet "github.com/line/ostracon/libs/net"
"github.com/line/ostracon/light"
lproxy "github.com/line/ostracon/light/proxy"
lrpc "github.com/line/ostracon/light/rpc"
dbs "github.com/line/ostracon/light/store/db"
"github.com/line/ostracon/node"
"github.com/line/ostracon/p2p"
"github.com/line/ostracon/privval"
"github.com/line/ostracon/proxy"
rpcserver "github.com/line/ostracon/rpc/jsonrpc/server"
e2e "github.com/line/ostracon/test/e2e/pkg"
mcs "github.com/line/ostracon/test/maverick/consensus"
maverick "github.com/line/ostracon/test/maverick/node"
)
Expand Down Expand Up @@ -66,7 +76,11 @@ func run(configFile string) error {
err = startApp(cfg)
case "builtin":
if len(cfg.Misbehaviors) == 0 {
err = startNode(cfg)
if cfg.Mode == string(e2e.ModeLight) {
err = startLightClient(cfg)
} else {
err = startNode(cfg)
}
} else {
err = startMaverick(cfg)
}
Expand Down Expand Up @@ -139,8 +153,67 @@ func startNode(cfg *Config) error {
return n.Start()
}

// startMaverick starts a Maverick node that runs the application directly. It assumes the Ostracon
// configuration is in $OCHOME/config/ostracon.toml.
func startLightClient(cfg *Config) error {
tmcfg, nodeLogger, _, err := setupNode()
if err != nil {
return err
}

dbContext := &node.DBContext{ID: "light", Config: tmcfg}
lightDB, err := node.DefaultDBProvider(dbContext)
if err != nil {
return err
}

providers := rpcEndpoints(tmcfg.P2P.PersistentPeers)

c, err := light.NewHTTPClient(
context.Background(),
cfg.ChainID,
light.TrustOptions{
Period: tmcfg.StateSync.TrustPeriod,
Height: tmcfg.StateSync.TrustHeight,
Hash: tmcfg.StateSync.TrustHashBytes(),
},
providers[0],
providers[1:],
dbs.New(lightDB, "light"),
types.DefaultVoterParams(),
light.Logger(nodeLogger),
)
if err != nil {
return err
}

rpccfg := rpcserver.DefaultConfig()
rpccfg.MaxBodyBytes = tmcfg.RPC.MaxBodyBytes
rpccfg.MaxHeaderBytes = tmcfg.RPC.MaxHeaderBytes
rpccfg.MaxOpenConnections = tmcfg.RPC.MaxOpenConnections
// If necessary adjust global WriteTimeout to ensure it's greater than
// TimeoutBroadcastTxCommit.
// See https://github.com/tendermint/tendermint/issues/3435
if rpccfg.WriteTimeout <= tmcfg.RPC.TimeoutBroadcastTxCommit {
rpccfg.WriteTimeout = tmcfg.RPC.TimeoutBroadcastTxCommit + 1*time.Second
}

p, err := lproxy.NewProxy(c, tmcfg.RPC.ListenAddress, providers[0], rpccfg, nodeLogger,
lrpc.KeyPathFn(lrpc.DefaultMerkleKeyPathFn()))
if err != nil {
return err
}

logger.Info("Starting proxy...", "laddr", tmcfg.RPC.ListenAddress)
if err := p.ListenAndServe(); err != http.ErrServerClosed {
// Error starting or closing listener:
logger.Error("proxy ListenAndServe", "err", err)
}

return nil
}

// FIXME: Temporarily disconnected maverick until it is redesigned
// startMaverick starts a Maverick node that runs the application directly. It assumes the Tendermint
// configuration is in $TMHOME/config/tendermint.toml.
func startMaverick(cfg *Config) error {
app, err := NewApplication(cfg)
if err != nil {
Expand Down Expand Up @@ -247,3 +320,19 @@ func setupNode() (*config.Config, log.Logger, *p2p.NodeKey, error) {

return tmcfg, nodeLogger, nodeKey, nil
}

// rpcEndpoints takes a list of persistent peers and splits them into a list of rpc endpoints
// using 26657 as the port number
func rpcEndpoints(peers string) []string {
arr := strings.Split(peers, ",")
endpoints := make([]string, len(arr))
for i, v := range arr {
urlString := strings.SplitAfter(v, "@")[1]
hostName := strings.Split(urlString, ":26656")[0]
// use RPC port instead
port := 26657
rpcEndpoint := "http://" + hostName + ":" + fmt.Sprint(port)
endpoints[i] = rpcEndpoint
}
return endpoints
}
Loading

0 comments on commit d56369c

Please sign in to comment.