Skip to content

Commit

Permalink
Dedicated event for channel_update modifications (#1935)
Browse files Browse the repository at this point in the history
Use an event `ChannelUpdateParametersChanged` for the sole purpose of tracking changes to channel_update.

Also, conf change at restore is now treated like a regular fee update. We do handle `CMD_UPDATE_RELAY_FEES` in both `OFFLINE` and `SYNCING`, because there may be a race between `CMD_UPDATE_RELAY_FEES` and
`ChannelRestablish`. And there was no good reason to behave differently in those states anyway.

* fix updateRelayFee api call

The `Register` should be used to channel actors, not the `Router`.
The former tracks all channels, whereas the latter only contains
channels in certain states. We only query the `Router` when we need
reference to external (public) nodes and channels.
  • Loading branch information
pm47 authored Sep 3, 2021
1 parent 9f9f10e commit daace53
Show file tree
Hide file tree
Showing 23 changed files with 366 additions and 221 deletions.
12 changes: 9 additions & 3 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
for (nodeId <- nodes) {
appKit.nodeParams.db.peers.addOrUpdateRelayFees(nodeId, RelayFees(feeBaseMsat, feeProportionalMillionths))
}
(appKit.router ? Router.GetLocalChannels).mapTo[Iterable[LocalChannel]]
.map(channels => channels.filter(c => nodes.contains(c.remoteNodeId)).map(c => Right(c.shortChannelId)))
.flatMap(channels => sendToChannels[CommandResponse[CMD_UPDATE_RELAY_FEE]](channels.toList, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths)))
sendToNodes(nodes, CMD_UPDATE_RELAY_FEE(ActorRef.noSender, feeBaseMsat, feeProportionalMillionths, cltvExpiryDelta_opt = None))
}

override def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] = for {
Expand Down Expand Up @@ -424,6 +422,14 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {
Future.foldLeft(commands)(Map.empty[ApiTypes.ChannelIdentifier, Either[Throwable, T]])(_ + _)
}

/** Send a request to multiple channels using node ids */
private def sendToNodes[T: ClassTag](nodeids: List[PublicKey], request: Any)(implicit timeout: Timeout): Future[Map[ApiTypes.ChannelIdentifier, Either[Throwable, T]]] = {
for {
channelIds <- (appKit.register ? Symbol("channelsTo")).mapTo[Map[ByteVector32, PublicKey]].map(_.filter(kv => nodeids.contains(kv._2)).keys)
res <- sendToChannels[T](channelIds.map(Left(_)).toList, request)
} yield res
}

override def getInfo()(implicit timeout: Timeout): Future[GetInfoResponse] = Future.successful(
GetInfoResponse(
version = Kit.getVersionLong,
Expand Down
109 changes: 57 additions & 52 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ final case class CMD_SIGN(replyTo_opt: Option[ActorRef] = None) extends HasOptio
sealed trait CloseCommand extends HasReplyToCommand
final case class CMD_CLOSE(replyTo: ActorRef, scriptPubKey: Option[ByteVector]) extends CloseCommand
final case class CMD_FORCECLOSE(replyTo: ActorRef) extends CloseCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long) extends HasReplyToCommand
final case class CMD_UPDATE_RELAY_FEE(replyTo: ActorRef, feeBase: MilliSatoshi, feeProportionalMillionths: Long, cltvExpiryDelta_opt: Option[CltvExpiryDelta]) extends HasReplyToCommand
final case class CMD_GETSTATE(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETSTATEDATA(replyTo: ActorRef) extends HasReplyToCommand
final case class CMD_GETINFO(replyTo: ActorRef)extends HasReplyToCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ case class ChannelIdAssigned(channel: ActorRef, remoteNodeId: PublicKey, tempora

case class ShortChannelIdAssigned(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, previousShortChannelId: Option[ShortChannelId]) extends ChannelEvent

case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, previousChannelUpdate_opt: Option[ChannelUpdate], commitments: AbstractCommitments) extends ChannelEvent
case class LocalChannelUpdate(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelAnnouncement_opt: Option[ChannelAnnouncement], channelUpdate: ChannelUpdate, commitments: AbstractCommitments) extends ChannelEvent

case class ChannelUpdateParametersChanged(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey, channelUpdate: ChannelUpdate) extends ChannelEvent

case class LocalChannelDown(channel: ActorRef, channelId: ByteVector32, shortChannelId: ShortChannelId, remoteNodeId: PublicKey) extends ChannelEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ trait AuditDb extends Closeable {

def add(channelErrorOccurred: ChannelErrorOccurred): Unit

def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit
def addChannelUpdate(channelUpdateParametersChanged: ChannelUpdateParametersChanged): Unit

def listSent(from: Long, to: Long): Seq[PaymentSent]

Expand Down
14 changes: 3 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
context.system.eventStream.subscribe(self, classOf[ChannelClosed])
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[ChannelUpdateParametersChanged])

override def receive: Receive = {

Expand Down Expand Up @@ -117,16 +117,8 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
channelsDb.updateChannelMeta(e.channelId, event)

case u: LocalChannelUpdate =>
u.previousChannelUpdate_opt match {
case Some(previous) if
u.channelUpdate.feeBaseMsat == previous.feeBaseMsat &&
u.channelUpdate.feeProportionalMillionths == previous.feeProportionalMillionths &&
u.channelUpdate.cltvExpiryDelta == previous.cltvExpiryDelta &&
u.channelUpdate.htlcMinimumMsat == previous.htlcMinimumMsat &&
u.channelUpdate.htlcMaximumMsat == previous.htlcMaximumMsat => ()
case _ => auditDb.addChannelUpdate(u)
}
case u: ChannelUpdateParametersChanged =>
auditDb.addChannelUpdate(u)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ case class DualAuditDb(sqlite: SqliteAuditDb, postgres: PgAuditDb) extends Audit
sqlite.add(channelErrorOccurred)
}

override def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit = {
runAsync(postgres.addChannelUpdate(localChannelUpdate))
sqlite.addChannelUpdate(localChannelUpdate)
override def addChannelUpdate(channelUpdateParametersChanged: ChannelUpdateParametersChanged): Unit = {
runAsync(postgres.addChannelUpdate(channelUpdateParametersChanged))
sqlite.addChannelUpdate(channelUpdateParametersChanged)
}

override def listSent(from: Long, to: Long): Seq[PaymentSent] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.pg

import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand Down Expand Up @@ -243,7 +243,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def addChannelUpdate(u: LocalChannelUpdate): Unit = withMetrics("audit/add-channel-update", DbBackends.Postgres) {
override def addChannelUpdate(u: ChannelUpdateParametersChanged): Unit = withMetrics("audit/add-channel-update", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.channel_updates VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, u.channelId.toHex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package fr.acinq.eclair.db.sqlite

import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalChannelUpdate, LocalError, NetworkFeePaid, RemoteError}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
Expand Down Expand Up @@ -240,7 +240,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
}
}

override def addChannelUpdate(u: LocalChannelUpdate): Unit = withMetrics("audit/add-channel-update", DbBackends.Sqlite) {
override def addChannelUpdate(u: ChannelUpdateParametersChanged): Unit = withMetrics("audit/add-channel-update", DbBackends.Sqlite) {
using(sqlite.prepareStatement("INSERT INTO channel_updates VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, u.channelId.toArray)
statement.setBytes(2, u.remoteNodeId.value.toArray)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object ChannelRelayer {
replyTo ! Relayer.OutgoingChannels(channels.toSeq)
Behaviors.same

case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, _, commitments)) =>
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, commitments))
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ object Announcements {
def areSame(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.copy(signature = ByteVector64.Zeroes, timestamp = 0) == u2.copy(signature = ByteVector64.Zeroes, timestamp = 0)

def areSameIgnoreFlags(u1: ChannelUpdate, u2: ChannelUpdate): Boolean =
u1.feeBaseMsat == u2.feeBaseMsat &&
u1.feeProportionalMillionths == u2.feeProportionalMillionths &&
u1.cltvExpiryDelta == u2.cltvExpiryDelta &&
u1.htlcMinimumMsat == u2.htlcMinimumMsat &&
u1.htlcMaximumMsat == u2.htlcMaximumMsat

def makeMessageFlags(hasOptionChannelHtlcMax: Boolean): Byte = BitVector.bits(hasOptionChannelHtlcMax :: Nil).padLeft(8).toByte()

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ case class ChannelUpdate(signature: ByteVector64,
require(((messageFlags & 1) != 0) == htlcMaximumMsat.isDefined, "htlcMaximumMsat is not consistent with messageFlags")

def isNode1 = Announcements.isNode1(channelFlags)

def toStringShort: String = s"cltvExpiryDelta=$cltvExpiryDelta,feeBase=$feeBaseMsat,feeProportionalMillionths=$feeProportionalMillionths"
}

// @formatter:off
Expand Down
125 changes: 0 additions & 125 deletions eclair-core/src/test/scala/fr/acinq/eclair/channel/RecoverySpec.scala

This file was deleted.

Loading

0 comments on commit daace53

Please sign in to comment.