Main patch from tm-v0.34.10 #358

Merged · 5 commits · Jan 6, 2022

2 changes: 1 addition & 1 deletion go.mod
@@ -42,6 +42,6 @@ require (
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
gonum.org/v1/gonum v0.9.3
google.golang.org/grpc v1.35.0
google.golang.org/grpc v1.37.0
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
)
1 change: 1 addition & 0 deletions go.sum
@@ -1618,6 +1618,7 @@ google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
20 changes: 15 additions & 5 deletions light/client.go
@@ -256,7 +256,7 @@ func NewClientFromTrustedStore(

// Validate the number of witnesses.
if len(c.witnesses) < 1 {
return nil, errNoWitnesses{}
return nil, ErrNoWitnesses
}

// Verify witnesses are all on the same chain.
@@ -770,7 +770,17 @@ func (c *Client) verifySkipping(
pivotHeight := verifiedBlock.Height + (blockCache[depth].Height-verifiedBlock.Height)*verifySkippingNumerator/verifySkippingDenominator
interimBlock, providerErr := source.LightBlock(ctx, pivotHeight)
if providerErr != nil {
switch providerErr {
case nil:
blockCache = append(blockCache, interimBlock)

// if the error is benign, the client does not need to replace the primary
case provider.ErrLightBlockNotFound, provider.ErrNoResponse, provider.ErrHeightTooHigh:
return nil, err

// all other errors such as ErrBadLightBlock or ErrUnreliableProvider are seen as malevolent and the
// provider is removed
default:
return nil, ErrVerificationFailed{From: verifiedBlock.Height, To: pivotHeight, Reason: providerErr}
}
blockCache = append(blockCache, interimBlock)
@@ -1008,7 +1018,7 @@ func (c *Client) lightBlockFromPrimary(ctx context.Context, height int64) (*type
// Everything went smoothly. We reset the lightBlockRequests and return the light block
return l, nil

case provider.ErrNoResponse, provider.ErrLightBlockNotFound:
case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
// we find a new witness to replace the primary
c.logger.Debug("error from light block request from primary, replacing...",
"error", err, "height", height, "primary", c.primary)
@@ -1111,7 +1121,7 @@ func (c *Client) findNewPrimary(ctx context.Context, height int64, remove bool)
return response.lb, nil

// process benign errors by logging them only
case provider.ErrNoResponse, provider.ErrLightBlockNotFound:
case provider.ErrNoResponse, provider.ErrLightBlockNotFound, provider.ErrHeightTooHigh:
lastError = response.err
c.logger.Debug("error on light block request from witness",
"error", response.err, "primary", c.witnesses[response.witnessIndex])
@@ -1139,7 +1149,7 @@ func (c *Client) compareFirstHeaderWithWitnesses(ctx context.Context, h *types.S
defer c.providerMutex.Unlock()

if len(c.witnesses) < 1 {
return errNoWitnesses{}
return ErrNoWitnesses
}

errc := make(chan error, len(c.witnesses))
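
All three hunks above widen the same benign-error set with `provider.ErrHeightTooHigh`: for these errors the light client retries or switches provider instead of treating the provider as faulty, while any other error (e.g. `ErrBadLightBlock`) still gets the provider dropped. A minimal standalone sketch of that classification — the helper name and the import path are assumptions, not part of this PR:

```go
package main

import (
	"fmt"

	"github.com/line/ostracon/light/provider"
)

// isBenignProviderErr restates the case lists used in verifySkipping,
// lightBlockFromPrimary and findNewPrimary: these errors mean "try another
// provider or try again later"; anything else is treated as misbehaviour.
func isBenignProviderErr(err error) bool {
	switch err {
	case provider.ErrLightBlockNotFound, provider.ErrNoResponse, provider.ErrHeightTooHigh:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(isBenignProviderErr(provider.ErrHeightTooHigh)) // true: benign, keep the provider
	fmt.Println(isBenignProviderErr(fmt.Errorf("bad block")))   // false: provider gets replaced
}
```
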
2 changes: 1 addition & 1 deletion light/detector.go
@@ -41,7 +41,7 @@ func (c *Client) detectDivergence(ctx context.Context, primaryTrace []*types.Lig
defer c.providerMutex.Unlock()

if len(c.witnesses) == 0 {
return errNoWitnesses{}
return ErrNoWitnesses
}

// launch one goroutine per witness to retrieve the light block of the target height
8 changes: 0 additions & 8 deletions light/errors.go
@@ -89,14 +89,6 @@ func (e errConflictingHeaders) Error() string {
e.Block.Hash(), e.WitnessIndex)
}

// errNoWitnesses means that there are not enough witnesses connected to
// continue running the light client.
type errNoWitnesses struct{}

func (e errNoWitnesses) Error() string {
return "no witnesses connected. please reset light client"
}

// errBadWitness is returned when the witness either does not respond or
// responds with an invalid header.
type errBadWitness struct {
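
The hunk above only shows the removal; the matching addition is collapsed out of view. Judging from the new call sites (`return nil, ErrNoWitnesses`), the private struct error is presumably replaced by an exported package-level sentinel roughly like the sketch below (message text assumed from the deleted implementation). A sentinel lets callers check the condition with `==` or `errors.Is` instead of a type assertion.

```go
// Assumed shape of the replacement in light/errors.go — not copied from the diff.
package light

import "errors"

// ErrNoWitnesses means that there are not enough witnesses connected to
// continue running the light client.
var ErrNoWitnesses = errors.New("no witnesses connected. please reset light client")
```
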
4 changes: 3 additions & 1 deletion mempool/reactor.go
@@ -188,7 +188,9 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
for _, tx := range msg.Txs {
tx := tx // pin! workaround for `scopelint` error
memR.mempool.CheckTxAsync(tx, txInfo, func(err error) {
if err != nil {
if err == ErrTxInCache {
memR.Logger.Debug("Tx already exists in cache", "tx", txID(tx))
} else if err != nil {
memR.Logger.Info("Could not check tx", "tx", txID(tx), "err", err)
}
}, nil)
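
The CheckTxAsync callback now special-cases the mempool's duplicate-transaction sentinel, so transactions re-gossiped by peers are logged at debug level rather than cluttering the info log. The same pattern in isolation — a standalone sketch with a stand-in for `ErrTxInCache`, using `errors.Is` in place of the direct `==` comparison in the diff:

```go
package main

import (
	"errors"
	"log"
)

// Stand-in for the mempool package's ErrTxInCache sentinel.
var ErrTxInCache = errors.New("tx already exists in cache")

// handleCheckTxErr mirrors the new callback logic: duplicates are expected
// noise under gossip and are demoted to a debug-level message.
func handleCheckTxErr(err error) {
	switch {
	case err == nil:
		// tx was accepted into the mempool; nothing to log
	case errors.Is(err, ErrTxInCache):
		log.Println("debug: tx already exists in cache")
	default:
		log.Printf("info: could not check tx: %v", err)
	}
}

func main() {
	handleCheckTxErr(ErrTxInCache)         // debug
	handleCheckTxErr(errors.New("bad tx")) // info
	handleCheckTxErr(nil)                  // silent
}
```
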
19 changes: 17 additions & 2 deletions node/node.go
@@ -163,6 +163,21 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option {
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
// register the new channels to the nodeInfo
// NOTE: This is a bit messy now with the type casting but is
// cleaned up in the following version when NodeInfo is changed from
// an interface to a concrete type
if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
for _, chDesc := range reactor.GetChannels() {
if !ni.HasChannel(chDesc.ID) {
ni.Channels = append(ni.Channels, chDesc.ID)
n.transport.AddChannel(chDesc.ID)
}
}
n.nodeInfo = ni
} else {
n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
}
}
}
}
@@ -1265,7 +1280,7 @@ func makeNodeInfo(
txIndexer txindex.TxIndexer,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) {
) (p2p.DefaultNodeInfo, error) {
txIndexerStatus := "on"
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
Expand All @@ -1280,7 +1295,7 @@ func makeNodeInfo(
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
return p2p.DefaultNodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}

nodeInfo := p2p.DefaultNodeInfo{
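
One detail worth noting in the new registration block: `p2p.DefaultNodeInfo` is a struct held behind the `p2p.NodeInfo` interface, so the loop mutates a copy obtained from the type assertion and must write it back with `n.nodeInfo = ni` (the transport's `AddChannel` below follows the same pattern). A minimal standalone illustration of that value-semantics point — not taken from the PR, channel IDs are made up:

```go
package main

import (
	"fmt"

	"github.com/line/ostracon/p2p"
)

func main() {
	var info p2p.NodeInfo = p2p.DefaultNodeInfo{Channels: []byte{0x30}}

	ni := info.(p2p.DefaultNodeInfo) // type assertion yields a copy
	ni.Channels = append(ni.Channels, 0x31)

	info = ni // without this write-back the new channel would be lost
	fmt.Println(info.(p2p.DefaultNodeInfo).HasChannel(0x31)) // true
}
```
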
13 changes: 13 additions & 0 deletions node/node_test.go
@@ -23,6 +23,7 @@ import (
tmrand "github.com/line/ostracon/libs/rand"
mempl "github.com/line/ostracon/mempool"
"github.com/line/ostracon/p2p"
"github.com/line/ostracon/p2p/conn"
p2pmock "github.com/line/ostracon/p2p/mock"
"github.com/line/ostracon/privval"
"github.com/line/ostracon/proxy"
@@ -388,6 +389,14 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer os.RemoveAll(config.RootDir)

cr := p2pmock.NewReactor()
cr.Channels = []*conn.ChannelDescriptor{
{
ID: byte(0x31),
Priority: 5,
SendQueueCapacity: 100,
RecvMessageCapacity: 100,
},
}
customBlockchainReactor := p2pmock.NewReactor()

nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
@@ -416,6 +425,10 @@

assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))

channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)
assert.Contains(t, channels, cr.Channels[0].ID)
}

func TestNodeNewNodeTxIndexIndexer(t *testing.T) {
4 changes: 3 additions & 1 deletion p2p/mock/reactor.go
@@ -8,6 +8,8 @@ import (

type Reactor struct {
p2p.BaseReactor

Channels []*conn.ChannelDescriptor
}

func NewReactor() *Reactor {
@@ -17,7 +19,7 @@ func NewReactor() *Reactor {
return r
}

func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} }
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
13 changes: 9 additions & 4 deletions p2p/node_info.go
@@ -1,11 +1,12 @@
package p2p

import (
"bytes"
"errors"
"fmt"
"reflect"

"github.com/line/ostracon/libs/bytes"
tmbytes "github.com/line/ostracon/libs/bytes"
tmstrings "github.com/line/ostracon/libs/strings"
tmp2p "github.com/line/ostracon/proto/ostracon/p2p"
"github.com/line/ostracon/version"
@@ -85,9 +86,9 @@ type DefaultNodeInfo struct {

// Check compatibility.
// Channels are HexBytes so easier to read as JSON
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels bytes.HexBytes `json:"channels"` // channels this node knows about
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels tmbytes.HexBytes `json:"channels"` // channels this node knows about

// ASCIIText fields
Moniker string `json:"moniker"` // arbitrary moniker
@@ -222,6 +223,10 @@ func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) {
return NewNetAddressString(idAddr)
}

func (info DefaultNodeInfo) HasChannel(chID byte) bool {
return bytes.Contains(info.Channels, []byte{chID})
}

func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo {

dni := new(tmp2p.DefaultNodeInfo)
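
The helper added here is a thin wrapper over the standard library's `bytes.Contains` on the one-byte channel IDs — which is also why the existing `libs/bytes` import is re-aliased to `tmbytes`, freeing the `bytes` name for the stdlib package. A quick sketch of the equivalence (channel IDs are illustrative):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/line/ostracon/p2p"
)

func main() {
	ni := p2p.DefaultNodeInfo{Channels: []byte{0x30, 0x38}}

	fmt.Println(ni.HasChannel(0x38))                       // true
	fmt.Println(bytes.Contains(ni.Channels, []byte{0x38})) // true — same check
	fmt.Println(ni.HasChannel(0x31))                       // false
}
```
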
3 changes: 2 additions & 1 deletion p2p/node_info_test.go
@@ -102,7 +102,8 @@ func TestNodeInfoCompatible(t *testing.T) {
assert.NoError(t, ni1.CompatibleWith(ni2))

// add another channel; still compatible
ni2.Channels = []byte{newTestChannel, testCh}
ni2.Channels = append(ni2.Channels, newTestChannel)
assert.True(t, ni2.HasChannel(newTestChannel))
assert.NoError(t, ni1.CompatibleWith(ni2))

// wrong NodeInfo type is not compatible
7 changes: 1 addition & 6 deletions p2p/peer.go
@@ -135,15 +135,10 @@ func newPeer(
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
var channs = make([]byte, 0, len(chDescs))
for _, desc := range chDescs {
channs = append(channs, desc.ID)
}

p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: channs,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
13 changes: 13 additions & 0 deletions p2p/transport.go
@@ -261,6 +261,19 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return nil
}

// AddChannel registers a channel to nodeInfo.
// NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated
// This is a bit messy at the moment but is cleaned up in the following version
// when NodeInfo changes from an interface to a concrete type
func (mt *MultiplexTransport) AddChannel(chID byte) {
if ni, ok := mt.nodeInfo.(DefaultNodeInfo); ok {
if !ni.HasChannel(chID) {
ni.Channels = append(ni.Channels, chID)
}
mt.nodeInfo = ni
}
}

func (mt *MultiplexTransport) acceptPeers() {
for {
c, err := mt.listener.Accept()
15 changes: 15 additions & 0 deletions p2p/transport_test.go
@@ -621,6 +621,21 @@ func TestTransportHandshake(t *testing.T) {
}
}

func TestTransportAddChannel(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
testChannel := byte(0x01)

mt.AddChannel(testChannel)
if !mt.nodeInfo.(DefaultNodeInfo).HasChannel(testChannel) {
t.Errorf("missing added channel %v. Got %v", testChannel, mt.nodeInfo.(DefaultNodeInfo).Channels)
}
}

// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
29 changes: 19 additions & 10 deletions test/e2e/runner/load.go
@@ -13,9 +13,10 @@ import (
"github.com/line/ostracon/types"
)

// Load generates transactions against the network until the given
// context is cancelled.
func Load(ctx context.Context, testnet *e2e.Testnet) error {
// Load generates transactions against the network until the given context is
// canceled. A multiplier of greater than one can be supplied if load needs to
// be generated beyond a minimum amount.
func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
// Since transactions are executed across all nodes in the network, we need
// to reduce transaction load for larger networks to avoid using too much
// CPU. This gives high-throughput small networks and low-throughput large ones.
@@ -37,7 +38,7 @@ func Load(ctx context.Context, testnet *e2e.Testnet) error {
logger.Info(fmt.Sprintf("Starting transaction load (%v workers)...", concurrency))
started := time.Now()

go loadGenerate(ctx, chTx)
go loadGenerate(ctx, chTx, multiplier)

for w := 0; w < concurrency; w++ {
go loadProcess(ctx, testnet, chTx, chSuccess)
@@ -64,14 +65,14 @@ func Load(ctx context.Context, testnet *e2e.Testnet) error {
}
}

// loadGenerate generates jobs until the context is cancelled
func loadGenerate(ctx context.Context, chTx chan<- types.Tx) {
// loadGenerate generates jobs until the context is canceled
func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
for i := 0; i < math.MaxInt64; i++ {
// We keep generating the same 1000 keys over and over, with different values.
// This gives a reasonable load without putting too much data in the app.
id := i % 1000

bz := make([]byte, 2048) // 4kb hex-encoded
bz := make([]byte, 1024) // 1kb hex-encoded
_, err := rand.Read(bz)
if err != nil {
panic(fmt.Sprintf("Failed to read random bytes: %v", err))
@@ -80,7 +81,8 @@ func loadGenerate(ctx context.Context, chTx chan<- types.Tx) {

select {
case chTx <- tx:
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Duration(100/multiplier) * time.Millisecond)

case <-ctx.Done():
close(chTx)
return
@@ -103,10 +105,17 @@ func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx
if err != nil {
continue
}

// check that the node is up
_, err = client.Health(ctx)
if err != nil {
continue
}

clients[node.Name] = client
}
_, err = client.BroadcastTxCommit(ctx, tx)
if err != nil {

if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
continue
}
chSuccess <- tx
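
With the new `multiplier` argument, the generator sleeps `100/multiplier` milliseconds between transactions (integer division), so the single generator goroutine emits roughly `10*multiplier` tx/s before accounting for the time to build each 1 KiB random payload; multipliers above 100 would zero out the sleep entirely. A small self-contained check of that arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Pacing used by loadGenerate: sleep 100/multiplier ms per generated tx.
	for _, multiplier := range []int{1, 2, 5, 10} {
		sleep := time.Duration(100/multiplier) * time.Millisecond
		fmt.Printf("multiplier=%-2d sleep=%-6v approx rate=%.0f tx/s\n",
			multiplier, sleep, float64(time.Second)/float64(sleep))
	}
}
```
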