Skip to content

Commit

Permalink
Merge branch 'main' into vimystic/edit_rpcaddr
Browse files Browse the repository at this point in the history
  • Loading branch information
jtieri authored Oct 12, 2023
2 parents c56ac98 + 200d0fb commit 2d982ba
Show file tree
Hide file tree
Showing 15 changed files with 163 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/interchaintest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:

jobs:
events:
runs-on: self-hosted
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.21
uses: actions/setup-go@v4
Expand All @@ -28,7 +28,7 @@ jobs:
run: make interchaintest-events

legacy:
runs-on: self-hosted
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.21
uses: actions/setup-go@v4
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
* [\#1221](https://github.com/cosmos/relayer/pull/1221) Update cometbft to v0.37.2 and ibc-go to v7.2.0.
* [\#1226](https://github.com/cosmos/relayer/pull/1226) Avoid invalid Bech32 prefix error in parallel tests when sdk Config get overwritten by each other in single process.
* [\#1231](https://github.com/cosmos/relayer/pull/1231) Reduce bech32 prefix lookups when getting the signer.
* [\#1302](https://github.com/cosmos/relayer/pull/1302) Avoid relaying packets when the estimated gas is higher than the max gas.
* [\#1303](https://github.com/cosmos/relayer/pull/1303) Add missing max gas amount on txf to avoid underestimating gas when simulating runTx.

## v0.9.3

Expand Down
59 changes: 59 additions & 0 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/processor"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -57,6 +58,9 @@ const (
flagSrcConnID = "src-connection-id"
flagDstConnID = "dst-connection-id"
flagOutput = "output"
flagStuckPacketChainID = "stuck-packet-chain-id"
flagStuckPacketHeightStart = "stuck-packet-height-start"
flagStuckPacketHeightEnd = "stuck-packet-height-end"
)

const (
Expand Down Expand Up @@ -424,3 +428,58 @@ func addOutputFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
}
return cmd
}

// stuckPacketFlags registers the stuck-packet chain ID and height-range flags
// on cmd and binds each of them to the provided viper instance. A binding
// failure indicates a programming error, so it panics rather than returning.
func stuckPacketFlags(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
	// bind wires an already-registered flag into viper, panicking on failure.
	bind := func(name string) {
		if err := v.BindPFlag(name, cmd.Flags().Lookup(name)); err != nil {
			panic(err)
		}
	}

	cmd.Flags().String(flagStuckPacketChainID, "", "chain ID with the stuck packet(s)")
	bind(flagStuckPacketChainID)

	cmd.Flags().Uint64(flagStuckPacketHeightStart, 0, "height to start searching for the stuck packet(s)")
	bind(flagStuckPacketHeightStart)

	cmd.Flags().Uint64(flagStuckPacketHeightEnd, 0, "height to end searching for the stuck packet(s)")
	bind(flagStuckPacketHeightEnd)

	return cmd
}

// parseStuckPacketFromFlags reads the stuck-packet flags from cmd and builds
// a processor.StuckPacket from them. It returns (nil, nil) when no
// stuck-packet chain ID was supplied, and an error when the chain ID is set
// but the height range is missing or inverted.
func parseStuckPacketFromFlags(cmd *cobra.Command) (*processor.StuckPacket, error) {
	chainID, err := cmd.Flags().GetString(flagStuckPacketChainID)
	if err != nil {
		return nil, err
	}

	// An empty chain ID means the stuck-packet feature is disabled.
	if chainID == "" {
		return nil, nil
	}

	startHeight, err := cmd.Flags().GetUint64(flagStuckPacketHeightStart)
	if err != nil {
		return nil, err
	}
	// Zero is the flag default, so a set chain ID requires an explicit start.
	if startHeight == 0 {
		return nil, fmt.Errorf("stuck packet chain ID %s is set but start height is not", chainID)
	}

	endHeight, err := cmd.Flags().GetUint64(flagStuckPacketHeightEnd)
	if err != nil {
		return nil, err
	}
	if endHeight == 0 {
		return nil, fmt.Errorf("stuck packet chain ID %s is set but end height is not", chainID)
	}

	if endHeight < startHeight {
		return nil, fmt.Errorf("stuck packet end height %d is less than start height %d", endHeight, startHeight)
	}

	return &processor.StuckPacket{
		ChainID:     chainID,
		StartHeight: startHeight,
		EndHeight:   endHeight,
	}, nil
}
7 changes: 7 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
return err
}

stuckPacket, err := parseStuckPacketFromFlags(cmd)
if err != nil {
return err
}

rlyErrCh := relayer.StartRelayer(
cmd.Context(),
a.log,
Expand All @@ -156,6 +161,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
processorType,
initialBlockHistory,
prometheusMetrics,
stuckPacket,
)

// Block until the error channel sends a message.
Expand All @@ -179,5 +185,6 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
cmd = initBlockFlag(a.viper, cmd)
cmd = flushIntervalFlag(a.viper, cmd)
cmd = memoFlag(a.viper, cmd)
cmd = stuckPacketFlags(a.viper, cmd)
return cmd
}
8 changes: 8 additions & 0 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,11 @@ $ %s tx flush demo-path channel-0`,
}
}

stuckPacket, err := parseStuckPacketFromFlags(cmd)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(cmd.Context(), flushTimeout)
defer cancel()

Expand All @@ -811,6 +816,7 @@ $ %s tx flush demo-path channel-0`,
relayer.ProcessorEvents,
0,
nil,
stuckPacket,
)

// Block until the error channel sends a message.
Expand All @@ -830,6 +836,8 @@ $ %s tx flush demo-path channel-0`,

cmd = strategyFlag(a.viper, cmd)
cmd = memoFlag(a.viper, cmd)
cmd = stuckPacketFlags(a.viper, cmd)

return cmd
}

Expand Down
18 changes: 18 additions & 0 deletions docs/advanced_usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,25 @@ To remove the feegrant configuration:
- `rly chains configure feegrant basicallowance kujira --delete`


## Stuck Packet

There can be scenarios where a standard flush fails to clear a packet due to differences in the way packets are observed. The standard flush depends on the packet queries working properly. Sometimes the packet queries can miss things that the block scanning performed by the relayer during standard operation wouldn't. For packets affected by this, if they were emitted in recent blocks, the `--block-history` flag can be used to have the standard relayer block scanning start at a block height that many blocks behind the current chain tip. However, if the stuck packet occurred at an old height, farther back than would be reasonable for the `--block-history` scan from historical to current, there is an additional set of flags that can be used to zoom in on the block heights where the stuck packet occurred.

For example, say a relayer is configured between Chain A and B. The relayer was not operational during the time a user on Chain A sends a packet to Chain B. Due to an issue in the queries to Chain A, the typical flush of the relayer does not relay the packet. Say that many days go by before recognition of the issue by the relayer operator. The relayer operator could start up the relayer with a massive `--block-history` to query all blocks from the time of the stuck packet until the current block, but that could take many hours to query through each block. Instead, the relayer operator can flush out the packet by doing the following:

```bash
rly start $PATH_NAME --stuck-packet-chain-id $CHAIN_A_CHAIN_ID --stuck-packet-height-start $CHAIN_A_STUCK_PACKET_HEIGHT --stuck-packet-height-end $CHAIN_A_STUCK_PACKET_HEIGHT -d
```

Alternatively, a flush can be run with these flags so that the relayer exits once it is done:

```bash
rly tx flush $PATH_NAME --stuck-packet-chain-id $CHAIN_A_CHAIN_ID --stuck-packet-height-start $CHAIN_A_STUCK_PACKET_HEIGHT --stuck-packet-height-end $CHAIN_A_STUCK_PACKET_HEIGHT -d
```

If the `CHAIN_A_STUCK_PACKET_HEIGHT` is not exactly known, the `stuck-packet-height-start` and `stuck-packet-height-end` flags can be placed at heights surrounding the range where the stuck packet is expected to be, for convenience of not needing to dig through every block to determine the exact height.

Note that this narrows the window of visibility that the relayer has into what has happened on the chain, since the relayer is only getting a picture of what happened between `stuck-packet-height-start` and `stuck-packet-height-end` and then starts observing the most recent blocks after that. If a packet was actually relayed properly in between `stuck-packet-height-end` and the chain tip, then the relayer would encounter errors trying to relay a packet that was already relayed. This feature should only be used by advanced users for zooming in on a troublesome packet.

---

Expand Down
29 changes: 22 additions & 7 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ type CosmosChainProcessor struct {
parsedGasPrices *sdk.DecCoins
}

func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider, metrics *processor.PrometheusMetrics) *CosmosChainProcessor {
func NewCosmosChainProcessor(
log *zap.Logger,
provider *CosmosProvider,
metrics *processor.PrometheusMetrics,
) *CosmosChainProcessor {
return &CosmosChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand Down Expand Up @@ -208,7 +212,7 @@ type queryCyclePersistence struct {
// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
// The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks.
// ChainProcessors should obey the context and return upon context cancellation.
func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory uint64, stuckPacket *processor.StuckPacket) error {
minQueryLoopDuration := ccp.chainProvider.PCfg.MinLoopDuration
if minQueryLoopDuration == 0 {
minQueryLoopDuration = defaultMinQueryLoopDuration
Expand Down Expand Up @@ -247,6 +251,10 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
latestQueriedBlock = 0
}

if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID {
latestQueriedBlock = int64(stuckPacket.StartHeight)
}

persistence.latestQueriedBlock = latestQueriedBlock

var eg errgroup.Group
Expand All @@ -266,7 +274,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
defer ticker.Stop()

for {
if err := ccp.queryCycle(ctx, &persistence); err != nil {
if err := ccp.queryCycle(ctx, &persistence, stuckPacket); err != nil {
return err
}
select {
Expand Down Expand Up @@ -327,7 +335,7 @@ func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) err
return nil
}

func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence) error {
func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence, stuckPacket *processor.StuckPacket) error {
status, err := ccp.nodeStatusWithRetry(ctx)
if err != nil {
// don't want to cause CosmosChainProcessor to quit here, can retry again next cycle.
Expand Down Expand Up @@ -383,11 +391,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var ibcHeader provider.IBCHeader
i := i
sI := i
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout)
defer cancelQueryCtx()
blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i)
blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &sI)
if err != nil && ccp.metrics != nil {
ccp.metrics.IncBlockQueryFailure(chainID, "RPC Client")
}
Expand All @@ -396,7 +404,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, queryTimeout)
defer cancelQueryCtx()
ibcHeader, err = ccp.chainProvider.QueryIBCHeader(queryCtx, i)
ibcHeader, err = ccp.chainProvider.QueryIBCHeader(queryCtx, sI)
if err != nil && ccp.metrics != nil {
ccp.metrics.IncBlockQueryFailure(chainID, "IBC Header")
}
Expand Down Expand Up @@ -467,6 +475,13 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
}

newLatestQueriedBlock = i

if stuckPacket != nil &&
ccp.chainProvider.ChainId() == stuckPacket.ChainID &&
newLatestQueriedBlock == int64(stuckPacket.EndHeight) {
i = persistence.latestHeight
ccp.log.Debug("Parsed stuck packet height, skipping to current")
}
}

if newLatestQueriedBlock == persistence.latestQueriedBlock {
Expand Down
17 changes: 8 additions & 9 deletions relayer/chains/cosmos/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,6 @@ func (cc *CosmosProvider) SendMsgsWith(ctx context.Context, msgs []sdk.Msg, memo
if err != nil {
return nil, err
}

adjusted = uint64(float64(adjusted) * cc.PCfg.GasAdjustment)
}

//Cannot feegrant your own TX
Expand Down Expand Up @@ -1668,6 +1666,9 @@ func (cc *CosmosProvider) PrepareFactory(txf tx.Factory, signingKey string) (tx.
txf = txf.WithGas(cc.PCfg.MinGasAmount)
}

if cc.PCfg.MaxGasAmount != 0 {
txf = txf.WithGas(cc.PCfg.MaxGasAmount)
}
txf, err = cc.SetWithExtensionOptions(txf)
if err != nil {
return tx.Factory{}, err
Expand All @@ -1676,21 +1677,19 @@ func (cc *CosmosProvider) PrepareFactory(txf tx.Factory, signingKey string) (tx.
}

// AdjustEstimatedGas adjusts the estimated gas usage by multiplying it by the gas adjustment factor
// and bounding the result by the maximum gas amount option. If the gas usage is zero, the adjusted gas
// is also zero. If the gas adjustment factor produces an infinite result, an error is returned.
// max-gas-amount is enforced.
// and return estimated gas is higher than max gas error. If the gas usage is zero, the adjusted gas
// is also zero.
func (cc *CosmosProvider) AdjustEstimatedGas(gasUsed uint64) (uint64, error) {
if gasUsed == 0 {
return gasUsed, nil
}
if cc.PCfg.MaxGasAmount > 0 && gasUsed > cc.PCfg.MaxGasAmount {
return 0, fmt.Errorf("estimated gas %d is higher than max gas %d", gasUsed, cc.PCfg.MaxGasAmount)
}
gas := cc.PCfg.GasAdjustment * float64(gasUsed)
if math.IsInf(gas, 1) {
return 0, fmt.Errorf("infinite gas used")
}
// Bound the gas estimate by the max_gas option
if cc.PCfg.MaxGasAmount > 0 {
gas = math.Min(gas, float64(cc.PCfg.MaxGasAmount))
}
return uint64(gas), nil
}

Expand Down
8 changes: 8 additions & 0 deletions relayer/chains/cosmos/tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ func TestCosmosProvider_AdjustEstimatedGas(t *testing.T) {
expectedGas: 75000,
expectedErr: nil,
},
{
name: "estimated gas is higher than max gas",
gasUsed: 50000,
gasAdjustment: 1.5,
maxGasAmount: 70000,
expectedGas: 75000,
expectedErr: fmt.Errorf("estimated gas 75000 is higher than max gas 70000"),
},
}

for _, tc := range testCases {
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/mock/mock_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type queryCyclePersistence struct {
latestQueriedBlock int64
}

func (mcp *MockChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
func (mcp *MockChainProcessor) Run(ctx context.Context, initialBlockHistory uint64, _ *processor.StuckPacket) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
// would be query of latest height, mocking 20
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/penumbra/penumbra_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ type queryCyclePersistence struct {
// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
// The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks.
// ChainProcessors should obey the context and return upon context cancellation.
func (pcp *PenumbraChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
func (pcp *PenumbraChainProcessor) Run(ctx context.Context, initialBlockHistory uint64, _ *processor.StuckPacket) error {
minQueryLoopDuration := pcp.chainProvider.PCfg.MinLoopDuration
if minQueryLoopDuration == 0 {
minQueryLoopDuration = defaultMinQueryLoopDuration
Expand Down
2 changes: 1 addition & 1 deletion relayer/processor/chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ChainProcessor interface {
// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
// The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks.
// ChainProcessors should obey the context and return upon context cancellation.
Run(ctx context.Context, initialBlockHistory uint64) error
Run(ctx context.Context, initialBlockHistory uint64, stuckPacket *StuckPacket) error

// Provider returns the ChainProvider, which provides the methods for querying, assembling IBC messages, and sending transactions.
Provider() provider.ChainProvider
Expand Down
10 changes: 9 additions & 1 deletion relayer/processor/event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type EventProcessorBuilder struct {
initialBlockHistory uint64
pathProcessors PathProcessors
messageLifecycle MessageLifecycle
stuckPacket *StuckPacket
}

// EventProcessor is a built instance that is ready to be executed with Run(ctx).
Expand All @@ -20,6 +21,7 @@ type EventProcessor struct {
initialBlockHistory uint64
pathProcessors PathProcessors
messageLifecycle MessageLifecycle
stuckPacket *StuckPacket
}

// NewEventProcessor creates a builder than can be used to construct a multi-ChainProcessor, multi-PathProcessor topology for the relayer.
Expand Down Expand Up @@ -61,6 +63,12 @@ func (ep EventProcessorBuilder) WithMessageLifecycle(messageLifecycle MessageLif
return ep
}

// WithStuckPacket sets the stuck packet configuration that is handed to each
// ChainProcessor when the processors are run.
func (ep EventProcessorBuilder) WithStuckPacket(sp *StuckPacket) EventProcessorBuilder {
	ep.stuckPacket = sp
	return ep
}

// Build links the relevant ChainProcessors and PathProcessors, then returns an EventProcessor that can be used to run the ChainProcessors and PathProcessors.
func (ep EventProcessorBuilder) Build() EventProcessor {
for _, chainProcessor := range ep.chainProcessors {
Expand Down Expand Up @@ -95,7 +103,7 @@ func (ep EventProcessor) Run(ctx context.Context) error {
for _, chainProcessor := range ep.chainProcessors {
chainProcessor := chainProcessor
eg.Go(func() error {
err := chainProcessor.Run(runCtx, ep.initialBlockHistory)
err := chainProcessor.Run(runCtx, ep.initialBlockHistory, ep.stuckPacket)
// Signal the other chain processors to exit.
runCtxCancel()
return err
Expand Down
Loading

0 comments on commit 2d982ba

Please sign in to comment.