Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

htlc post restart 4 #1586

Merged
merged 19 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 13 additions & 20 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,16 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
maxPaymentAttempts: Int,
enableTrampolinePayment: Boolean) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
val keyPair = KeyPair(nodeId.value, privateKey.value)

val pluginMessageTags: Set[Int] = pluginParams.flatMap(_.tags).toSet
val keyPair: KeyPair = KeyPair(nodeId.value, privateKey.value)

val pluginMessageTags: Set[Int] = pluginParams.collect { case p: CustomFeaturePlugin => p.messageTags }.toSet.flatten

def currentBlockHeight: Long = blockCount.get

def featuresFor(nodeId: PublicKey) = overrideFeatures.getOrElse(nodeId, features)
def featuresFor(nodeId: PublicKey): Features = overrideFeatures.getOrElse(nodeId, features)
}

object NodeParams extends Logging {
Expand Down Expand Up @@ -241,23 +243,24 @@ object NodeParams extends Logging {
require(features.hasFeature(Features.VariableLengthOnion), s"${Features.VariableLengthOnion.rfcName} must be enabled")
}

val pluginMessageParams = pluginParams.collect { case p: CustomFeaturePlugin => p }
val features = Features.fromConfiguration(config)
validateFeatures(features)

require(pluginParams.forall(_.feature.mandatory > 128), "Plugin mandatory feature bit is too low, must be > 128")
require(pluginParams.forall(_.feature.mandatory % 2 == 0), "Plugin mandatory feature bit is odd, must be even")
require(pluginParams.flatMap(_.tags).forall(_ > 32768), "Plugin messages tags must be > 32768")
val pluginFeatureSet = pluginParams.map(_.feature.mandatory).toSet
require(pluginMessageParams.forall(_.feature.mandatory > 128), "Plugin mandatory feature bit is too low, must be > 128")
require(pluginMessageParams.forall(_.feature.mandatory % 2 == 0), "Plugin mandatory feature bit is odd, must be even")
require(pluginMessageParams.flatMap(_.messageTags).forall(_ > 32768), "Plugin messages tags must be > 32768")
val pluginFeatureSet = pluginMessageParams.map(_.feature.mandatory).toSet
require(Features.knownFeatures.map(_.mandatory).intersect(pluginFeatureSet).isEmpty, "Plugin feature bit overlaps with known feature bit")
require(pluginFeatureSet.size == pluginParams.size, "Duplicate plugin feature bits found")
require(pluginFeatureSet.size == pluginMessageParams.size, "Duplicate plugin feature bits found")

val coreAndPluginFeatures = features.copy(unknown = features.unknown ++ pluginParams.map(_.pluginFeature))
val coreAndPluginFeatures = features.copy(unknown = features.unknown ++ pluginMessageParams.map(_.pluginFeature))

val overrideFeatures: Map[PublicKey, Features] = config.getConfigList("override-features").asScala.map { e =>
val p = PublicKey(ByteVector.fromValidHex(e.getString("nodeid")))
val f = Features.fromConfiguration(e)
validateFeatures(f)
p -> f.copy(unknown = f.unknown ++ pluginParams.map(_.pluginFeature))
p -> f.copy(unknown = f.unknown ++ pluginMessageParams.map(_.pluginFeature))
}.toMap

val syncWhitelist: Set[PublicKey] = config.getStringList("sync-whitelist").asScala.map(s => PublicKey(ByteVector.fromValidHex(s))).toSet
Expand Down Expand Up @@ -382,13 +385,3 @@ object NodeParams extends Logging {
)
}
}

/**
* @param tags a set of LightningMessage tags that plugin is interested in
* @param feature a Feature bit that plugin advertises through Init message
*/
case class PluginParams(tags: Set[Int], feature: Feature) {
def pluginFeature: UnknownFeature = UnknownFeature(feature.optional)

override def toString: String = s"Messaging enabled plugin=${feature.rfcName} with feature bit=${feature.optional} and LN message tags=${tags.mkString(",")}"
}
56 changes: 56 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/PluginParams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2020 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.channel.Origin
import fr.acinq.eclair.payment.relay.PostRestartHtlcCleaner.IncomingHtlc

/** Custom plugin parameters. */
sealed trait PluginParams {
/** Plugin's friendly name. */
def name: String
}

/** Parameters for a plugin that adds support for an experimental or unofficial Bolt9 feature. */
trait CustomFeaturePlugin extends PluginParams {
/** A set of LightningMessage tags that the plugin wants to listen to. */
def messageTags: Set[Int]

/** Feature bit that the plugin wants to advertise through Init message. */
def feature: Feature

/** Plugin feature is always defined as unknown and optional. */
def pluginFeature: UnknownFeature = UnknownFeature(feature.optional)
}

/** Parameters for a plugin that defines custom commitment transactions (or non-standard HTLCs). */
trait CustomCommitmentsPlugin extends PluginParams {
/**
* If we do nothing after a restart, incoming HTLCs that were committed upstream but not relayed will eventually
* expire. If your plugin defines non-standard HTLCs, and they need to be automatically failed, they should be
* returned by this method.
*/
def getIncomingHtlcs: Seq[IncomingHtlc]

/**
* Outgoing HTLC sets that are still pending may either succeed or fail: we need to watch them to properly forward the
* result upstream to preserve channels. If you have non-standard HTLCs that may be in this situation, they should be
* returned by this method.
*/
def getHtlcsRelayedOut(htlcsIn: Seq[IncomingHtlc]): Map[Origin, Set[(ByteVector32, Long)]]
}
9 changes: 5 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ import scala.util.{Failure, Success}
*
* Created by PM on 25/01/2016.
*
* @param datadir directory where eclair-core will write/read its data.
* @param seeds_opt optional seeds, if set eclair will use them instead of generating them and won't create a node_seed.dat and channel_seed.dat files.
* @param db optional databases to use, if not set eclair will create the necessary databases
* @param datadir directory where eclair-core will write/read its data.
* @param pluginParams parameters for all configured plugins.
* @param seeds_opt optional seeds, if set eclair will use them instead of generating them and won't create a node_seed.dat and channel_seed.dat files.
* @param db optional databases to use, if not set eclair will create the necessary databases
*/
class Setup(datadir: File,
pluginParams: Seq[PluginParams],
Expand Down Expand Up @@ -127,7 +128,7 @@ class Setup(datadir: File,
}

val nodeParams = NodeParams.makeNodeParams(config, instanceId, nodeKeyManager, channelKeyManager, initTor(), databases, blockCount, feeEstimator, pluginParams)
pluginParams.foreach(param => logger.info(param.toString))
pluginParams.foreach(param => logger.info(s"Using plugin: ${param.name}"))

val serverBindingAddress = new InetSocketAddress(
config.getString("server.binding-ip"),
Expand Down
44 changes: 22 additions & 22 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
}

case Event(c: CloseCommand, d) =>
val channelId = Helpers.getChannelId(d)
channelOpenReplyToUser(Right(ChannelOpenResponse.ChannelClosed(channelId)))
handleFastClose(c, channelId)
channelOpenReplyToUser(Right(ChannelOpenResponse.ChannelClosed(d.channelId)))
handleFastClose(c, d.channelId)

case Event(TickChannelOpenTimeout, _) =>
channelOpenReplyToUser(Left(LocalError(new RuntimeException("open channel cancelled, took too long"))))
Expand Down Expand Up @@ -334,9 +333,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
goto(WAIT_FOR_FUNDING_CREATED) using DATA_WAIT_FOR_FUNDING_CREATED(open.temporaryChannelId, localParams, remoteParams, open.fundingSatoshis, open.pushMsat, open.feeratePerKw, open.firstPerCommitmentPoint, open.channelFlags, channelVersion, accept) sending accept
}

case Event(c: CloseCommand, d) =>
val channelId = Helpers.getChannelId(d)
handleFastClose(c, channelId)
case Event(c: CloseCommand, d) => handleFastClose(c, d.channelId)

case Event(e: Error, d: DATA_WAIT_FOR_OPEN_CHANNEL) => handleRemoteError(e, d)

Expand Down Expand Up @@ -1666,27 +1663,26 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(c: CMD_GETINFO, _) =>
val replyTo = if (c.replyTo == ActorRef.noSender) sender else c.replyTo
val channelId = Helpers.getChannelId(stateData)
replyTo ! RES_GETINFO(remoteNodeId, channelId, stateName, stateData)
replyTo ! RES_GETINFO(remoteNodeId, stateData.channelId, stateName, stateData)
stay

case Event(c: CMD_ADD_HTLC, d: HasCommitments) =>
log.info(s"rejecting htlc request in state=$stateName")
val error = ChannelUnavailable(d.channelId)
handleAddHtlcCommandError(c, error, None) // we don't provide a channel_update: this will be a permanent channel failure

case Event(c: CMD_CLOSE, d) => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "close", stateName), c)
case Event(c: CMD_CLOSE, d) => handleCommandError(CommandUnavailableInThisState(d.channelId, "close", stateName), c)

case Event(c: CMD_FORCECLOSE, d) =>
d match {
case data: HasCommitments =>
val replyTo = if (c.replyTo == ActorRef.noSender) sender else c.replyTo
replyTo ! RES_SUCCESS(c, data.channelId)
handleLocalError(ForcedLocalCommit(data.channelId), data, Some(c))
case _ => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "forceclose", stateName), c)
case _ => handleCommandError(CommandUnavailableInThisState(d.channelId, "forceclose", stateName), c)
}

case Event(c: CMD_UPDATE_RELAY_FEE, d) => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "updaterelayfee", stateName), c)
case Event(c: CMD_UPDATE_RELAY_FEE, d) => handleCommandError(CommandUnavailableInThisState(d.channelId, "updaterelayfee", stateName), c)

// we only care about this event in NORMAL and SHUTDOWN state, and there may be cases where the task is not cancelled
case Event(_: RevocationTimeout, _) => stay
Expand Down Expand Up @@ -1718,8 +1714,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case WAIT_FOR_INIT_INTERNAL -> WAIT_FOR_INIT_INTERNAL => () // called at channel initialization
case state -> nextState =>
if (state != nextState) {
context.system.eventStream.publish(ChannelStateChanged(self, peer, remoteNodeId, state, nextState, nextStateData))
val commitments_opt = nextStateData match {
case hasCommitments: HasCommitments => Some(hasCommitments.commitments)
case _ => None
}
context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt))
}

if (nextState == CLOSED) {
// channel is closed, scheduling this actor for self destruction
context.system.scheduler.scheduleOnce(10 seconds, self, Symbol("shutdown"))
Expand Down Expand Up @@ -1876,8 +1877,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case hasReplyTo: HasReplyToCommand => if (hasReplyTo.replyTo == ActorRef.noSender) Some(sender) else Some(hasReplyTo.replyTo)
}
replyTo_opt.foreach { replyTo =>
val channelId = Helpers.getChannelId(newData)
replyTo ! RES_SUCCESS(c, channelId)
replyTo ! RES_SUCCESS(c, newData.channelId)
}
stay using newData
}
Expand All @@ -1886,7 +1886,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
log.warning(s"${cause.getMessage} while processing cmd=${c.getClass.getSimpleName} in state=$stateName")
val replyTo = if (c.replyTo == ActorRef.noSender) sender else c.replyTo
replyTo ! RES_ADD_FAILED(c, cause, channelUpdate)
context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = false))
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = false))
stay
}

Expand All @@ -1903,7 +1903,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
replyTo_opt.foreach { replyTo =>
replyTo ! RES_FAILURE(c, cause)
}
context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = false))
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = false))
stay
}

Expand Down Expand Up @@ -1956,15 +1956,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val error = Error(d.channelId, exc.getMessage)
// NB: we don't use the handleLocalError handler because it would result in the commit tx being published, which we don't want:
// implementation *guarantees* that in case of BITCOIN_FUNDING_PUBLISH_FAILED, the funding tx hasn't and will never be published, so we can close the channel right away
context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc), isFatal = true))
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(exc), isFatal = true))
goto(CLOSED) sending error
}

def handleFundingTimeout(d: HasCommitments) = {
log.warning(s"funding tx hasn't been confirmed in time, cancelling channel delay=$FUNDING_TIMEOUT_FUNDEE")
val exc = FundingTxTimedout(d.channelId)
val error = Error(d.channelId, exc.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(exc), isFatal = true))
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(exc), isFatal = true))
goto(CLOSED) sending error
}

Expand Down Expand Up @@ -2035,8 +2035,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case _: ChannelException => ()
case _ => log.error(cause, s"msg=${msg.getOrElse("n/a")} stateData=$stateData ")
}
val error = Error(Helpers.getChannelId(d), cause.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, LocalError(cause), isFatal = true))
val error = Error(d.channelId, cause.getMessage)
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, LocalError(cause), isFatal = true))

d match {
case dd: HasCommitments if Closing.nothingAtStake(dd) => goto(CLOSED)
Expand All @@ -2052,7 +2052,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
def handleRemoteError(e: Error, d: Data) = {
// see BOLT 1: only print out data verbatim if is composed of printable ASCII characters
log.error(s"peer sent error: ascii='${e.toAscii}' bin=${e.data.toHex}")
context.system.eventStream.publish(ChannelErrorOccurred(self, Helpers.getChannelId(stateData), remoteNodeId, stateData, RemoteError(e), isFatal = true))
context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, stateData, RemoteError(e), isFatal = true))

d match {
case _: DATA_CLOSING => stay // nothing to do, there is already a spending tx published
Expand Down Expand Up @@ -2402,7 +2402,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
val category_opt = LogCategory(currentMessage)
val id = currentMessage match {
case INPUT_RESTORED(data) => data.channelId
case _ => Helpers.getChannelId(stateData)
case _ => stateData.channelId
}
Logs.mdc(category_opt, remoteNodeId_opt = Some(remoteNodeId), channelId_opt = Some(id))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortC

case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent

case class ChannelStateChanged(channel: ActorRef, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, currentData: Data) extends ChannelEvent
case class ChannelStateChanged(channel: ActorRef, channelId: ByteVector32, peer: ActorRef, remoteNodeId: PublicKey, previousState: State, currentState: State, commitments_opt: Option[AbstractCommitments]) extends ChannelEvent

case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) extends ChannelEvent

Expand Down
Loading