diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index bda00592d0..e0529fee9e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -91,14 +91,16 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, maxPaymentAttempts: Int, enableTrampolinePayment: Boolean) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey + val nodeId: PublicKey = nodeKeyManager.nodeId - val keyPair = KeyPair(nodeId.value, privateKey.value) - val pluginMessageTags: Set[Int] = pluginParams.flatMap(_.tags).toSet + val keyPair: KeyPair = KeyPair(nodeId.value, privateKey.value) + + val pluginMessageTags: Set[Int] = pluginParams.collect { case p: CustomFeaturePlugin => p.messageTags }.toSet.flatten def currentBlockHeight: Long = blockCount.get - def featuresFor(nodeId: PublicKey) = overrideFeatures.getOrElse(nodeId, features) + def featuresFor(nodeId: PublicKey): Features = overrideFeatures.getOrElse(nodeId, features) } object NodeParams extends Logging { @@ -241,23 +243,24 @@ object NodeParams extends Logging { require(features.hasFeature(Features.VariableLengthOnion), s"${Features.VariableLengthOnion.rfcName} must be enabled") } + val pluginMessageParams = pluginParams.collect { case p: CustomFeaturePlugin => p } val features = Features.fromConfiguration(config) validateFeatures(features) - require(pluginParams.forall(_.feature.mandatory > 128), "Plugin mandatory feature bit is too low, must be > 128") - require(pluginParams.forall(_.feature.mandatory % 2 == 0), "Plugin mandatory feature bit is odd, must be even") - require(pluginParams.flatMap(_.tags).forall(_ > 32768), "Plugin messages tags must be > 32768") - val pluginFeatureSet = pluginParams.map(_.feature.mandatory).toSet + require(pluginMessageParams.forall(_.feature.mandatory > 128), "Plugin mandatory feature bit is too low, must be > 128") + require(pluginMessageParams.forall(_.feature.mandatory % 2 == 0), "Plugin mandatory feature bit is odd, must be even") + require(pluginMessageParams.flatMap(_.messageTags).forall(_ > 32768), "Plugin messages tags must be > 32768") + val pluginFeatureSet = pluginMessageParams.map(_.feature.mandatory).toSet require(Features.knownFeatures.map(_.mandatory).intersect(pluginFeatureSet).isEmpty, "Plugin feature bit overlaps with known feature bit") - require(pluginFeatureSet.size == pluginParams.size, "Duplicate plugin feature bits found") + require(pluginFeatureSet.size == pluginMessageParams.size, "Duplicate plugin feature bits found") - val coreAndPluginFeatures = features.copy(unknown = features.unknown ++ pluginParams.map(_.pluginFeature)) + val coreAndPluginFeatures = features.copy(unknown = features.unknown ++ pluginMessageParams.map(_.pluginFeature)) val overrideFeatures: Map[PublicKey, Features] = config.getConfigList("override-features").asScala.map { e => val p = PublicKey(ByteVector.fromValidHex(e.getString("nodeid"))) val f = Features.fromConfiguration(e) validateFeatures(f) - p -> f.copy(unknown = f.unknown ++ pluginParams.map(_.pluginFeature)) + p -> f.copy(unknown = f.unknown ++ pluginMessageParams.map(_.pluginFeature)) }.toMap val syncWhitelist: Set[PublicKey] = config.getStringList("sync-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet @@ -382,13 +385,3 @@ object NodeParams extends Logging { ) } } - -/** - * @param tags a set of LightningMessage tags that plugin is interested in - * @param feature a Feature bit that plugin advertises through Init message - */ -case class PluginParams(tags: Set[Int], feature: Feature) { - def pluginFeature: UnknownFeature = UnknownFeature(feature.optional) - - override def toString: String = s"Messaging enabled plugin=${feature.rfcName} with feature bit=${feature.optional} and LN message tags=${tags.mkString(",")}" -} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala new file mode 100644 index 0000000000..e7b4e4ce1c --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair + +import fr.acinq.bitcoin.ByteVector32 +import fr.acinq.eclair.channel.Origin +import fr.acinq.eclair.payment.relay.PostRestartHtlcCleaner.IncomingHtlc + +/** Custom plugin parameters. */ +sealed trait PluginParams { + /** Plugin's friendly name. */ + def name: String +} + +/** Parameters for a plugin that adds support for an experimental or unofficial Bolt9 feature. */ +trait CustomFeaturePlugin extends PluginParams { + /** A set of LightningMessage tags that the plugin wants to listen to. */ + def messageTags: Set[Int] + + /** Feature bit that the plugin wants to advertise through Init message. */ + def feature: Feature + + /** Plugin feature is always defined as unknown and optional. */ + def pluginFeature: UnknownFeature = UnknownFeature(feature.optional) +} + +/** Parameters for a plugin that defines custom commitment transactions (or non-standard HTLCs). */ +trait CustomCommitmentsPlugin extends PluginParams { + /** + * If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually + * expire. If your plugin defines non-standard HTLCs, and they need to be automatically failed, they should be + * returned by this method. + */ + def getIncomingHtlcs: Seq[IncomingHtlc] + + /** + * Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the + * result upstream to preserve channels. If you have non-standard HTLCs that may be in this situation, they should be + * returned by this method. + */ + def getHtlcsRelayedOut(htlcsIn: Seq[IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 1b7d73b9c2..169e343084 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -66,9 +66,10 @@ import scala.util.{Failure, Success} * * Created by PM on 25/01/2016. * - * @param datadir directory where eclair-core will write/read its data. - * @param seeds_opt optional seeds, if set eclair will use them instead of generating them and won't create a node_seed.dat and channel_seed.dat files. - * @param db optional databases to use, if not set eclair will create the necessary databases + * @param datadir directory where eclair-core will write/read its data. + * @param pluginParams parameters for all configured plugins. + * @param seeds_opt optional seeds, if set eclair will use them instead of generating them and won't create a node_seed.dat and channel_seed.dat files. + * @param db optional databases to use, if not set eclair will create the necessary databases */ class Setup(datadir: File, pluginParams: Seq[PluginParams], @@ -127,7 +128,7 @@ class Setup(datadir: File, } val nodeParams = NodeParams.makeNodeParams(config, instanceId, nodeKeyManager, channelKeyManager, initTor(), databases, blockCount, feeEstimator, pluginParams) - pluginParams.foreach(param => logger.info(param.toString)) + pluginParams.foreach(param => logger.info(s"Using plugin: ${param.name}")) val serverBindingAddress = new InetSocketAddress( config.getString("server.binding-ip"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala index 0e7f07b718..fce0d4075d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala @@ -279,9 +279,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId } case Event(c: CloseCommand, d) => - val channelId = Helpers.getChannelId(d) - channelOpenReplyToUser(Right(ChannelOpenResponse.ChannelClosed(channelId))) - handleFastClose(c, channelId) + channelOpenReplyToUser(Right(ChannelOpenResponse.ChannelClosed(d.channelId))) + handleFastClose(c, d.channelId) case Event(TickChannelOpenTimeout, _) => channelOpenReplyToUser(Left(LocalError(new RuntimeException("open channel cancelled, took too long")))) @@ -334,9 +333,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId goto(WAIT_FOR_FUNDING_CREATED) using DATA_WAIT_FOR_FUNDING_CREATED(open.temporaryChannelId, localParams, remoteParams, open.fundingSatoshis, open.pushMsat, open.feeratePerKw, open.firstPerCommitmentPoint, open.channelFlags, channelVersion, accept) sending accept } - case Event(c: CloseCommand, d) => - val channelId = Helpers.getChannelId(d) - handleFastClose(c, channelId) + case Event(c: CloseCommand, d) => handleFastClose(c, d.channelId) case Event(e: Error, d: DATA_WAIT_FOR_OPEN_CHANNEL) => handleRemoteError(e, d) @@ -1666,8 +1663,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case Event(c: CMD_GETINFO, _) => val replyTo = if (c.replyTo == ActorRef.noSender) sender else c.replyTo - val channelId = Helpers.getChannelId(stateData) - replyTo ! RES_GETINFO(remoteNodeId, channelId, stateName, stateData) + replyTo ! RES_GETINFO(remoteNodeId, stateData.channelId, stateName, stateData) stay case Event(c: CMD_ADD_HTLC, d: HasCommitments) => @@ -1675,7 +1671,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val error = ChannelUnavailable(d.channelId) handleAddHtlcCommandError(c, error, None) // we don't provide a channel_update: this will be a permanent channel failure - case Event(c: CMD_CLOSE, d) => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "close", stateName), c) + case Event(c: CMD_CLOSE, d) => handleCommandError(CommandUnavailableInThisState(d.channelId, "close", stateName), c) case Event(c: CMD_FORCECLOSE, d) => d match { @@ -1683,10 +1679,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val replyTo = if (c.replyTo == ActorRef.noSender) sender else c.replyTo replyTo ! RES_SUCCESS(c, data.channelId) handleLocalError(ForcedLocalCommit(data.channelId), data, Some(c)) - case _ => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "forceclose", stateName), c) + case _ => handleCommandError(CommandUnavailableInThisState(d.channelId, "forceclose", stateName), c) } - case Event(c: CMD_UPDATE_RELAY_FEE, d) => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "updaterelayfee", stateName), c) + case Event(c: CMD_UPDATE_RELAY_FEE, d) => handleCommandError(CommandUnavailableInThisState(d.channelId, "updaterelayfee", stateName), c) // we only care about this event in NORMAL and SHUTDOWN state, and there may be cases where the task is not cancelled case Event(_: RevocationTimeout, _) => stay @@ -1718,8 +1714,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case WAIT_FOR_INIT_INTERNAL -> WAIT_FOR_INIT_INTERNAL => () // called at channel initialization case state -> nextState => if (state != nextState) { - context.system.eventStream.publish(ChannelStateChanged(self, peer, remoteNodeId, state, nextState, nextStateData)) + val commitments_opt = nextStateData match { + case hasCommitments: HasCommitments => Some(hasCommitments.commitments) + case _ => None + } + context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt)) } + if (nextState == CLOSED) { // channel is closed, scheduling this actor for self destruction context.system.scheduler.scheduleOnce(10 seconds, self, Symbol("shutdown")) @@ -1876,8 +1877,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case hasReplyTo: HasReplyToCommand => if (hasReplyTo.replyTo == ActorRef.noSender) Some(sender) else Some(hasReplyTo.replyTo) } replyTo_opt.foreach { replyTo => - val channelId = Helpers.getChannelId(newData) - replyTo ! RES_SUCCESS(c, channelId) + replyTo ! RES_SUCCESS(c, newData.channelId) } stay using newData } @@ -1886,7 +1886,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId log.warning(s"${cause.getMessage} while processing cmd=${c.getClass.getSimpleName} in state=$stateName") val replyTo = if (c.replyTo == ActorRef.noSender) sender else c.replyTo replyTo ! RES_ADD_FAILED(c, cause, channelUpdate) - context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = false)) + context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = false)) stay } @@ -1903,7 +1903,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId replyTo_opt.foreach { replyTo => replyTo ! RES_FAILURE(c, cause) } - context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = false)) + context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = false)) stay } @@ -1956,7 +1956,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val error = Error(d.channelId, exc.getMessage) // NB: we don't use the handleLocalError handler because it would result in the commit tx being published, which we don't want: // implementation *guarantees* that in case of BITCOIN_FUNDING_PUBLISH_FAILED, the funding tx hasn't and will never be published, so we can close the channel right away - context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc), isFatal = true)) + context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(exc), isFatal = true)) goto(CLOSED) sending error } @@ -1964,7 +1964,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId log.warning(s"funding tx hasn't been confirmed in time, cancelling channel delay=$FUNDING_TIMEOUT_FUNDEE") val exc = FundingTxTimedout(d.channelId) val error = Error(d.channelId, exc.getMessage) - context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc), isFatal = true)) + context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(exc), isFatal = true)) goto(CLOSED) sending error } @@ -2035,8 +2035,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId case _: ChannelException => () case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData ") } - val error = Error(Helpers.getChannelId(d), cause.getMessage) - context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = true)) + val error = Error(d.channelId, cause.getMessage) + context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = true)) d match { case dd: HasCommitments if Closing.nothingAtStake(dd) => goto(CLOSED) @@ -2052,7 +2052,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId def handleRemoteError(e: Error, d: Data) = { // see BOLT 1: only print out data verbatim if is composed of printable ASCII characters log.error(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}") - context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, RemoteError(e), isFatal = true)) + context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, RemoteError(e), isFatal = true)) d match { case _: DATA_CLOSING => stay // nothing to do, there is already a spending tx published @@ -2402,7 +2402,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId val category_opt = LogCategory(currentMessage) val id = currentMessage match { case INPUT_RESTORED(data) => data.channelId - case _ => Helpers.getChannelId(stateData) + case _ => stateData.channelId } Logs.mdc(category_opt, remoteNodeId_opt = Some(remoteNodeId), channelId_opt = Some(id)) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala index b62cf8bf4f..548601e695 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala @@ -42,7 +42,7 @@ case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortC case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent -case class ChannelStateChanged(channel: ActorRef, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, currentData: Data) extends ChannelEvent +case class ChannelStateChanged(channel: ActorRef, channelId: ByteVector32, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, commitments_opt: Option[AbstractCommitments]) extends ChannelEvent case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) extends ChannelEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala index 3c7513cd95..b73c14ca00 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelTypes.scala @@ -244,13 +244,17 @@ object ChannelOpenResponse { 8888888P" d88P 888 888 d88P 888 */ -sealed trait Data +sealed trait Data { + def channelId: ByteVector32 +} -case object Nothing extends Data +case object Nothing extends Data { + val channelId: ByteVector32 = ByteVector32.Zeroes +} sealed trait HasCommitments extends Data { + val channelId: ByteVector32 = commitments.channelId def commitments: Commitments - def channelId: ByteVector32 = commitments.channelId } case class ClosingTxProposed(unsignedTx: Transaction, localClosingSigned: ClosingSigned) @@ -259,10 +263,18 @@ case class LocalCommitPublished(commitTx: Transaction, claimMainDelayedOutputTx: case class RemoteCommitPublished(commitTx: Transaction, claimMainOutputTx: Option[Transaction], claimHtlcSuccessTxs: List[Transaction], claimHtlcTimeoutTxs: List[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) case class RevokedCommitPublished(commitTx: Transaction, claimMainOutputTx: Option[Transaction], mainPenaltyTx: Option[Transaction], htlcPenaltyTxs: List[Transaction], claimHtlcDelayedPenaltyTxs: List[Transaction], irrevocablySpent: Map[OutPoint, ByteVector32]) -final case class DATA_WAIT_FOR_OPEN_CHANNEL(initFundee: INPUT_INIT_FUNDEE) extends Data -final case class DATA_WAIT_FOR_ACCEPT_CHANNEL(initFunder: INPUT_INIT_FUNDER, lastSent: OpenChannel) extends Data -final case class DATA_WAIT_FOR_FUNDING_INTERNAL(temporaryChannelId: ByteVector32, localParams: LocalParams, remoteParams: RemoteParams, fundingAmount: Satoshi, pushAmount: MilliSatoshi, initialFeeratePerKw: FeeratePerKw, remoteFirstPerCommitmentPoint: PublicKey, channelVersion: ChannelVersion, lastSent: OpenChannel) extends Data -final case class DATA_WAIT_FOR_FUNDING_CREATED(temporaryChannelId: ByteVector32, localParams: LocalParams, remoteParams: RemoteParams, fundingAmount: Satoshi, pushAmount: MilliSatoshi, initialFeeratePerKw: FeeratePerKw, remoteFirstPerCommitmentPoint: PublicKey, channelFlags: Byte, channelVersion: ChannelVersion, lastSent: AcceptChannel) extends Data +final case class DATA_WAIT_FOR_OPEN_CHANNEL(initFundee: INPUT_INIT_FUNDEE) extends Data { + val channelId: ByteVector32 = initFundee.temporaryChannelId +} +final case class DATA_WAIT_FOR_ACCEPT_CHANNEL(initFunder: INPUT_INIT_FUNDER, lastSent: OpenChannel) extends Data { + val channelId: ByteVector32 = initFunder.temporaryChannelId +} +final case class DATA_WAIT_FOR_FUNDING_INTERNAL(temporaryChannelId: ByteVector32, localParams: LocalParams, remoteParams: RemoteParams, fundingAmount: Satoshi, pushAmount: MilliSatoshi, initialFeeratePerKw: FeeratePerKw, remoteFirstPerCommitmentPoint: PublicKey, channelVersion: ChannelVersion, lastSent: OpenChannel) extends Data { + val channelId: ByteVector32 = temporaryChannelId +} +final case class DATA_WAIT_FOR_FUNDING_CREATED(temporaryChannelId: ByteVector32, localParams: LocalParams, remoteParams: RemoteParams, fundingAmount: Satoshi, pushAmount: MilliSatoshi, initialFeeratePerKw: FeeratePerKw, remoteFirstPerCommitmentPoint: PublicKey, channelFlags: Byte, channelVersion: ChannelVersion, lastSent: AcceptChannel) extends Data { + val channelId: ByteVector32 = temporaryChannelId +} final case class DATA_WAIT_FOR_FUNDING_SIGNED(channelId: ByteVector32, localParams: LocalParams, remoteParams: RemoteParams, fundingTx: Transaction, fundingTxFee: Satoshi, localSpec: CommitmentSpec, localCommitTx: CommitTx, remoteCommit: RemoteCommit, channelFlags: Byte, channelVersion: ChannelVersion, lastSent: FundingCreated) extends Data final case class DATA_WAIT_FOR_FUNDING_CONFIRMED(commitments: Commitments, fundingTx: Option[Transaction], diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala index 2c2b60c56f..968864411e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala @@ -32,7 +32,7 @@ import fr.acinq.eclair.transactions.Scripts._ import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions._ import fr.acinq.eclair.wire._ -import fr.acinq.eclair.{NodeParams, ShortChannelId, addressToPublicKeyScript, _} +import fr.acinq.eclair._ import scodec.bits.ByteVector import scala.concurrent.Await @@ -44,22 +44,6 @@ import scala.util.{Failure, Success, Try} */ object Helpers { - - /** - * Depending on the state, returns the current temporaryChannelId or channelId - * - * @return the long identifier of the channel - */ - def getChannelId(stateData: Data): ByteVector32 = stateData match { - case Nothing => ByteVector32.Zeroes - case d: DATA_WAIT_FOR_OPEN_CHANNEL => d.initFundee.temporaryChannelId - case d: DATA_WAIT_FOR_ACCEPT_CHANNEL => d.initFunder.temporaryChannelId - case d: DATA_WAIT_FOR_FUNDING_INTERNAL => d.temporaryChannelId - case d: DATA_WAIT_FOR_FUNDING_CREATED => d.temporaryChannelId - case d: DATA_WAIT_FOR_FUNDING_SIGNED => d.channelId - case d: HasCommitments => d.channelId - } - /** * We update local/global features at reconnection */ diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala index b7c52da9bb..c325169a91 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/Auditor.scala @@ -80,11 +80,11 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging { case e: ChannelStateChanged => // NB: order matters! e match { - case ChannelStateChanged(_, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, d: DATA_NORMAL) => + case ChannelStateChanged(_, channelId, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, Some(commitments: Commitments)) => ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Created).increment() - db.add(ChannelLifecycleEvent(d.channelId, remoteNodeId, d.commitments.commitInput.txOut.amount, d.commitments.localParams.isFunder, !d.commitments.announceChannel, "created")) - case ChannelStateChanged(_, _, _, WAIT_FOR_INIT_INTERNAL, _, _) => - case ChannelStateChanged(_, _, _, _, CLOSING, _) => + db.add(ChannelLifecycleEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, "created")) + case ChannelStateChanged(_, _, _, _, WAIT_FOR_INIT_INTERNAL, _, _) => + case ChannelStateChanged(_, _, _, _, _, CLOSING, _) => ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closing).increment() case _ => () } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala index ebc9b75c24..af514192de 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/relay/PostRestartHtlcCleaner.scala @@ -27,9 +27,8 @@ import fr.acinq.eclair.db._ import fr.acinq.eclair.payment.Monitoring.Tags import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPacket, PaymentFailed, PaymentSent} import fr.acinq.eclair.transactions.DirectedHtlc.outgoing -import fr.acinq.eclair.transactions.OutgoingHtlc -import fr.acinq.eclair.wire.{TemporaryNodeFailure, UpdateAddHtlc} -import fr.acinq.eclair.{LongToBtcAmount, NodeParams} +import fr.acinq.eclair.wire.{FailureMessage, TemporaryNodeFailure, UpdateAddHtlc} +import fr.acinq.eclair.{CustomCommitmentsPlugin, LongToBtcAmount, NodeParams} import scala.concurrent.Promise import scala.util.Try @@ -58,10 +57,26 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial context.system.eventStream.subscribe(self, classOf[ChannelStateChanged]) - val brokenHtlcs = { + // If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually + // expire and we won't lose money, but the channel will get closed, which is a major inconvenience. We want to detect + // this and fast-fail those HTLCs and thus preserve channels. + // + // Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the + // result upstream to preserve channels. + val brokenHtlcs: BrokenHtlcs = { val channels = listLocalChannels(nodeParams.db.channels) - cleanupRelayDb(channels, nodeParams.db.pendingRelay) - checkBrokenHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey) + val nonStandardIncomingHtlcs: Seq[IncomingHtlc] = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getIncomingHtlcs }.flatten + val htlcsIn: Seq[IncomingHtlc] = getIncomingHtlcs(channels, nodeParams.db.payments, nodeParams.privateKey) ++ nonStandardIncomingHtlcs + val nonStandardRelayedOutHtlcs: Map[Origin, Set[(ByteVector32, Long)]] = nodeParams.pluginParams.collect { case p: CustomCommitmentsPlugin => p.getHtlcsRelayedOut(htlcsIn) }.flatten.toMap + val relayedOut: Map[Origin, Set[(ByteVector32, Long)]] = getHtlcsRelayedOut(channels, htlcsIn) ++ nonStandardRelayedOutHtlcs + + val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin))) + cleanupRelayDb(htlcsIn, nodeParams.db.pendingRelay) + + log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}") + log.info("notRelayed={}", notRelayed.map(htlc => (htlc.add.channelId, htlc.add.id))) + log.info("relayedOut={}", relayedOut) + BrokenHtlcs(notRelayed, relayedOut, Set.empty) } Metrics.PendingNotRelayed.update(brokenHtlcs.notRelayed.size) @@ -74,12 +89,12 @@ class PostRestartHtlcCleaner(nodeParams: NodeParams, register: ActorRef, initial def main(brokenHtlcs: BrokenHtlcs): Receive = { // When channels are restarted we immediately fail the incoming HTLCs that weren't relayed. - case e@ChannelStateChanged(channel, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING | CLOSING, NORMAL | SHUTDOWN | CLOSING | CLOSED, data: HasCommitments) => - log.debug("channel {}: {} -> {}", data.channelId, e.previousState, e.currentState) + case e@ChannelStateChanged(channel, channelId, _, _, WAIT_FOR_INIT_INTERNAL | OFFLINE | SYNCING | CLOSING, NORMAL | SHUTDOWN | CLOSING | CLOSED, Some(commitments: AbstractCommitments)) => + log.debug("channel {}: {} -> {}", channelId, e.previousState, e.currentState) val acked = brokenHtlcs.notRelayed - .filter(_.add.channelId == data.channelId) // only consider htlcs coming from this channel + .filter(_.add.channelId == channelId) // only consider htlcs coming from this channel .filter { - case IncomingHtlc(htlc, preimage_opt) if data.commitments.getIncomingHtlcCrossSigned(htlc.id).isDefined => + case IncomingHtlc(htlc, preimage_opt) if commitments.getIncomingHtlcCrossSigned(htlc.id).isDefined => // this htlc is cross signed in the current commitment, we can settle it preimage_opt match { case Some(preimage) => @@ -290,35 +305,33 @@ object PostRestartHtlcCleaner { case _ => None } - /** - * If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually - * expire and we won't lose money, but the channel will get closed, which is a major inconvenience. We want to detect - * this and fast-fail those HTLCs and thus preserve channels. - * - * Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the - * result upstream to preserve channels. - */ - private def checkBrokenHtlcs(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey)(implicit log: LoggingAdapter): BrokenHtlcs = { + def decryptedIncomingHtlcs(paymentsDb: IncomingPaymentsDb): PartialFunction[Either[FailureMessage, IncomingPacket], IncomingHtlc] = { + // When we're not the final recipient, we'll only consider HTLCs that aren't relayed downstream, so no need to look for a preimage. + case Right(IncomingPacket.ChannelRelayPacket(add, _, _)) => IncomingHtlc(add, None) + case Right(IncomingPacket.NodeRelayPacket(add, _, _, _)) => IncomingHtlc(add, None) + // When we're the final recipient, we want to know if we want to fulfill or fail. + case Right(p@IncomingPacket.FinalPacket(add, _)) => IncomingHtlc(add, shouldFulfill(p, paymentsDb)) + } + + /** @return incoming HTLCs that have been *cross-signed* (that potentially have been relayed). */ + private def getIncomingHtlcs(channels: Seq[HasCommitments], paymentsDb: IncomingPaymentsDb, privateKey: PrivateKey)(implicit log: LoggingAdapter): Seq[IncomingHtlc] = { // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed). // They signed it first, so the HTLC will first appear in our commitment tx, and later on in their commitment when // we subsequently sign it. That's why we need to look in *their* commitment with direction=OUT. - val htlcsIn = channels + channels .flatMap(_.commitments.remoteCommit.spec.htlcs) .collect(outgoing) .map(IncomingPacket.decrypt(_, privateKey)) - .collect { - // When we're not the final recipient, we'll only consider HTLCs that aren't relayed downstream, so no need to look for a preimage. - case Right(IncomingPacket.ChannelRelayPacket(add, _, _)) => IncomingHtlc(add, None) - case Right(IncomingPacket.NodeRelayPacket(add, _, _, _)) => IncomingHtlc(add, None) - // When we're the final recipient, we want to know if we want to fulfill or fail. - case Right(p@IncomingPacket.FinalPacket(add, _)) => IncomingHtlc(add, shouldFulfill(p, paymentsDb)) - } + .collect(decryptedIncomingHtlcs(paymentsDb)) + } - def isPendingUpstream(channelId: ByteVector32, htlcId: Long): Boolean = - htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId) + /** @return whether a given HTLC is a pending incoming HTLC. */ + private def isPendingUpstream(channelId: ByteVector32, htlcId: Long, htlcsIn: Seq[IncomingHtlc]): Boolean = + htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId) - // We group relayed outgoing HTLCs by their origin. - val relayedOut = channels + /** @return pending outgoing HTLCs, grouped by their upstream origin. */ + private def getHtlcsRelayedOut(channels: Seq[HasCommitments], htlcsIn: Seq[IncomingHtlc])(implicit log: LoggingAdapter): Map[Origin, Set[(ByteVector32, Long)]] = { + channels .flatMap { c => // Filter out HTLCs that will never reach the blockchain or have already been timed-out on-chain. val htlcsToIgnore: Set[Long] = c match { @@ -356,16 +369,10 @@ object PostRestartHtlcCleaner { // instant whereas the uncooperative close of the downstream channel will take time. .filterKeys { case _: Origin.Local => true - case o: Origin.ChannelRelayed => isPendingUpstream(o.originChannelId, o.originHtlcId) - case o: Origin.TrampolineRelayed => o.htlcs.exists { case (channelId, htlcId) => isPendingUpstream(channelId, htlcId) } + case o: Origin.ChannelRelayed => isPendingUpstream(o.originChannelId, o.originHtlcId, htlcsIn) + case o: Origin.TrampolineRelayed => o.htlcs.exists { case (channelId, htlcId) => isPendingUpstream(channelId, htlcId, htlcsIn) } } .toMap - - val notRelayed = htlcsIn.filterNot(htlcIn => relayedOut.keys.exists(origin => matchesOrigin(htlcIn.add, origin))) - log.info(s"htlcsIn=${htlcsIn.length} notRelayed=${notRelayed.length} relayedOut=${relayedOut.values.flatten.size}") - log.info("notRelayed={}", notRelayed.map(htlc => (htlc.add.channelId, htlc.add.id))) - log.info("relayedOut={}", relayedOut) - BrokenHtlcs(notRelayed, relayedOut, Set.empty) } /** @@ -387,16 +394,11 @@ object PostRestartHtlcCleaner { * * That's why we need to periodically clean up the pending relay db. */ - private def cleanupRelayDb(channels: Seq[HasCommitments], relayDb: PendingRelayDb)(implicit log: LoggingAdapter): Unit = { + private def cleanupRelayDb(htlcsIn: Seq[IncomingHtlc], relayDb: PendingRelayDb)(implicit log: LoggingAdapter): Unit = { // We are interested in incoming HTLCs, that have been *cross-signed* (otherwise they wouldn't have been relayed). // If the HTLC is not in their commitment, it means that we have already fulfilled/failed it and that we can remove // the command from the pending relay db. - val channel2Htlc: Set[(ByteVector32, Long)] = - channels - .flatMap(_.commitments.remoteCommit.spec.htlcs) - .collect { case OutgoingHtlc(add) => (add.channelId, add.id) } - .toSet - + val channel2Htlc: Seq[(ByteVector32, Long)] = htlcsIn.map { case IncomingHtlc(add, _) => (add.channelId, add.id) } val pendingRelay: Set[(ByteVector32, Long)] = relayDb.listPendingRelay() val toClean = pendingRelay -- channel2Htlc toClean.foreach { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 74f3abf65f..1609abecff 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -131,7 +131,11 @@ object TestConstants { val mandatory = 50000 } - val pluginParams: PluginParams = PluginParams(tags = Set(60003), TestFeature) + val pluginParams: CustomFeaturePlugin = new CustomFeaturePlugin { + def messageTags: Set[Int] = Set(60003) + def feature: Feature = TestFeature + def name: String = "plugin for testing" + } object Alice { val seed = ByteVector32(ByteVector.fill(32)(1)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/HelpersSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/HelpersSpec.scala index 3a7f1ab735..7f99f7a5b8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/HelpersSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/HelpersSpec.scala @@ -59,7 +59,7 @@ class HelpersSpec extends AnyFunSuite { // only mutual close assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = tx1 :: tx2 :: tx3 :: Nil, @@ -74,7 +74,7 @@ class HelpersSpec extends AnyFunSuite { // mutual + local close, but local commit tx isn't confirmed assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = tx1 :: Nil, @@ -120,7 +120,7 @@ class HelpersSpec extends AnyFunSuite { // local close + remote close, none is confirmed assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = Nil, @@ -214,7 +214,7 @@ class HelpersSpec extends AnyFunSuite { // future remote close, not confirmed assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = Nil, @@ -235,7 +235,7 @@ class HelpersSpec extends AnyFunSuite { // future remote close, confirmed assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = Nil, @@ -258,7 +258,7 @@ class HelpersSpec extends AnyFunSuite { // local close + revoked close, none confirmed assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = Nil, @@ -305,7 +305,7 @@ class HelpersSpec extends AnyFunSuite { // local close + revoked close, one revoked confirmed assert(Closing.isClosingTypeAlreadyKnown( DATA_CLOSING( - commitments = null, + commitments = commitments, fundingTx = None, waitingSince = 0, mutualCloseProposed = Nil, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala index d1340d84e5..066d649889 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala @@ -77,7 +77,7 @@ class ThroughputSpec extends AnyFunSuite { val latch = new CountDownLatch(2) val listener = system.actorOf(Props(new Actor { override def receive: Receive = { - case ChannelStateChanged(_, _, _, _, NORMAL, _) => latch.countDown() + case ChannelStateChanged(_, _, _, _, _, NORMAL, _) => latch.countDown() } }), "listener") system.eventStream.subscribe(listener, classOf[ChannelEvent]) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala index d1e18772bd..ef559b7d51 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/StateTestsHelperMethods.scala @@ -288,7 +288,7 @@ trait StateTestsHelperMethods extends TestKitBase with FixtureTestSuite with Par getRemoteCommitPublished(s.stateData.asInstanceOf[DATA_CLOSING]).get } - def channelId(a: TestFSMRef[State, Data, Channel]): ByteVector32 = Helpers.getChannelId(a.stateData) + def channelId(a: TestFSMRef[State, Data, Channel]): ByteVector32 = a.stateData.channelId // @formatter:off implicit class ChannelWithTestFeeConf(a: TestFSMRef[State, Data, Channel]) { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala index 3af3579814..03da0fee11 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerSpec.scala @@ -169,7 +169,7 @@ class PeerSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with StateTe system.eventStream.subscribe(listener.ref, classOf[UnknownMessageReceived]) connect(remoteNodeId, peer, peerConnection, channels = Set(ChannelCodecsSpec.normal)) - peerConnection.send(peer, UnknownMessage(tag = TestConstants.pluginParams.tags.head, data = ByteVector.empty)) + peerConnection.send(peer, UnknownMessage(tag = TestConstants.pluginParams.messageTags.head, data = ByteVector.empty)) listener.expectMsgType[UnknownMessageReceived] peerConnection.send(peer, UnknownMessage(tag = 60005, data = ByteVector.empty)) // No plugin is subscribed to this tag listener.expectNoMessage() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala index 15b0186c49..5fb5ce2a48 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PostRestartHtlcCleanerSpec.scala @@ -21,7 +21,8 @@ import java.util.UUID import akka.Done import akka.actor.ActorRef import akka.testkit.TestProbe -import fr.acinq.bitcoin.{Block, ByteVector32, Crypto} +import fr.acinq.bitcoin.Crypto.PublicKey +import fr.acinq.bitcoin.{Block, ByteVector32, Crypto, Satoshi} import fr.acinq.eclair.blockchain.WatchEventConfirmed import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.states.StateTestsHelperMethods @@ -33,7 +34,7 @@ import fr.acinq.eclair.router.Router.ChannelHop import fr.acinq.eclair.transactions.{DirectedHtlc, IncomingHtlc, OutgoingHtlc} import fr.acinq.eclair.wire.Onion.FinalLegacyPayload import fr.acinq.eclair.wire._ -import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, LongToBtcAmount, NodeParams, TestConstants, TestKitBaseClass, randomBytes32} +import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, CustomCommitmentsPlugin, LongToBtcAmount, MilliSatoshi, NodeParams, TestConstants, TestKitBaseClass, randomBytes32} import org.scalatest.Outcome import org.scalatest.funsuite.FixtureAnyFunSuiteLike import scodec.bits.ByteVector @@ -50,8 +51,8 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import PostRestartHtlcCleanerSpec._ case class FixtureParam(nodeParams: NodeParams, register: TestProbe, sender: TestProbe, eventListener: TestProbe) { - def createRelayer(): ActorRef = { - system.actorOf(Relayer.props(nodeParams, TestProbe().ref, register.ref, TestProbe().ref)) + def createRelayer(nodeParams1: NodeParams): ActorRef = { + system.actorOf(Relayer.props(nodeParams1, TestProbe().ref, register.ref, TestProbe().ref)) } } @@ -109,34 +110,34 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit channels.foreach(c => nodeParams.db.channels.addOrUpdateChannel(c)) val channel = TestProbe() - f.createRelayer() + f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // nothing should happen while channels are still offline. // channel 1 goes to NORMAL state: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head)) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels.head.commitments.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels.head.commitments))) val fails_ab_1 = channel.expectMsgType[CMD_FAIL_HTLC] :: channel.expectMsgType[CMD_FAIL_HTLC] :: Nil assert(fails_ab_1.toSet === Set(CMD_FAIL_HTLC(1, Right(TemporaryNodeFailure), commit = true), CMD_FAIL_HTLC(4, Right(TemporaryNodeFailure), commit = true))) channel.expectNoMsg(100 millis) // channel 2 goes to NORMAL state: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels(1))) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels(1).commitments.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels(1).commitments))) val fails_ab_2 = channel.expectMsgType[CMD_FAIL_HTLC] :: channel.expectMsgType[CMD_FAIL_HTLC] :: Nil assert(fails_ab_2.toSet === Set(CMD_FAIL_HTLC(0, Right(TemporaryNodeFailure), commit = true), CMD_FAIL_HTLC(4, Right(TemporaryNodeFailure), commit = true))) channel.expectNoMsg(100 millis) // let's assume that channel 1 was disconnected before having signed the fails, and gets connected again: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head)) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels.head.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels.head.commitments))) val fails_ab_1_bis = channel.expectMsgType[CMD_FAIL_HTLC] :: channel.expectMsgType[CMD_FAIL_HTLC] :: Nil assert(fails_ab_1_bis.toSet === Set(CMD_FAIL_HTLC(1, Right(TemporaryNodeFailure), commit = true), CMD_FAIL_HTLC(4, Right(TemporaryNodeFailure), commit = true))) channel.expectNoMsg(100 millis) // let's now assume that channel 1 gets reconnected, and it had the time to fail the htlcs: val data1 = channels.head.copy(commitments = channels.head.commitments.copy(localCommit = channels.head.commitments.localCommit.copy(spec = channels.head.commitments.localCommit.spec.copy(htlcs = Set.empty)))) - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, data1)) + system.eventStream.publish(ChannelStateChanged(channel.ref, data1.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(data1.commitments))) channel.expectNoMsg(100 millis) // post-restart cleaner has cleaned up the htlcs, so next time it won't fail them anymore, even if we artificially submit the former state: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head)) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels.head.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels.head.commitments))) channel.expectNoMsg(100 millis) } @@ -171,11 +172,11 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit channels.foreach(c => nodeParams.db.channels.addOrUpdateChannel(c)) val channel = TestProbe() - f.createRelayer() + f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // nothing should happen while channels are still offline. // channel 1 goes to NORMAL state: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head)) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels.head.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels.head.commitments))) val expected1 = Set( CMD_FAIL_HTLC(0, Right(TemporaryNodeFailure), commit = true), CMD_FULFILL_HTLC(3, preimage, commit = true), @@ -187,7 +188,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit channel.expectNoMsg(100 millis) // channel 2 goes to NORMAL state: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels(1))) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels(1).channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels(1).commitments))) val expected2 = Set( CMD_FAIL_HTLC(1, Right(TemporaryNodeFailure), commit = true), CMD_FAIL_HTLC(3, Right(TemporaryNodeFailure), commit = true), @@ -199,18 +200,18 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit channel.expectNoMsg(100 millis) // let's assume that channel 1 was disconnected before having signed the updates, and gets connected again: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head)) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels.head.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels.head.commitments))) val received3 = expected1.map(_ => channel.expectMsgType[Command]) assert(received3 === expected1) channel.expectNoMsg(100 millis) // let's now assume that channel 1 gets reconnected, and it had the time to sign the htlcs: val data1 = channels.head.copy(commitments = channels.head.commitments.copy(localCommit = channels.head.commitments.localCommit.copy(spec = channels.head.commitments.localCommit.spec.copy(htlcs = Set.empty)))) - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, data1)) + system.eventStream.publish(ChannelStateChanged(channel.ref, data1.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(data1.commitments))) channel.expectNoMsg(100 millis) // post-restart cleaner has cleaned up the htlcs, so next time it won't fail them anymore, even if we artificially submit the former state: - system.eventStream.publish(ChannelStateChanged(channel.ref, system.deadLetters, a, OFFLINE, NORMAL, channels.head)) + system.eventStream.publish(ChannelStateChanged(channel.ref, channels.head.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(channels.head.commitments))) channel.expectNoMsg(100 millis) } @@ -218,7 +219,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupLocalPayments(nodeParams) - val relayer = createRelayer() + val relayer = createRelayer(nodeParams) register.expectNoMsg(100 millis) sender.send(relayer, testCase.fails(1)) @@ -247,7 +248,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupLocalPayments(nodeParams) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) sender.send(relayer, testCase.fulfills(1)) @@ -388,13 +389,13 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit nodeParams.db.channels.addOrUpdateChannel(data_upstream_3) nodeParams.db.channels.addOrUpdateChannel(data_downstream) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // nothing should happen while channels are still offline. val (channel_upstream_1, channel_upstream_2, channel_upstream_3) = (TestProbe(), TestProbe(), TestProbe()) - system.eventStream.publish(ChannelStateChanged(channel_upstream_1.ref, system.deadLetters, a, OFFLINE, NORMAL, data_upstream_1)) - system.eventStream.publish(ChannelStateChanged(channel_upstream_2.ref, system.deadLetters, a, OFFLINE, NORMAL, data_upstream_2)) - system.eventStream.publish(ChannelStateChanged(channel_upstream_3.ref, system.deadLetters, a, OFFLINE, NORMAL, data_upstream_3)) + system.eventStream.publish(ChannelStateChanged(channel_upstream_1.ref, data_upstream_1.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(data_upstream_1.commitments))) + system.eventStream.publish(ChannelStateChanged(channel_upstream_2.ref, data_upstream_2.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(data_upstream_2.commitments))) + system.eventStream.publish(ChannelStateChanged(channel_upstream_3.ref, data_upstream_3.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(data_upstream_3.commitments))) // Payment 1 should fail instantly. channel_upstream_1.expectMsg(CMD_FAIL_HTLC(0, Right(TemporaryNodeFailure), commit = true)) @@ -422,7 +423,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupChannelRelayedPayments(nodeParams) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) sender.send(relayer, buildForwardFail(testCase.downstream, testCase.origin)) @@ -437,7 +438,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupChannelRelayedPayments(nodeParams) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // we need a reference to the post-htlc-restart child actor @@ -457,7 +458,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupTrampolinePayments(nodeParams) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // we need a reference to the post-htlc-restart child actor @@ -491,7 +492,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupTrampolinePayments(nodeParams) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // we need a reference to the post-htlc-restart child actor @@ -524,7 +525,7 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit import f._ val testCase = setupTrampolinePayments(nodeParams) - val relayer = f.createRelayer() + val relayer = f.createRelayer(nodeParams) register.expectNoMsg(100 millis) // we need a reference to the post-htlc-restart child actor @@ -543,6 +544,105 @@ class PostRestartHtlcCleanerSpec extends TestKitBaseClass with FixtureAnyFunSuit eventListener.expectNoMsg(100 millis) } + test("Relayed nonstandard->standard HTLC is retained") { f => + import f._ + + val relayedPaymentHash = randomBytes32 + val trampolineRelayedPaymentHash = randomBytes32 + val trampolineRelayed = Origin.TrampolineRelayedCold((channelId_ab_1, 0L) :: Nil) + val relayedHtlc1In = buildHtlcIn(0L, channelId_ab_1, trampolineRelayedPaymentHash) + val relayedhtlc1Out = buildHtlcOut(50L, channelId_ab_2, trampolineRelayedPaymentHash) + val nonRelayedHtlc2In = buildHtlcIn(1L, channelId_ab_1, relayedPaymentHash) + + val pluginParams = new CustomCommitmentsPlugin { + def name = "test with incoming HTLC from remote" + def getIncomingHtlcs: Seq[PostRestartHtlcCleaner.IncomingHtlc] = List(PostRestartHtlcCleaner.IncomingHtlc(relayedHtlc1In.add, None), PostRestartHtlcCleaner.IncomingHtlc(nonRelayedHtlc2In.add, None)) + def getHtlcsRelayedOut(htlcsIn: Seq[PostRestartHtlcCleaner.IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] = Map.empty + } + + val nodeParams1 = nodeParams.copy(pluginParams = List(pluginParams)) + val c = ChannelCodecsSpec.makeChannelDataNormal(List(relayedhtlc1Out), Map(50L -> trampolineRelayed)) + nodeParams1.db.channels.addOrUpdateChannel(c) + + val channel = TestProbe() + f.createRelayer(nodeParams1) + register.expectNoMsg(100 millis) // nothing should happen while channels are still offline. + + val cs = new AbstractCommitments { + def getOutgoingHtlcCrossSigned(htlcId: Long): Option[UpdateAddHtlc] = None + def getIncomingHtlcCrossSigned(htlcId: Long): Option[UpdateAddHtlc] = { + if (htlcId == 0L) Some(relayedHtlc1In.add) + else if (htlcId == 1L) Some(nonRelayedHtlc2In.add) + else None + } + def timedOutOutgoingHtlcs(blockheight: Long): Set[UpdateAddHtlc] = Set.empty + def localNodeId: PublicKey = randomExtendedPrivateKey.publicKey + def remoteNodeId: PublicKey = randomExtendedPrivateKey.publicKey + def capacity: Satoshi = Long.MaxValue.sat + def availableBalanceForReceive: MilliSatoshi = Long.MaxValue.msat + def availableBalanceForSend: MilliSatoshi = 0.msat + def originChannels: Map[Long, Origin] = Map.empty + def channelId: ByteVector32 = channelId_ab_1 + def announceChannel: Boolean = false + } + + // Non-standard channel goes to NORMAL state: + system.eventStream.publish(ChannelStateChanged(channel.ref, channelId_ab_1, system.deadLetters, a, OFFLINE, NORMAL, Some(cs))) + channel.expectMsg(CMD_FAIL_HTLC(1L, Right(TemporaryNodeFailure), commit = true)) + channel.expectNoMsg(100 millis) + } + + test("Relayed standard->nonstandard HTLC is retained") { f => + import f._ + + val relayedPaymentHash = randomBytes32 + val trampolineRelayedPaymentHash = randomBytes32 + val trampolineRelayed = Origin.TrampolineRelayedCold((channelId_ab_2, 0L) :: Nil) + val relayedHtlcIn = buildHtlcIn(0L, channelId_ab_2, trampolineRelayedPaymentHash) + val nonRelayedHtlcIn = buildHtlcIn(1L, channelId_ab_2, relayedPaymentHash) + + val pluginParams = new CustomCommitmentsPlugin { + def name = "test with outgoing HTLC to remote" + def getIncomingHtlcs: Seq[PostRestartHtlcCleaner.IncomingHtlc] = List.empty + def getHtlcsRelayedOut(htlcsIn: Seq[PostRestartHtlcCleaner.IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] = Map(trampolineRelayed -> Set((channelId_ab_1, 10L))) + } + + val nodeParams1 = nodeParams.copy(pluginParams = List(pluginParams)) + val c = ChannelCodecsSpec.makeChannelDataNormal(List(relayedHtlcIn, nonRelayedHtlcIn), Map.empty) + nodeParams1.db.channels.addOrUpdateChannel(c) + + val channel = TestProbe() + f.createRelayer(nodeParams1) + register.expectNoMsg(100 millis) // nothing should happen while channels are still offline. + + // Standard channel goes to NORMAL state: + system.eventStream.publish(ChannelStateChanged(channel.ref, c.commitments.channelId, system.deadLetters, a, OFFLINE, NORMAL, Some(c.commitments))) + channel.expectMsg(CMD_FAIL_HTLC(1L, Right(TemporaryNodeFailure), commit = true)) + channel.expectNoMsg(100 millis) + } + + test("Non-standard HTLC CMD_FAIL in relayDb is retained") { f => + import f._ + + val trampolineRelayedPaymentHash = randomBytes32 + val relayedHtlc1In = buildHtlcIn(0L, channelId_ab_1, trampolineRelayedPaymentHash) + + val pluginParams = new CustomCommitmentsPlugin { + def name = "test with incoming HTLC from remote" + def getIncomingHtlcs: Seq[PostRestartHtlcCleaner.IncomingHtlc] = List(PostRestartHtlcCleaner.IncomingHtlc(relayedHtlc1In.add, None)) + def getHtlcsRelayedOut(htlcsIn: Seq[PostRestartHtlcCleaner.IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]] = Map.empty + } + + val cmd1 = CMD_FAIL_HTLC(id = 0L, reason = Left(ByteVector.empty), replyTo_opt = None) + val cmd2 = CMD_FAIL_HTLC(id = 1L, reason = Left(ByteVector.empty), replyTo_opt = None) + val nodeParams1 = nodeParams.copy(pluginParams = List(pluginParams)) + nodeParams1.db.pendingRelay.addPendingRelay(channelId_ab_1, cmd1) + nodeParams1.db.pendingRelay.addPendingRelay(channelId_ab_1, cmd2) + f.createRelayer(nodeParams1) + register.expectNoMsg(100 millis) + awaitCond(Seq(cmd1) == nodeParams1.db.pendingRelay.listPendingRelay(channelId_ab_1)) + } + } object PostRestartHtlcCleanerSpec { diff --git a/eclair-node-gui/src/main/scala/fr/acinq/eclair/gui/GUIUpdater.scala b/eclair-node-gui/src/main/scala/fr/acinq/eclair/gui/GUIUpdater.scala index a64d72a016..da8653111d 100644 --- a/eclair-node-gui/src/main/scala/fr/acinq/eclair/gui/GUIUpdater.scala +++ b/eclair-node-gui/src/main/scala/fr/acinq/eclair/gui/GUIUpdater.scala @@ -101,11 +101,11 @@ class GUIUpdater(mainController: MainController) extends Actor with ActorLogging val channelPaneController = m(channel) runInGuiThread(() => channelPaneController.channelId.setText(channelId.toHex)) - case ChannelStateChanged(channel, _, _, _, currentState, currentData) if m.contains(channel) => + case ChannelStateChanged(channel, _, _, _, _, currentState, commitments_opt) if m.contains(channel) => val channelPaneController = m(channel) runInGuiThread { () => - (currentState, currentData) match { - case (WAIT_FOR_FUNDING_CONFIRMED, d: HasCommitments) => channelPaneController.txId.setText(d.commitments.commitInput.outPoint.txid.toHex) + (currentState, commitments_opt) match { + case (WAIT_FOR_FUNDING_CONFIRMED, Some(c: Commitments)) => channelPaneController.txId.setText(c.commitInput.outPoint.txid.toHex) case _ => } channelPaneController.close.setVisible(STATE_MUTUAL_CLOSE.contains(currentState)) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala index 14e12c5d95..3c834d2492 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/Boot.scala @@ -41,8 +41,7 @@ object Boot extends App with Logging { implicit val system: ActorSystem = ActorSystem("eclair-node", config) implicit val ec: ExecutionContext = system.dispatcher - val pluginParams = plugins.flatMap(_.params) - val setup = new Setup(datadir, pluginParams) + val setup = new Setup(datadir, plugins.map(_.params)) if (config.getBoolean("eclair.enable-kamon")) { Kamon.init(config) diff --git a/eclair-node/src/main/scala/fr/acinq/eclair/Plugin.scala b/eclair-node/src/main/scala/fr/acinq/eclair/Plugin.scala index ea1ac8a400..990364dc9c 100644 --- a/eclair-node/src/main/scala/fr/acinq/eclair/Plugin.scala +++ b/eclair-node/src/main/scala/fr/acinq/eclair/Plugin.scala @@ -23,7 +23,7 @@ import scala.util.{Failure, Success, Try} trait Plugin { - def params: Option[PluginParams] + def params: PluginParams def onSetup(setup: Setup): Unit diff --git a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala index a13ee635a3..dfb8c93d6a 100644 --- a/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala +++ b/eclair-node/src/test/scala/fr/acinq/eclair/api/ApiServiceSpec.scala @@ -540,7 +540,7 @@ class ApiServiceSpec extends AnyFunSuite with ScalatestRouteTest with IdiomaticM system.eventStream.publish(chcr) wsClient.expectMessage(expectedSerializedChcr) - val chsc = ChannelStateChanged(system.deadLetters, system.deadLetters, bobNodeId, OFFLINE, NORMAL, null) + val chsc = ChannelStateChanged(system.deadLetters, null, system.deadLetters, bobNodeId, OFFLINE, NORMAL, null) val expectedSerializedChsc = """{"type":"channel-state-changed","remoteNodeId":"039dc0e0b1d25905e44fdf6f8e89755a5e219685840d0bc1d28d3308f9628a3585","previousState":"OFFLINE","currentState":"NORMAL"}""" assert(serialization.write(chsc) === expectedSerializedChsc) system.eventStream.publish(chsc)