Skip to content

Commit

Permalink
PostRestartHtlcCleaner refactoring
Browse files Browse the repository at this point in the history
Separate computation of incoming HTLCs and HTLCs relayed out, to allow
plugins to inject their own at each step.
  • Loading branch information
t-bast committed Nov 4, 2020
1 parent c556654 commit 9d63b94
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 29 deletions.
56 changes: 56 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel.Origin
import fr.acinq.eclair.payment.relay.PostRestartHtlcCleaner.IncomingHtlc

/** Custom plugin parameters. */
sealed trait PluginParams {
/** Plugin's friendly name. */
def name: String
}

/** Parameters for a plugin that adds support for an experimental or unofficial Bolt9 feature. */
trait CustomFeaturePlugin extends PluginParams {
/** A set of LightningMessage tags that the plugin wants to listen to. */
def messageTags: Set[Int]

/** Feature bit that the plugin wants to advertise through Init message. */
def feature: Feature

/** Plugin feature is always defined as unknown and optional. */
def pluginFeature: UnknownFeature = UnknownFeature(feature.optional)
}

/** Parameters for a plugin that defines custom commitment transactions (or non-standard HTLCs). */
trait CustomCommitmentsPlugin extends PluginParams {
/**
* If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually
* expire. If your plugin defines non-standard HTLCs, and they need to be automatically failed, they should be
* returned by this method.
*/
def getIncomingHtlcs: Seq[IncomingHtlc]

/**
* Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the
* result upstream to preserve channels. If you have non-standard HTLCs that may be in this situation, they should be
* returned by this method.
*/
def getHtlcsRelayedOut(htlcsIn: Seq[IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortC

case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent

case class ChannelStateChanged(channel: ActorRef, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, currentData: Data) extends ChannelEvent
case class ChannelStateChanged(channel: ActorRef, channelId: ByteVector32, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, commitments: AbstractCommitments) extends ChannelEvent

case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) extends ChannelEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPacket, PaymentFa
import fr.acinq.eclair.transactions.DirectedHtlc.outgoing
import fr.acinq.eclair.transactions.OutgoingHtlc
import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc}
import fr.acinq.eclair.{LongToBtcAmount, NodeParams}
import fr.acinq.eclair.{CustomCommitmentsPlugin, LongToBtcAmount, NodeParams}

import scala.concurrent.Promise
import scala.util.Try
Expand Down Expand Up @@ -58,10 +58,31 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial

context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])

// If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually
// expire and we won't lose money, but the channel will get closed, which is a major inconvenience. We want to detect
// this and fast-fail those HTLCs and thus preserve channels.
//
// Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the
// result upstream to preserve channels.
val brokenHtlcs = {
val channels = listLocalChannels(nodeParams.db.channels)
cleanupRelayDb(channels, nodeParams.db.pendingRelay)
checkBrokenHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey)
val htlcsIn = {
val standardHtlcs = getIncomingHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey)
val nonStandardHtlcs = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getIncomingHtlcs }.flatten
standardHtlcs ++ nonStandardHtlcs
}
val relayedOut = {
// TODO: this part needs unit tests to verify maps are correctly merged
val standardRelayedOut = getHtlcsRelayedOut(channels, htlcsIn)
val nonStandardRelayedOut = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getHtlcsRelayedOut(htlcsIn) }.flatten.toMap
standardRelayedOut ++ nonStandardRelayedOut
}
val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin)))
log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}")
log.info("notRelayed={}", notRelayed.map(htlc => (htlc.add.channelId, htlc.add.id)))
log.info("relayedOut={}", relayedOut)
BrokenHtlcs(notRelayed, relayedOut, Set.empty)
}

Metrics.PendingNotRelayed.update(brokenHtlcs.notRelayed.size)
Expand All @@ -74,12 +95,12 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial

def main(brokenHtlcs: BrokenHtlcs): Receive = {
// When channels are restarted we immediately fail the incoming HTLCs that weren't relayed.
case e@ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING | CLOSING, NORMAL | SHUTDOWN | CLOSING | CLOSED, data: HasCommitments) =>
log.debug("channel {}: {} -> {}", data.channelId, e.previousState, e.currentState)
case e@ChannelStateChanged(channel, channelId, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING | CLOSING, NORMAL | SHUTDOWN | CLOSING | CLOSED, commitments: AbstractCommitments) =>
log.debug("channel {}: {} -> {}", channelId, e.previousState, e.currentState)
val acked = brokenHtlcs.notRelayed
.filter(_.add.channelId == data.channelId) // only consider htlcs coming from this channel
.filter(_.add.channelId == channelId) // only consider htlcs coming from this channel
.filter {
case IncomingHtlc(htlc, preimage_opt) if data.commitments.getIncomingHtlcCrossSigned(htlc.id).isDefined =>
case IncomingHtlc(htlc, preimage_opt) if commitments.getIncomingHtlcCrossSigned(htlc.id).isDefined =>
// this htlc is cross signed in the current commitment, we can settle it
preimage_opt match {
case Some(preimage) =>
Expand Down Expand Up @@ -290,19 +311,12 @@ object PostRestartHtlcCleaner {
case _ => None
}

/**
* If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually
* expire and we won't lose money, but the channel will get closed, which is a major inconvenience. We want to detect
* this and fast-fail those HTLCs and thus preserve channels.
*
* Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the
* result upstream to preserve channels.
*/
private def checkBrokenHtlcs(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey)(implicit log: LoggingAdapter): BrokenHtlcs = {
/** @return incoming HTLCs that have been *cross-signed* (that potentially have been relayed). */
private def getIncomingHtlcs(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey)(implicit log: LoggingAdapter): Seq[IncomingHtlc] = {
// We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed).
// They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when
// we subsequently sign it. That's why we need to look in *their* commitment with direction=OUT.
val htlcsIn = channels
channels
.flatMap(_.commitments.remoteCommit.spec.htlcs)
.collect(outgoing)
.map(IncomingPacket.decrypt(_, privateKey))
Expand All @@ -313,12 +327,15 @@ object PostRestartHtlcCleaner {
// When we're the final recipient, we want to know if we want to fulfill or fail.
case Right(p@IncomingPacket.FinalPacket(add, _)) => IncomingHtlc(add, shouldFulfill(p, paymentsDb))
}
}

def isPendingUpstream(channelId: ByteVector32, htlcId: Long): Boolean =
htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId)
/** @return whether a given HTLC is a pending incoming HTLC. */
private def isPendingUpstream(channelId: ByteVector32, htlcId: Long, htlcsIn: Seq[IncomingHtlc]): Boolean =
htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId)

// We group relayed outgoing HTLCs by their origin.
val relayedOut = channels
/** @return pending outgoing HTLCs, grouped by their upstream origin. */
private def getHtlcsRelayedOut(channels: Seq[HasCommitments], htlcsIn: Seq[IncomingHtlc])(implicit log: LoggingAdapter): Map[Origin, Set[(ByteVector32, Long)]] = {
channels
.flatMap { c =>
// Filter out HTLCs that will never reach the blockchain or have already been timed-out on-chain.
val htlcsToIgnore: Set[Long] = c match {
Expand Down Expand Up @@ -356,16 +373,10 @@ object PostRestartHtlcCleaner {
// instant whereas the uncooperative close of the downstream channel will take time.
.filterKeys {
case _: Origin.Local => true
case o: Origin.ChannelRelayed => isPendingUpstream(o.originChannelId, o.originHtlcId)
case o: Origin.TrampolineRelayed => o.htlcs.exists { case (channelId, htlcId) => isPendingUpstream(channelId, htlcId) }
case o: Origin.ChannelRelayed => isPendingUpstream(o.originChannelId, o.originHtlcId, htlcsIn)
case o: Origin.TrampolineRelayed => o.htlcs.exists { case (channelId, htlcId) => isPendingUpstream(channelId, htlcId, htlcsIn) }
}
.toMap

val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin)))
log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}")
log.info("notRelayed={}", notRelayed.map(htlc => (htlc.add.channelId, htlc.add.id)))
log.info("relayedOut={}", relayedOut)
BrokenHtlcs(notRelayed, relayedOut, Set.empty)
}

/**
Expand Down

0 comments on commit 9d63b94

Please sign in to comment.