From 60f41ce0be9bbf7184d6efc2273f73b4d0f19cfd Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 27 Mar 2023 15:10:56 -0600 Subject: [PATCH 1/2] Fix flush on ordered channels --- relayer/processor/path_processor_internal.go | 42 ++++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 7112d46c8..03682d2a3 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1089,15 +1089,51 @@ func queuePendingRecvAndAcks( return nil } - unrecv, err := dst.chainProvider.QueryUnreceivedPackets(ctx, dst.latestBlock.Height, k.CounterpartyChannelID, k.CounterpartyPortID, seqs) + dstChan, dstPort := k.CounterpartyChannelID, k.CounterpartyPortID + + unrecv, err := dst.chainProvider.QueryUnreceivedPackets(ctx, dst.latestBlock.Height, dstChan, dstPort, seqs) if err != nil { return err } + dstHeight := int64(dst.latestBlock.Height) + if len(unrecv) > 0 { - src.log.Debug("Will flush MsgRecvPacket", zap.String("channel", k.ChannelID), zap.String("port", k.PortID), zap.Uint64s("sequences", unrecv)) + channel, err := dst.chainProvider.QueryChannel(ctx, dstHeight, dstChan, dstPort) + if err != nil { + return err + } + + if channel.Channel.Ordering == chantypes.ORDERED { + nextSeqRecv, err := dst.chainProvider.QueryNextSeqRecv(ctx, dstHeight, dstChan, dstPort) + if err != nil { + return err + } + + var newUnrecv []uint64 + + for _, seq := range unrecv { + if seq == nextSeqRecv.NextSequenceReceive { + newUnrecv = append(newUnrecv, seq) + break + } + } + + unrecv = newUnrecv + } + } + + if len(unrecv) > 0 { + src.log.Debug("Will flush MsgRecvPacket", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + zap.Uint64s("sequences", unrecv), + ) } else { - src.log.Debug("No MsgRecvPacket to flush", zap.String("channel", k.ChannelID), zap.String("port", k.PortID)) + src.log.Debug("No MsgRecvPacket to flush", + zap.String("channel", k.ChannelID), + zap.String("port", k.PortID), + ) } for _, seq := range unrecv { From b1545b7221757f4bdc5bd0e82c56f10da05d827d Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Mon, 27 Mar 2023 15:40:10 -0600 Subject: [PATCH 2/2] Queue all packets at nextseqrecv or above --- relayer/processor/path_processor_internal.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 03682d2a3..040808103 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1113,7 +1113,7 @@ func queuePendingRecvAndAcks( var newUnrecv []uint64 for _, seq := range unrecv { - if seq == nextSeqRecv.NextSequenceReceive { + if seq >= nextSeqRecv.NextSequenceReceive { newUnrecv = append(newUnrecv, seq) break }