Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for moving channel state to flush complete in timeout processing #4425

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions modules/core/04-channel/keeper/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (k Keeper) TimeoutPacket(

// TimeoutExecuted deletes the commitment send from this chain after it verifies timeout.
// If the timed-out packet came from an ORDERED channel then this channel will be closed.
// If the channel is in the FLUSHING state and there is a counterparty upgrade, then the
// upgrade will be aborted if the upgrade has timed out. Otherwise, if there are no more inflight packets,
// then the channel will be set to the FLUSHCOMPLETE state.
//
// CONTRACT: this function must be called in the IBC handler
func (k Keeper) TimeoutExecuted(
Expand All @@ -154,14 +157,36 @@ func (k Keeper) TimeoutExecuted(

k.deletePacketCommitment(ctx, packet.GetSourcePort(), packet.GetSourceChannel(), packet.GetSequence())

if channel.FlushStatus == types.FLUSHING && !k.HasInflightPackets(ctx, packet.GetSourcePort(), packet.GetSourceChannel()) {
channel.FlushStatus = types.FLUSHCOMPLETE
k.SetChannel(ctx, packet.GetSourcePort(), packet.GetSourceChannel(), channel)
}

if channel.Ordering == types.ORDERED {
channel.Close()
k.SetChannel(ctx, packet.GetSourcePort(), packet.GetSourceChannel(), channel)
Comment on lines 160 to 162
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be slightly funky.

Does channe.Close() mutate the channel (I assume so).

Should this check be after the flushing state checks.
Otherwise you'll close the channel with a timed out packet on an ordered channel, and have this upgrade info laying around in store.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think you're right! We cane move this to after the new flushing check, nice catch

emitChannelClosedEvent(ctx, packet, channel)
}

// TODO: handle situation outlined in https://github.com/cosmos/ibc-go/issues/4454
// if an upgrade is in progress, handling packet flushing and update channel state appropriately
if channel.State == types.STATE_FLUSHING {
counterpartyUpgrade, found := k.GetCounterpartyUpgrade(ctx, packet.GetSourcePort(), packet.GetSourceChannel())
if !found {
return errorsmod.Wrapf(types.ErrUpgradeNotFound, "counterparty upgrade not found for channel: %s", packet.GetSourceChannel())
}

timeout := counterpartyUpgrade.Timeout
// if the timeout is valid then use it, otherwise it has not been set in the upgrade handshake yet.
if timeout.IsValid() {
if hasPassed, err := timeout.HasPassed(ctx); hasPassed {
// packet flushing timeout has expired, abort the upgrade and return nil,
// committing an error receipt to state, restoring the channel and successfully timing out the packet.
k.MustAbortUpgrade(ctx, packet.GetSourcePort(), packet.GetSourceChannel(), err)
return nil
}

// set the channel state to flush complete if all packets have been flushed.
if !k.HasInflightPackets(ctx, packet.GetSourcePort(), packet.GetSourceChannel()) {
channel.State = types.STATE_FLUSHCOMPLETE
k.SetChannel(ctx, packet.GetSourcePort(), packet.GetSourceChannel(), channel)
}
}
}

k.Logger(ctx).Info(
Expand All @@ -176,10 +201,6 @@ func (k Keeper) TimeoutExecuted(
// emit an event marking that we have processed the timeout
emitTimeoutPacketEvent(ctx, packet, channel)

if channel.Ordering == types.ORDERED && channel.IsClosed() {
emitChannelClosedEvent(ctx, packet, channel)
}

return nil
}

Expand Down
174 changes: 137 additions & 37 deletions modules/core/04-channel/keeper/timeout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,43 +286,6 @@ func (suite *KeeperTestSuite) TestTimeoutExecuted() {
suite.Require().Equal(channel.FlushStatus, types.NOTINFLUSH)
},
},
// {
// "success UNORDERED channel in FLUSHING state",
// func() {
// suite.coordinator.Setup(path)

// timeoutHeight := clienttypes.GetSelfHeight(suite.chainB.GetContext())
// timeoutTimestamp := uint64(suite.chainB.GetContext().BlockTime().UnixNano())

// sequence, err := path.EndpointA.SendPacket(timeoutHeight, timeoutTimestamp, ibctesting.MockPacketData)
// suite.Require().NoError(err)

// packet = types.NewPacket(ibctesting.MockPacketData, sequence, path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID, path.EndpointB.ChannelConfig.PortID, path.EndpointB.ChannelID, timeoutHeight, timeoutTimestamp)
// chanCap = suite.chainA.GetChannelCapability(path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)

// // Move channel to FLUSHING state
// path.EndpointA.ChannelConfig.ProposedUpgrade.Fields.Version = mock.UpgradeVersion
// path.EndpointB.ChannelConfig.ProposedUpgrade.Fields.Version = mock.UpgradeVersion

// err = path.EndpointA.ChanUpgradeInit()
// suite.Require().NoError(err)

// err = path.EndpointB.ChanUpgradeTry()
// suite.Require().NoError(err)

// err = path.EndpointA.ChanUpgradeAck()
// suite.Require().NoError(err)
// },
// func(packetCommitment []byte, err error) {
// suite.Require().NoError(err)
// suite.Require().Nil(packetCommitment)

// // Check flush status has been set to FLUSHCOMPLETE
// channel := path.EndpointA.GetChannel()
// suite.Require().Equal(channel.State, types.ACKUPGRADE)
// suite.Require().Equal(channel.FlushStatus, types.FLUSHCOMPLETE)
// },
// },
{
"channel not found",
func() {
Expand Down Expand Up @@ -361,6 +324,143 @@ func (suite *KeeperTestSuite) TestTimeoutExecuted() {
suite.Require().NotNil(packetCommitment)
},
},
{
"set to flush complete with no inflight packets",
func() {
suite.coordinator.Setup(path)

chanCap = suite.chainA.GetChannelCapability(path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)

channel := path.EndpointA.GetChannel()
channel.State = types.STATE_FLUSHING
path.EndpointA.SetChannel(channel)
path.EndpointA.SetChannelCounterpartyUpgrade(types.Upgrade{
Timeout: types.DefaultTimeout,
})
},
func(packetCommitment []byte, err error) {
suite.Require().NoError(err)

channel := path.EndpointA.GetChannel()
suite.Require().Equal(types.STATE_FLUSHCOMPLETE, channel.State, "channel state should still be set to FLUSHCOMPLETE")
},
},
{
"conterparty upgrade timeout is invalid",
func() {
suite.coordinator.Setup(path)

timeoutHeight := clienttypes.GetSelfHeight(suite.chainB.GetContext())
timeoutTimestamp := uint64(suite.chainB.GetContext().BlockTime().UnixNano())

sequence, err := path.EndpointA.SendPacket(timeoutHeight, timeoutTimestamp, ibctesting.MockPacketData)
suite.Require().NoError(err)

packet = types.NewPacket(ibctesting.MockPacketData, sequence, path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID, path.EndpointB.ChannelConfig.PortID, path.EndpointB.ChannelID, timeoutHeight, timeoutTimestamp)
chanCap = suite.chainA.GetChannelCapability(path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)

channel := path.EndpointA.GetChannel()
channel.State = types.STATE_FLUSHING
path.EndpointA.SetChannel(channel)
path.EndpointA.SetChannelCounterpartyUpgrade(types.Upgrade{})
},
func(packetCommitment []byte, err error) {
suite.Require().NoError(err)

channel := path.EndpointA.GetChannel()
suite.Require().Equal(types.STATE_FLUSHING, channel.State, "channel state should still be FLUSHING")
},
},
{
"conterparty upgrade timed out (abort)",
func() {
suite.coordinator.Setup(path)

timeoutHeight := clienttypes.GetSelfHeight(suite.chainB.GetContext())
timeoutTimestamp := uint64(suite.chainB.GetContext().BlockTime().UnixNano())

sequence, err := path.EndpointA.SendPacket(timeoutHeight, timeoutTimestamp, ibctesting.MockPacketData)
suite.Require().NoError(err)

packet = types.NewPacket(ibctesting.MockPacketData, sequence, path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID, path.EndpointB.ChannelConfig.PortID, path.EndpointB.ChannelID, timeoutHeight, timeoutTimestamp)
chanCap = suite.chainA.GetChannelCapability(path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)

channel := path.EndpointA.GetChannel()
channel.State = types.STATE_FLUSHING
path.EndpointA.SetChannel(channel)
path.EndpointA.SetChannelUpgrade(types.Upgrade{
Fields: path.EndpointA.GetProposedUpgrade().Fields,
Timeout: types.NewTimeout(clienttypes.ZeroHeight(), 1),
})
path.EndpointA.SetChannelCounterpartyUpgrade(types.Upgrade{
Timeout: types.NewTimeout(clienttypes.ZeroHeight(), 1),
})
},
func(packetCommitment []byte, err error) {
suite.Require().NoError(err)

channel := path.EndpointA.GetChannel()
suite.Require().Equal(types.OPEN, channel.State, "channel state should still be OPEN")

upgrade, found := suite.chainA.App.GetIBCKeeper().ChannelKeeper.GetUpgrade(suite.chainA.GetContext(), path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)
suite.Require().False(found, "upgrade should not be present")
suite.Require().Equal(types.Upgrade{}, upgrade, "upgrade should be zero value")

upgrade, found = suite.chainA.App.GetIBCKeeper().ChannelKeeper.GetCounterpartyUpgrade(suite.chainA.GetContext(), path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)
suite.Require().False(found, "counterparty upgrade should not be present")
suite.Require().Equal(types.Upgrade{}, upgrade, "counterparty upgrade should be zero value")

errorReceipt, found := suite.chainA.App.GetIBCKeeper().ChannelKeeper.GetUpgradeErrorReceipt(suite.chainA.GetContext(), path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)
suite.Require().True(found, "error receipt should be present")
suite.Require().Equal(channel.UpgradeSequence, errorReceipt.Sequence, "error receipt sequence should be equal to channel upgrade sequence")
},
},
{
"conterparty upgrade has not timed out with in-flight packets",
func() {
suite.coordinator.Setup(path)

timeoutHeight := clienttypes.GetSelfHeight(suite.chainB.GetContext())
timeoutTimestamp := uint64(suite.chainB.GetContext().BlockTime().UnixNano())

// we are sending two packets here as one will be removed in TimeoutExecuted. This is to ensure that
// there is still an in-flight packet so that the channel remains in the flushing state.
_, err := path.EndpointA.SendPacket(timeoutHeight, timeoutTimestamp, ibctesting.MockPacketData)
suite.Require().NoError(err)

sequence, err := path.EndpointA.SendPacket(timeoutHeight, timeoutTimestamp, ibctesting.MockPacketData)
suite.Require().NoError(err)

packet = types.NewPacket(ibctesting.MockPacketData, sequence, path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID, path.EndpointB.ChannelConfig.PortID, path.EndpointB.ChannelID, timeoutHeight, timeoutTimestamp)
chanCap = suite.chainA.GetChannelCapability(path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)

channel := path.EndpointA.GetChannel()
channel.State = types.STATE_FLUSHING
path.EndpointA.SetChannel(channel)
path.EndpointA.SetChannelUpgrade(types.Upgrade{
Fields: path.EndpointA.GetProposedUpgrade().Fields,
Timeout: types.DefaultTimeout,
})
path.EndpointA.SetChannelCounterpartyUpgrade(types.Upgrade{
Timeout: types.DefaultTimeout,
})
},
func(packetCommitment []byte, err error) {
suite.Require().NoError(err)

channel := path.EndpointA.GetChannel()
suite.Require().Equal(types.STATE_FLUSHING, channel.State, "channel state should still be FLUSHING")

_, found := suite.chainA.App.GetIBCKeeper().ChannelKeeper.GetUpgrade(suite.chainA.GetContext(), path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)
suite.Require().True(found, "upgrade should not be deleted")

_, found = suite.chainA.App.GetIBCKeeper().ChannelKeeper.GetCounterpartyUpgrade(suite.chainA.GetContext(), path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)
suite.Require().True(found, "counterparty upgrade should not be deleted")

_, found = suite.chainA.App.GetIBCKeeper().ChannelKeeper.GetUpgradeErrorReceipt(suite.chainA.GetContext(), path.EndpointA.ChannelConfig.PortID, path.EndpointA.ChannelID)
suite.Require().False(found, "error receipt should not be written")
},
},
}

for i, tc := range testCases {
Expand Down
Loading