From 9d63b94c3bd028e5b6fec4fbbb87ad509eea5bd0 Mon Sep 17 00:00:00 2001 From: t-bast Date: Wed, 4 Nov 2020 09:34:18 +0100 Subject: [PATCH] PostRestartHtlcCleaner refactoring Separate computation of incoming HTLCs and HTLCs relayed out, to allow plugins to inject their own at each step. --- .../scala/fr/acinq/eclair/PluginParams.scala | 56 ++++++++++++++++ .../acinq/eclair/channel/ChannelEvents.scala | 2 +- .../relay/PostRestartHtlcCleaner.scala | 67 +++++++++++-------- 3 files changed, 96 insertions(+), 29 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala new file mode 100644 index 0000000000..a2f7de0264 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair + +import fr.acinq.bitcoin.ByteVector32 +import fr.acinq.eclair.channel.Origin +import fr.acinq.eclair.payment.relay.PostRestartHtlcCleaner.IncomingHtlc + +/** Custom plugin parameters. */ +sealed trait PluginParams { + /** Plugin's friendly name. */ + def name: String +} + +/** Parameters for a plugin that adds support for an experimental or unofficial Bolt9 feature. 
*/ +trait CustomFeaturePlugin extends PluginParams { + /** A set of LightningMessage tags that the plugin wants to listen to. */ + def messageTags: Set[Int] + + /** Feature bit that the plugin wants to advertise through Init message. */ + def feature: Feature + + /** Plugin feature is always defined as unknown and optional. */ + def pluginFeature: UnknownFeature = UnknownFeature(feature.optional) +} + +/** Parameters for a plugin that defines custom commitment transactions (or non-standard HTLCs). */ +trait CustomCommitmentsPlugin extends PluginParams { + /** + * If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually + * expire. If your plugin defines non-standard HTLCs, and they need to be automatically failed, they should be + * returned by this method. + */ + def getIncomingHtlcs: Seq[IncomingHtlc] + + /** + * Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the + * result upstream to preserve channels. If you have non-standard HTLCs that may be in this situation, they should be + * returned by this method. 
+ */ + def getHtlcsRelayedOut(htlcsIn: Seq[IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index b62cf8bf4f..1aa809da8a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -42,7 +42,7 @@ case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortC case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent -case class ChannelStateChanged(channel: ActorRef, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, currentData: Data) extends ChannelEvent +case class ChannelStateChanged(channel: ActorRef, channelId: ByteVector32, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, commitments: AbstractCommitments) extends ChannelEvent case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) extends ChannelEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala index ebc9b75c24..f9fe6775b6 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala @@ -29,7 +29,7 @@ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPacket, PaymentFa import fr.acinq.eclair.transactions.DirectedHtlc.outgoing import fr.acinq.eclair.transactions.OutgoingHtlc import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc} -import fr.acinq.eclair.{LongToBtcAmount, NodeParams} +import fr.acinq.eclair.{CustomCommitmentsPlugin, LongToBtcAmount, NodeParams} 
import scala.concurrent.Promise import scala.util.Try @@ -58,10 +58,31 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial context.system.eventStream.subscribe(self, classOf[ChannelStateChanged]) + // If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually + // expire and we won't lose money, but the channel will get closed, which is a major inconvenience. We want to detect + // this and fast-fail those HTLCs and thus preserve channels. + // + // Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the + // result upstream to preserve channels. val brokenHtlcs = { val channels = listLocalChannels(nodeParams.db.channels) cleanupRelayDb(channels, nodeParams.db.pendingRelay) - checkBrokenHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey) + val htlcsIn = { + val standardHtlcs = getIncomingHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey) + val nonStandardHtlcs = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getIncomingHtlcs }.flatten + standardHtlcs ++ nonStandardHtlcs + } + val relayedOut = { + // An origin may be reported by several sources (standard channels, plugins): we merge the HTLC sets per origin instead of overwriting them. TODO: add unit tests for this merge. + val standardRelayedOut = getHtlcsRelayedOut(channels, htlcsIn) + val nonStandardRelayedOut = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getHtlcsRelayedOut(htlcsIn) } + (Seq(standardRelayedOut) ++ nonStandardRelayedOut).flatten.groupBy(_._1).map { case (origin, htlcs) => origin -> htlcs.flatMap(_._2).toSet } + } + val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin))) + log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}") + log.info("notRelayed={}", notRelayed.map(htlc => (htlc.add.channelId, htlc.add.id))) + log.info("relayedOut={}", relayedOut) + BrokenHtlcs(notRelayed, relayedOut, Set.empty) } 
Metrics.PendingNotRelayed.update(brokenHtlcs.notRelayed.size) @@ -74,12 +95,12 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial def main(brokenHtlcs: BrokenHtlcs): Receive = { // When channels are restarted we immediately fail the incoming HTLCs that weren't relayed. - case e@ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING | CLOSING, NORMAL | SHUTDOWN | CLOSING | CLOSED, data: HasCommitments) => - log.debug("channel {}: {} -> {}", data.channelId, e.previousState, e.currentState) + case e@ChannelStateChanged(channel, channelId, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING | CLOSING, NORMAL | SHUTDOWN | CLOSING | CLOSED, commitments: AbstractCommitments) => + log.debug("channel {}: {} -> {}", channelId, e.previousState, e.currentState) val acked = brokenHtlcs.notRelayed - .filter(_.add.channelId == data.channelId) // only consider htlcs coming from this channel + .filter(_.add.channelId == channelId) // only consider htlcs coming from this channel .filter { - case IncomingHtlc(htlc, preimage_opt) if data.commitments.getIncomingHtlcCrossSigned(htlc.id).isDefined => + case IncomingHtlc(htlc, preimage_opt) if commitments.getIncomingHtlcCrossSigned(htlc.id).isDefined => // this htlc is cross signed in the current commitment, we can settle it preimage_opt match { case Some(preimage) => @@ -290,19 +311,12 @@ object PostRestartHtlcCleaner { case _ => None } - /** - * If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually - * expire and we won't lose money, but the channel will get closed, which is a major inconvenience. We want to detect - * this and fast-fail those HTLCs and thus preserve channels. - * - * Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the - * result upstream to preserve channels. 
- */ - private def checkBrokenHtlcs(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey)(implicit log: LoggingAdapter): BrokenHtlcs = { + /** @return incoming HTLCs that have been *cross-signed* (that potentially have been relayed). */ + private def getIncomingHtlcs(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey)(implicit log: LoggingAdapter): Seq[IncomingHtlc] = { // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed). // They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when // we subsequently sign it. That's why we need to look in *their* commitment with direction=OUT. - val htlcsIn = channels + channels .flatMap(_.commitments.remoteCommit.spec.htlcs) .collect(outgoing) .map(IncomingPacket.decrypt(_, privateKey)) @@ -313,12 +327,15 @@ object PostRestartHtlcCleaner { // When we're the final recipient, we want to know if we want to fulfill or fail. case Right(p@IncomingPacket.FinalPacket(add, _)) => IncomingHtlc(add, shouldFulfill(p, paymentsDb)) } + } - def isPendingUpstream(channelId: ByteVector32, htlcId: Long): Boolean = - htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId) + /** @return whether a given HTLC is a pending incoming HTLC. */ + private def isPendingUpstream(channelId: ByteVector32, htlcId: Long, htlcsIn: Seq[IncomingHtlc]): Boolean = + htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId) - // We group relayed outgoing HTLCs by their origin. - val relayedOut = channels + /** @return pending outgoing HTLCs, grouped by their upstream origin. 
*/ + private def getHtlcsRelayedOut(channels: Seq[HasCommitments], htlcsIn: Seq[IncomingHtlc])(implicit log: LoggingAdapter): Map[Origin, Set[(ByteVector32, Long)]] = { + channels .flatMap { c => // Filter out HTLCs that will never reach the blockchain or have already been timed-out on-chain. val htlcsToIgnore: Set[Long] = c match { @@ -356,16 +373,10 @@ object PostRestartHtlcCleaner { // instant whereas the uncooperative close of the downstream channel will take time. .filterKeys { case _: Origin.Local => true - case o: Origin.ChannelRelayed => isPendingUpstream(o.originChannelId, o.originHtlcId) - case o: Origin.TrampolineRelayed => o.htlcs.exists { case (channelId, htlcId) => isPendingUpstream(channelId, htlcId) } + case o: Origin.ChannelRelayed => isPendingUpstream(o.originChannelId, o.originHtlcId, htlcsIn) + case o: Origin.TrampolineRelayed => o.htlcs.exists { case (channelId, htlcId) => isPendingUpstream(channelId, htlcId, htlcsIn) } } .toMap - - val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin))) - log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}") - log.info("notRelayed={}", notRelayed.map(htlc => (htlc.add.channelId, htlc.add.id))) - log.info("relayedOut={}", relayedOut) - BrokenHtlcs(notRelayed, relayedOut, Set.empty) } /**