fix: allow relaying based on first block #18

Merged: 25 commits, Apr 24, 2024
18 changes: 14 additions & 4 deletions relayer/chains/cosmos/cosmos_chain_processor.go
@@ -82,7 +82,7 @@ const (

defaultMinQueryLoopDuration = 1 * time.Second
defaultBalanceUpdateWaitDuration = 60 * time.Second
- inSyncNumBlocksThreshold = 2
+ defaultInSyncNumBlocksThreshold = 2
blockMaxRetries = 5
)

@@ -201,6 +201,7 @@ func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID strin

// queryCyclePersistence hold the variables that should be retained across queryCycles.
type queryCyclePersistence struct {
+ // the latest known height of the chain
latestHeight int64
latestQueriedBlock int64
retriesAtLatestQueriedBlock int
@@ -366,7 +367,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
firstTimeInSync := false

if !ccp.inSync {
- if (persistence.latestHeight - persistence.latestQueriedBlock) < inSyncNumBlocksThreshold {
+ if (persistence.latestHeight - persistence.latestQueriedBlock) < int64(defaultInSyncNumBlocksThreshold) {
ccp.inSync = true
firstTimeInSync = true
ccp.log.Info("Chain is in sync")
@@ -390,7 +391,14 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu

chainID := ccp.chainProvider.ChainId()

- for i := persistence.latestQueriedBlock + 1; i <= persistence.latestHeight; i++ {
+ firstHeightToQuery := persistence.latestQueriedBlock
+ // On the first ever update, we want to make sure we propagate the block info to the path processor
+ // Afterward, we only want to query new blocks
+ if ccp.inSync && !firstTimeInSync {
+ firstHeightToQuery++
+ }
+
+ for i := firstHeightToQuery; i <= persistence.latestHeight; i++ {
var (
eg errgroup.Group
blockRes *coretypes.ResultBlockResults
@@ -491,7 +499,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
}
}

- if newLatestQueriedBlock == persistence.latestQueriedBlock {
+ if (ccp.inSync && !firstTimeInSync) && newLatestQueriedBlock == persistence.latestQueriedBlock {
return nil
}

@@ -516,6 +524,8 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
continue
}

+ ccp.log.Debug("sending new data to the path processor", zap.Bool("inSync", ccp.inSync))
+
pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
LatestBlock: ccp.latestBlock,
LatestHeader: latestHeader,
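The height-selection change above is the core of this PR: on the first cycle after the chain processor comes into sync, the last queried block is queried again so its IBC data actually reaches the path processors, instead of relaying only from the next block onward. A minimal sketch of that rule pulled into a standalone helper (names are illustrative, not part of the relayer):

```go
package main

import "fmt"

// firstHeightToQuery mirrors the selection logic above: on the first cycle
// after coming into sync we re-query the last queried block so its data is
// propagated to the path processors; on every later cycle we start at the
// next block.
func firstHeightToQuery(latestQueriedBlock int64, inSync, firstTimeInSync bool) int64 {
	h := latestQueriedBlock
	if inSync && !firstTimeInSync {
		h++
	}
	return h
}

func main() {
	fmt.Println(firstHeightToQuery(100, true, true))  // 100: first in-sync cycle re-sends block 100
	fmt.Println(firstHeightToQuery(100, true, false)) // 101: steady state queries only new blocks
}
```

The companion change to the `newLatestQueriedBlock == persistence.latestQueriedBlock` early return keeps that first cycle from bailing out before the caches are handed to the path processors.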
11 changes: 6 additions & 5 deletions relayer/client.go
@@ -24,7 +24,8 @@ func (c *Chain) CreateClients(ctx context.Context,
customClientTrustingPeriod,
maxClockDrift time.Duration,
customClientTrustingPeriodPercentage int64,
- memo string) (string, string, error) {
+ memo string,
+ ) (string, string, error) {
// Query the latest heights on src and dst and retry if the query fails
var srch, dsth int64
if err := retry.Do(func() error {
@@ -64,7 +65,7 @@ func (c *Chain) CreateClients(ctx context.Context,
}

// overriding the unbonding period should only be possible when creating single clients at a time (CreateClient)
- var overrideUnbondingPeriod = time.Duration(0)
+ overrideUnbondingPeriod := time.Duration(0)

var clientSrc, clientDst string
eg, egCtx := errgroup.WithContext(ctx)
@@ -126,7 +127,8 @@ func CreateClient(
overrideUnbondingPeriod,
maxClockDrift time.Duration,
customClientTrustingPeriodPercentage int64,
- memo string) (string, error) {
+ memo string,
+ ) (string, error) {
// If a client ID was specified in the path and override is not set, ensure the client exists.
if !override && src.PathEnd.ClientID != "" {
// TODO: check client is not expired
@@ -316,7 +318,7 @@ func MsgUpdateClient(
eg.Go(func() error {
return retry.Do(func() error {
var err error
- dstTrustedHeader, err = src.ChainProvider.QueryIBCHeader(egCtx, int64(dstClientState.GetLatestHeight().GetRevisionHeight()))
+ dstTrustedHeader, err = src.ChainProvider.QueryIBCHeader(egCtx, int64(dstClientState.GetLatestHeight().GetRevisionHeight())+1)
return err
}, retry.Context(egCtx), RtyAtt, RtyDel, RtyErr, retry.OnRetry(func(n uint, err error) {
src.log.Info(
@@ -518,7 +520,6 @@ func findMatchingClient(ctx context.Context, src, dst *Chain, newClientState ibc

for _, existingClientState := range clientsResp {
clientID, err := provider.ClientsMatch(ctx, src.ChainProvider, dst.ChainProvider, existingClientState, newClientState)
-
// If there is an error parsing/type asserting the client state in ClientsMatch this is going
// to make the entire find matching client logic fail.
// We should really never be encountering an error here and if we do it is probably a sign of a
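The functional change in this file is the `+1` when querying the trusted header for `MsgUpdateClient`. As far as the Tendermint light client is concerned, the trusted validator set carried in an update must hash to the `NextValidatorsHash` stored at the trusted height, which is the validator set of the block one above it, and that is presumably why the query moved to trusted height + 1. A rough, self-contained sketch of the pattern, using stand-in types rather than the relayer's real provider interface:

```go
package main

import (
	"context"
	"fmt"
)

// IBCHeader and HeaderQuerier are illustrative stand-ins for the relayer's
// provider types, not its real API.
type IBCHeader struct {
	Height             uint64
	NextValidatorsHash []byte
}

type HeaderQuerier interface {
	QueryIBCHeader(ctx context.Context, height int64) (IBCHeader, error)
}

// trustedHeaderFor fetches the header used as the trusted basis when updating
// a client whose latest consensus height is trustedHeight. The +1 mirrors the
// diff above: the validator set that signs block trustedHeight+1 is the one
// the client already trusts via the consensus state stored at trustedHeight.
func trustedHeaderFor(ctx context.Context, q HeaderQuerier, trustedHeight uint64) (IBCHeader, error) {
	return q.QueryIBCHeader(ctx, int64(trustedHeight)+1)
}

// fakeQuerier returns a canned header so the example runs without a chain.
type fakeQuerier struct{}

func (fakeQuerier) QueryIBCHeader(_ context.Context, height int64) (IBCHeader, error) {
	return IBCHeader{Height: uint64(height)}, nil
}

func main() {
	h, _ := trustedHeaderFor(context.Background(), fakeQuerier{}, 100)
	fmt.Println(h.Height) // 101
}
```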
47 changes: 24 additions & 23 deletions relayer/processor/message_processor.go
@@ -104,17 +104,21 @@ func (mp *messageProcessor) processMessages(
var err error
needsClientUpdate, err = mp.shouldUpdateClientNow(ctx, src, dst)
if err != nil {
- return err
+ return fmt.Errorf("should update client now: %w", err)
}

if err := mp.assembleMsgUpdateClient(ctx, src, dst); err != nil {
- return err
+ return fmt.Errorf("assemble message update client: %w", err)
}
}

mp.assembleMessages(ctx, messages, src, dst)

- return mp.trackAndSendMessages(ctx, src, dst, needsClientUpdate)
+ if err := mp.trackAndSendMessages(ctx, src, dst, needsClientUpdate); err != nil {
+ return fmt.Errorf("track and send messages: %w", err)
+ }
+
+ return nil
}

func isLocalhostClient(srcClientID, dstClientID string) bool {
@@ -136,7 +140,7 @@ func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst
if dst.clientState.ConsensusTime.IsZero() {
h, err := src.chainProvider.QueryIBCHeader(ctx, int64(dst.clientState.ConsensusHeight.RevisionHeight))
if err != nil {
- return false, fmt.Errorf("failed to get header height: %w", err)
+ return false, fmt.Errorf("query ibc header: %w", err)
}
consensusHeightTime = time.Unix(0, int64(h.ConsensusState().GetTimestamp()))
} else {
@@ -166,16 +170,18 @@ func (mp *messageProcessor) shouldUpdateClientNow(ctx context.Context, src, dst
mp.metrics.SetClientTrustingPeriod(src.info.PathName, dst.info.ChainID, dst.info.ClientID, time.Duration(dst.clientState.TrustingPeriod))
}

- if shouldUpdateClientNow {
- mp.log.Info("Client update threshold condition met",
- zap.String("path_name", src.info.PathName),
- zap.String("chain_id", dst.info.ChainID),
- zap.String("client_id", dst.info.ClientID),
- zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()),
- zap.Int64("time_since_client_update", time.Since(consensusHeightTime).Milliseconds()),
- zap.Int64("client_threshold_time", mp.clientUpdateThresholdTime.Milliseconds()),
- )
- }
+ mp.log.Debug("should update client now?",
+ zap.String("path_name", src.info.PathName),
+ zap.String("chain_id", dst.info.ChainID),
+ zap.String("client_id", dst.info.ClientID),
+ zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()),
+ zap.Int64("time_since_client_update", time.Since(consensusHeightTime).Milliseconds()),
+ zap.Int64("client_threshold_time", mp.clientUpdateThresholdTime.Milliseconds()),
+ zap.Bool("enough_blocks_passed", enoughBlocksPassed),
+ zap.Bool("past_two_thirds_trusting_period", pastTwoThirdsTrustingPeriod),
+ zap.Bool("past_configured_client_update_threshold", pastConfiguredClientUpdateThreshold),
+ zap.Bool("should_update_client_now", shouldUpdateClientNow),
+ )

return shouldUpdateClientNow, nil
}
@@ -295,11 +301,6 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds
trustedNextValidatorsHash = header.NextValidatorsHash()
}

- // As we only require one chain to be in sync the src.latestHeader may be nil. In that case
- // we want to skip it
- if src.latestHeader == nil {
- return fmt.Errorf("latest header is nil for chain_id: %s. Waiting for catching up", src.info.ChainID)
- }
if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight &&
!bytes.Equal(src.latestHeader.NextValidatorsHash(), trustedNextValidatorsHash) {
return fmt.Errorf("latest header height is equal to the client trusted height: %d, "+
@@ -313,12 +314,12 @@ func (mp *messageProcessor) assembleMsgUpdateClient(ctx context.Context, src, ds
dst.clientTrustedState.IBCHeader,
)
if err != nil {
- return fmt.Errorf("error assembling new client header: %w", err)
+ return fmt.Errorf("msg update client header: %w", err)
}

msgUpdateClient, err := dst.chainProvider.MsgUpdateClient(clientID, msgUpdateClientHeader)
if err != nil {
- return fmt.Errorf("error assembling MsgUpdateClient: %w", err)
+ return fmt.Errorf("msg update client: %w", err)
}

mp.msgUpdateClient = msgUpdateClient
@@ -475,7 +476,7 @@ func (mp *messageProcessor) sendBatchMessages(
}
callbacks := []func(rtr *provider.RelayerTxResponse, err error){callback}

- //During testing, this adds a callback so our test case can inspect the TX results
+ // During testing, this adds a callback so our test case can inspect the TX results
if PathProcMessageCollector != nil {
testCallback := func(rtr *provider.RelayerTxResponse, err error) {
msgResult := &PathProcessorMessageResp{
@@ -562,7 +563,7 @@ func (mp *messageProcessor) sendSingleMessage(

callbacks = append(callbacks, callback)

- //During testing, this adds a callback so our test case can inspect the TX results
+ // During testing, this adds a callback so our test case can inspect the TX results
if PathProcMessageCollector != nil {
testCallback := func(rtr *provider.RelayerTxResponse, err error) {
msgResult := &PathProcessorMessageResp{
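Most of the edits in this file replace bare `return err` with wrapped errors and swap the conditional info log for a single debug line that exposes every input to the client-update decision. The wrapping is plain `fmt.Errorf` with `%w`, so the final message reads like a call path while `errors.Is`/`errors.As` still see the root cause. A minimal illustration with placeholder function names:

```go
package main

import (
	"errors"
	"fmt"
)

var errHeaderNotFound = errors.New("header not found")

func queryIBCHeader() error { return errHeaderNotFound }

// shouldUpdateClientNow wraps the lower-level failure with its own step name.
func shouldUpdateClientNow() error {
	if err := queryIBCHeader(); err != nil {
		return fmt.Errorf("query ibc header: %w", err)
	}
	return nil
}

// processMessages adds one more layer, mirroring the convention above.
func processMessages() error {
	if err := shouldUpdateClientNow(); err != nil {
		return fmt.Errorf("should update client now: %w", err)
	}
	return nil
}

func main() {
	err := processMessages()
	fmt.Println(err)                               // should update client now: query ibc header: header not found
	fmt.Println(errors.Is(err, errHeaderNotFound)) // true: %w keeps the error chain intact
}
```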
7 changes: 7 additions & 0 deletions relayer/processor/path_end_runtime.go
@@ -472,10 +472,17 @@ func (pathEnd *pathEndRuntime) mergeCacheData(
memoLimit, maxReceiverSize int,
) {
pathEnd.lastClientUpdateHeightMu.Lock()
+ var zeroType provider.LatestBlock
+ if d.LatestBlock == zeroType {
+ // sanity check
+ panic("received zero type latest block")
+ }
pathEnd.latestBlock = d.LatestBlock
pathEnd.lastClientUpdateHeightMu.Unlock()

pathEnd.inSync = d.InSync
+ pathEnd.log.Debug("set in sync", zap.Bool("in_sync", pathEnd.inSync), zap.String("chain_id", pathEnd.info.ChainID))
+
pathEnd.latestHeader = d.LatestHeader
pathEnd.clientState = d.ClientState

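The new sanity check relies on Go allowing `==` on structs whose fields are all comparable, so a `LatestBlock` that was never populated is caught before it overwrites the path end's view of the chain. A sketch of the idiom; the `LatestBlock` shape here (height plus timestamp) is an assumption for illustration, not the provider package's actual definition:

```go
package main

import (
	"fmt"
	"time"
)

// LatestBlock is a stand-in with an assumed shape; only its comparability matters here.
type LatestBlock struct {
	Height uint64
	Time   time.Time
}

// mustBePopulated mirrors the panic in mergeCacheData: a zero-value block
// would mean the path end has no usable latest height.
func mustBePopulated(b LatestBlock) {
	var zero LatestBlock
	if b == zero {
		panic("received zero type latest block")
	}
}

func main() {
	mustBePopulated(LatestBlock{Height: 42, Time: time.Now()})
	fmt.Println("non-zero block accepted")

	defer func() { fmt.Println("recovered:", recover()) }()
	mustBePopulated(LatestBlock{}) // panics and is recovered above
}
```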
6 changes: 4 additions & 2 deletions relayer/processor/path_processor.go
@@ -391,7 +391,7 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
var retryTimer *time.Timer

- pp.flushTimer = time.NewTimer(time.Hour)
+ pp.flushTimer = time.NewTimer(pp.flushInterval)

for {
// block until we have any signals to process
@@ -406,7 +406,8 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {
}
}

- if !pp.pathEnd1.inSync && !pp.pathEnd2.inSync {
+ pp.log.Debug("path processor run: are the chains in sync? ", zap.Bool("pathEnd1", pp.pathEnd1.inSync), zap.Bool("pathEnd2", pp.pathEnd2.inSync))
+ if !pp.pathEnd1.inSync || !pp.pathEnd2.inSync {
continue
}

@@ -420,6 +421,7 @@ func (pp *PathProcessor) Run(ctx context.Context, cancel func()) {

// process latest message cache state from both pathEnds
if err := pp.processLatestMessages(ctx, cancel); err != nil {
+ pp.log.Debug("error process latest messages", zap.Error(err))
// in case of IBC message send errors, schedule retry after durationErrorRetry
if retryTimer != nil {
retryTimer.Stop()
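Two small but meaningful changes here: the flush timer starts at the configured `flushInterval` instead of a hard-coded hour, and the in-sync gate flips from `&&` to `||`, so the run loop now waits until both path ends are in sync instead of proceeding once either one is. That stricter gate is presumably what allows the nil `latestHeader` guard in `assembleMsgUpdateClient` to be dropped above. A tiny sketch of the gate difference:

```go
package main

import "fmt"

// shouldSkipOld skipped processing only when *neither* chain had caught up.
func shouldSkipOld(end1InSync, end2InSync bool) bool { return !end1InSync && !end2InSync }

// shouldSkipNew skips processing until *both* chains are in sync.
func shouldSkipNew(end1InSync, end2InSync bool) bool { return !end1InSync || !end2InSync }

func main() {
	// One side still catching up: the old gate proceeded, the new one waits.
	fmt.Println(shouldSkipOld(true, false), shouldSkipNew(true, false)) // false true
}
```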
12 changes: 8 additions & 4 deletions relayer/processor/path_processor_internal.go
@@ -1069,11 +1069,17 @@ func (pp *PathProcessor) processLatestMessages(ctx context.Context, cancel func(
var eg errgroup.Group
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime, pp.isLocalhost)
- return mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1)
+ if err := mp.processMessages(ctx, pathEnd1Messages, pp.pathEnd2, pp.pathEnd1); err != nil {
+ return fmt.Errorf("process path end 1 messages: %w", err)
+ }
+ return nil
})
eg.Go(func() error {
mp := newMessageProcessor(pp.log, pp.metrics, pp.memo, pp.clientUpdateThresholdTime, pp.isLocalhost)
- return mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2)
+ if err := mp.processMessages(ctx, pathEnd2Messages, pp.pathEnd1, pp.pathEnd2); err != nil {
+ return fmt.Errorf("process path end 2 messages: %w", err)
+ }
+ return nil
})
return eg.Wait()
}
@@ -1207,7 +1213,6 @@ func (pp *PathProcessor) queuePendingRecvAndAcks(
srcMu sync.Locker,
dstMu sync.Locker,
) (*skippedPackets, error) {
-
if len(seqs) == 0 {
src.log.Debug("Nothing to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID))
if pp.metrics != nil {
@@ -1502,7 +1507,6 @@ func (pp *PathProcessor) flush(ctx context.Context) error {
&pathEnd2CacheMu,
&pathEnd1CacheMu,
)
-
if err != nil {
return err
}
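The same wrapping convention is applied to the two errgroup goroutines: `eg.Wait()` returns only the first non-nil error, so tagging each goroutine's error with its path end is what identifies which direction failed. A small, self-contained sketch with placeholder workers:

```go
package main

import (
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func processPathEnd1Messages() error { return nil }
func processPathEnd2Messages() error { return errors.New("account sequence mismatch") }

func main() {
	var eg errgroup.Group
	eg.Go(func() error {
		if err := processPathEnd1Messages(); err != nil {
			return fmt.Errorf("process path end 1 messages: %w", err)
		}
		return nil
	})
	eg.Go(func() error {
		if err := processPathEnd2Messages(); err != nil {
			return fmt.Errorf("process path end 2 messages: %w", err)
		}
		return nil
	})
	// Wait returns the first non-nil error from either goroutine.
	if err := eg.Wait(); err != nil {
		fmt.Println(err) // process path end 2 messages: account sequence mismatch
	}
}
```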