From e5098186d4d924854765e9074f5ba282a56c3360 Mon Sep 17 00:00:00 2001 From: t-bast Date: Fri, 9 Oct 2020 13:05:32 +0200 Subject: [PATCH] Add blockchain watchdog Introduce a blockchain watchdog that fetches headers from multiple sources in order to detect whether we're being eclipsed. Use blockchainheaders.net and blockstream.info as first sources of blockchain data. More blockchain sources will be added in future commits. --- .../main/scala/fr/acinq/eclair/Setup.scala | 2 +- .../eclair/blockchain/BlockchainEvents.scala | 6 +- .../blockchain/bitcoind/ZmqWatcher.scala | 14 +- .../watchdogs/BlockchainWatchdog.scala | 80 ++++++++++ .../watchdogs/BlockstreamInfo.scala | 116 +++++++++++++++ .../blockchain/watchdogs/HeadersOverDns.scala | 138 ++++++++++++++++++ .../blockchain/watchdogs/Monitoring.scala | 36 +++++ .../blockchain/bitcoind/ZmqWatcherSpec.scala | 8 +- .../watchdogs/BlockchainWatchdogSpec.scala | 46 ++++++ .../watchdogs/BlockstreamInfoSpec.scala | 77 ++++++++++ .../watchdogs/HeadersOverDnsSpec.scala | 78 ++++++++++ .../acinq/eclair/channel/ThroughputSpec.scala | 2 +- 12 files changed, 590 insertions(+), 13 deletions(-) create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdog.scala create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfo.scala create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDns.scala create mode 100644 eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/Monitoring.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdogSpec.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfoSpec.scala create mode 100644 eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDnsSpec.scala diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 204f1c0e52..e274fa46d5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -263,7 +263,7 @@ class Setup(datadir: File, case Bitcoind(bitcoinClient) => system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart)) system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart)) - system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume)) + system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume)) case Electrum(electrumClient) => zmqBlockConnected.success(Done) zmqTxConnected.success(Done) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/BlockchainEvents.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/BlockchainEvents.scala index 87bd5e5aab..32729a0348 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/BlockchainEvents.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/BlockchainEvents.scala @@ -20,10 +20,10 @@ import fr.acinq.bitcoin.{Block, Transaction} import fr.acinq.eclair.blockchain.fee.FeeratesPerKw /** - * Created by PM on 24/08/2016. - */ + * Created by PM on 24/08/2016. + */ -trait BlockchainEvent +sealed trait BlockchainEvent case class NewBlock(block: Block) extends BlockchainEvent diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index a9fa80f37f..6abce964f8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -19,6 +19,9 @@ package fr.acinq.eclair.blockchain.bitcoind import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicLong +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps import akka.actor.{Actor, ActorLogging, Cancellable, Props, Terminated} import akka.pattern.pipe import fr.acinq.bitcoin._ @@ -26,6 +29,7 @@ import fr.acinq.eclair.KamonExt import fr.acinq.eclair.blockchain.Monitoring.Metrics import fr.acinq.eclair.blockchain._ import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient +import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog import fr.acinq.eclair.channel.BITCOIN_PARENT_TX_CONFIRMED import fr.acinq.eclair.transactions.Scripts import org.json4s.JsonAST.JDecimal @@ -42,7 +46,7 @@ import scala.util.Try * - also uses bitcoin-core rpc api, most notably for tx confirmation count and blockcount (because reorgs) * Created by PM on 21/02/2016. */ -class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging { +class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging { import ZmqWatcher._ @@ -50,16 +54,18 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit context.system.eventStream.subscribe(self, classOf[NewTransaction]) context.system.eventStream.subscribe(self, classOf[CurrentBlockCount]) + private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(chainHash, 10, 60 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog") + // this is to initialize block count self ! TickNewBlock - // @formatter: off + // @formatter:off private case class TriggerEvent(w: Watch, e: WatchEvent) private sealed trait AddWatchResult private case object Keep extends AddWatchResult private case object Ignore extends AddWatchResult - // @formatter: on + // @formatter:on def receive: Receive = watching(Set(), Map(), SortedMap(), None) @@ -249,7 +255,7 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit object ZmqWatcher { - def props(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) = Props(new ZmqWatcher(blockCount, client)(ec)) + def props(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) = Props(new ZmqWatcher(chainHash, blockCount, client)(ec)) case object TickNewBlock diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdog.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdog.scala new file mode 100644 index 0000000000..b3c3596d52 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdog.scala @@ -0,0 +1,80 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import akka.actor.typed.Behavior +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.Behaviors +import fr.acinq.bitcoin.{BlockHeader, ByteVector32} +import fr.acinq.eclair.blockchain.CurrentBlockCount +import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags} + +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.util.Random + +/** + * Created by t-bast on 29/09/2020. + */ + +/** Monitor secondary blockchain sources to detect when we're being eclipsed. */ +object BlockchainWatchdog { + + case class BlockHeaderAt(blockCount: Long, blockHeader: BlockHeader) + + // @formatter:off + sealed trait Command + case class LatestHeaders(currentBlockCount: Long, blockHeaders: Set[BlockHeaderAt], source: String) extends Command + private case class WrappedCurrentBlockCount(currentBlockCount: Long) extends Command + private case class CheckLatestHeaders(currentBlockCount: Long) extends Command + // @formatter:on + + /** + * @param blockCountDelta number of future blocks to try fetching from secondary blockchain sources. + * @param maxRandomDelay to avoid the herd effect whenever a block is created, we add a random delay before we query + * secondary blockchain sources. This parameter specifies the maximum delay we'll allow. + */ + def apply(chainHash: ByteVector32, blockCountDelta: Int, maxRandomDelay: FiniteDuration): Behavior[Command] = { + Behaviors.setup { context => + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockCount](cbc => WrappedCurrentBlockCount(cbc.blockCount))) + Behaviors.withTimers { timers => + Behaviors.receiveMessage { + case WrappedCurrentBlockCount(blockCount) => + val delay = Random.nextInt(maxRandomDelay.toSeconds.toInt).seconds + timers.startSingleTimer(CheckLatestHeaders(blockCount), delay) + Behaviors.same + case CheckLatestHeaders(blockCount) => + context.spawn(HeadersOverDns(chainHash, blockCount, blockCountDelta), HeadersOverDns.Source) ! HeadersOverDns.CheckLatestHeaders(context.self) + context.spawn(BlockstreamInfo(chainHash, blockCount, blockCountDelta), BlockstreamInfo.Source) ! BlockstreamInfo.CheckLatestHeaders(context.self) + Behaviors.same + case LatestHeaders(blockCount, blockHeaders, source) => + val missingBlocks = blockHeaders match { + case h if h.isEmpty => 0 + case _ => blockHeaders.map(_.blockCount).max - blockCount + } + if (missingBlocks >= 6) { + context.log.warn("{}: we are {} blocks late: we may be eclipsed from the bitcoin network", source, missingBlocks) + } else { + context.log.info("{}: we are {} blocks late", source, missingBlocks) + } + Metrics.BitcoinBlocksSkew.withTag(Tags.Source, source).update(missingBlocks) + Behaviors.same + } + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfo.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfo.scala new file mode 100644 index 0000000000..ad09a672b4 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfo.scala @@ -0,0 +1,116 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ActorRef, Behavior} +import com.softwaremill.sttp.json4s.asJson +import com.softwaremill.sttp.okhttp.OkHttpFutureBackend +import com.softwaremill.sttp.{StatusCodes, SttpBackend, Uri, UriContext, sttp} +import fr.acinq.bitcoin.{Block, BlockHeader, ByteVector32} +import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{BlockHeaderAt, LatestHeaders} +import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags} +import org.json4s.JsonAST.{JArray, JInt} +import org.json4s.jackson.Serialization +import org.json4s.{DefaultFormats, Serialization} + +import scala.concurrent.duration.DurationInt +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +/** + * Created by t-bast on 30/09/2020. + */ + +/** This actor queries https://blockstream.info/ to fetch block headers. See https://github.com/Blockstream/esplora/blob/master/API.md. */ +object BlockstreamInfo { + + implicit val formats: DefaultFormats = DefaultFormats + implicit val serialization: Serialization = Serialization + implicit val sttpBackend: SttpBackend[Future, Nothing] = OkHttpFutureBackend() + + val Source = "blockstream.info" + + // @formatter:off + sealed trait Command + case class CheckLatestHeaders(replyTo: ActorRef[BlockchainWatchdog.LatestHeaders]) extends Command + private case class WrappedLatestHeaders(replyTo: ActorRef[LatestHeaders], headers: LatestHeaders) extends Command + private case class WrappedFailure(e: Throwable) extends Command + // @formatter:on + + def apply(chainHash: ByteVector32, currentBlockCount: Long, blockCountDelta: Int): Behavior[Command] = { + Behaviors.setup { context => + implicit val executionContext: ExecutionContext = context.executionContext + Behaviors.receiveMessage { + case CheckLatestHeaders(replyTo) => + val uri_opt = chainHash match { + case Block.LivenetGenesisBlock.hash => Some(uri"https://blockstream.info/api") + case Block.TestnetGenesisBlock.hash => Some(uri"https://blockstream.info/testnet/api") + case _ => None + } + uri_opt match { + case Some(uri) => + context.pipeToSelf(getHeaders(uri, currentBlockCount, currentBlockCount + blockCountDelta - 1)) { + case Success(headers) => WrappedLatestHeaders(replyTo, headers) + case Failure(e) => WrappedFailure(e) + } + Behaviors.same + case None => + Behaviors.stopped + } + + case WrappedLatestHeaders(replyTo, headers) => + replyTo ! headers + Behaviors.stopped + + case WrappedFailure(e) => + context.log.error("blockstream.info failed: ", e) + Metrics.WatchdogError.withTag(Tags.Source, Source).increment() + Behaviors.stopped + } + } + } + + private def getHeaders(baseUri: Uri, from: Long, to: Long)(implicit ec: ExecutionContext): Future[LatestHeaders] = { + val chunks = (0 to (to - from).toInt / 10).map(i => getChunk(baseUri, to - 10 * i)) + Future.sequence(chunks).map(headers => LatestHeaders(from, headers.flatten.filter(_.blockCount >= from).toSet, Source)) + } + + /** blockstream.info returns chunks of 10 headers between ]startHeight-10; startHeight] */ + private def getChunk(baseUri: Uri, startHeight: Long)(implicit ec: ExecutionContext): Future[Seq[BlockHeaderAt]] = for { + headers <- sttp.readTimeout(10 seconds).get(baseUri.path(baseUri.path :+ "blocks" :+ startHeight.toString)) + .response(asJson[JArray]) + .send() + .map(r => r.code match { + // HTTP 404 is a "normal" error: we're trying to lookup future blocks that haven't been mined. + case StatusCodes.NotFound => Seq.empty + case _ => r.unsafeBody.arr + }) + .map(blocks => blocks.map(block => { + val JInt(height) = block \ "height" + val JInt(version) = block \ "version" + val JInt(time) = block \ "timestamp" + val JInt(bits) = block \ "bits" + val JInt(nonce) = block \ "nonce" + val previousBlockHash = (block \ "previousblockhash").extractOpt[String].map(ByteVector32.fromValidHex(_).reverse).getOrElse(ByteVector32.Zeroes) + val merkleRoot = (block \ "merkle_root").extractOpt[String].map(ByteVector32.fromValidHex(_).reverse).getOrElse(ByteVector32.Zeroes) + val header = BlockHeader(version.toLong, previousBlockHash, merkleRoot, time.toLong, bits.toLong, nonce.toLong) + BlockHeaderAt(height.toLong, header) + })) + } yield headers + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDns.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDns.scala new file mode 100644 index 0000000000..614f32f6c4 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDns.scala @@ -0,0 +1,138 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import akka.actor.Status +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.TypedActorRefOps +import akka.actor.typed.{ActorRef, Behavior} +import akka.io.dns.{AAAARecord, DnsProtocol} +import akka.io.{Dns, IO} +import fr.acinq.bitcoin.{Block, BlockHeader, ByteVector32} +import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.BlockHeaderAt +import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags} +import org.slf4j.Logger +import scodec.bits.BitVector + +/** + * Created by t-bast on 29/09/2020. + */ + +/** This actor queries https://bitcoinheaders.net/ to fetch block headers. */ +object HeadersOverDns { + + val Source = "bitcoinheaders.net" + + // @formatter:off + sealed trait Command + case class CheckLatestHeaders(replyTo: ActorRef[BlockchainWatchdog.LatestHeaders]) extends Command + private case class WrappedDnsResolved(response: DnsProtocol.Resolved) extends Command + private case class WrappedDnsFailed(cause: Throwable) extends Command + // @formatter:on + + def apply(chainHash: ByteVector32, currentBlockCount: Long, blockCountDelta: Int): Behavior[Command] = { + Behaviors.setup { context => + val dnsAdapters = { + context.messageAdapter[DnsProtocol.Resolved](WrappedDnsResolved) + context.messageAdapter[Status.Failure](f => WrappedDnsFailed(f.cause)) + }.toClassic + Behaviors.receiveMessage { + case CheckLatestHeaders(replyTo) => chainHash match { + case Block.LivenetGenesisBlock.hash => + (currentBlockCount until currentBlockCount + blockCountDelta).foreach(blockCount => { + val hostname = s"$blockCount.${blockCount / 10000}.bitcoinheaders.net" + IO(Dns)(context.system.classicSystem).tell(DnsProtocol.resolve(hostname, DnsProtocol.Ip(ipv4 = false, ipv6 = true)), dnsAdapters) + }) + collect(replyTo, currentBlockCount, Set.empty, blockCountDelta) + case _ => + // Headers over DNS is only supported for mainnet. + Behaviors.stopped + } + case _ => Behaviors.unhandled + } + } + } + + private def collect(replyTo: ActorRef[BlockchainWatchdog.LatestHeaders], currentBlockCount: Long, received: Set[BlockHeaderAt], remaining: Int): Behavior[Command] = { + Behaviors.setup { context => + Behaviors.receiveMessage { + case WrappedDnsResolved(response) => + val blockHeader_opt = for { + blockCount <- parseBlockCount(response)(context.log) + blockHeader <- parseBlockHeader(response)(context.log) + } yield BlockHeaderAt(blockCount, blockHeader) + val received1 = blockHeader_opt match { + case Some(blockHeader) => received + blockHeader + case None => + Metrics.WatchdogError.withTag(Tags.Source, Source).increment() + received + } + stopOrCollect(replyTo, currentBlockCount, received1, remaining - 1) + + case WrappedDnsFailed(ex) => + context.log.warn("bitcoinheaders.net failed to resolve: {}", ex) + stopOrCollect(replyTo, currentBlockCount, received, remaining - 1) + + case _ => Behaviors.unhandled + } + } + } + + private def stopOrCollect(replyTo: ActorRef[BlockchainWatchdog.LatestHeaders], currentBlockCount: Long, received: Set[BlockHeaderAt], remaining: Int): Behavior[Command] = remaining match { + case 0 => + replyTo ! BlockchainWatchdog.LatestHeaders(currentBlockCount, received, Source) + Behaviors.stopped + case _ => + collect(replyTo, currentBlockCount, received, remaining) + } + + private def parseBlockCount(response: DnsProtocol.Resolved)(implicit log: Logger): Option[Long] = { + response.name.split('.').headOption match { + case Some(blockCount) => blockCount.toLongOption + case None => + log.error("bitcoinheaders.net response did not contain block count: {}", response) + None + } + } + + private def parseBlockHeader(response: DnsProtocol.Resolved)(implicit log: Logger): Option[BlockHeader] = { + val addresses = response.records.collect { case record: AAAARecord => record.ip.getAddress } + val countOk = addresses.length == 6 + // addresses must be prefixed with 0x2001 + val prefixOk = addresses.forall(_.startsWith(Array(0x20.toByte, 0x01.toByte))) + // the first nibble after the prefix encodes the order since nameservers often reorder responses + val orderOk = addresses.map(a => a(2) & 0xf0).toSet == Set(0x00, 0x10, 0x20, 0x30, 0x40, 0x50) + if (countOk && prefixOk && orderOk) { + val header = addresses.sortBy(a => a(2)).foldLeft(BitVector.empty) { + case (current, address) => + // The first address contains an additional 0x00 prefix + val toDrop = if (current.isEmpty) 28 else 20 + current ++ BitVector(address).drop(toDrop) + }.bytes + header.length match { + case 80 => Some(BlockHeader.read(header.toArray)) + case _ => + log.error("bitcoinheaders.net response did not contain block header (invalid length): {}", response) + None + } + } else { + log.error("invalid response from bitcoinheaders.net: {}", response) + None + } + } + +} \ No newline at end of file diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/Monitoring.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/Monitoring.scala new file mode 100644 index 0000000000..8efb45dde1 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/watchdogs/Monitoring.scala @@ -0,0 +1,36 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import kamon.Kamon + +/** + * Created by t-bast on 29/09/2020. + */ + +object Monitoring { + + object Metrics { + val BitcoinBlocksSkew = Kamon.gauge("bitcoin.watchdog.blocks.skew", "Number of blocks we're missing compared to other blockchain sources") + val WatchdogError = Kamon.counter("bitcoin.watchdog.error", "Number watchdog errors") + } + + object Tags { + val Source = "source" + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index b99398e0bf..05b75ee617 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -107,7 +107,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind test("watch for confirmed transactions") { val probe = TestProbe() val blockCount = new AtomicLong() - val watcher = system.actorOf(ZmqWatcher.props(blockCount, new ExtendedBitcoinClient(bitcoinrpcclient))) + val watcher = system.actorOf(ZmqWatcher.props(randomBytes32, blockCount, new ExtendedBitcoinClient(bitcoinrpcclient))) val (address, _) = getNewAddress(bitcoincli) val tx = sendToAddress(bitcoincli, address, 1.0) @@ -128,7 +128,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind test("watch for spent transactions") { val probe = TestProbe() val blockCount = new AtomicLong() - val watcher = system.actorOf(ZmqWatcher.props(blockCount, new ExtendedBitcoinClient(bitcoinrpcclient))) + val watcher = system.actorOf(ZmqWatcher.props(randomBytes32, blockCount, new ExtendedBitcoinClient(bitcoinrpcclient))) val (address, priv) = getNewAddress(bitcoincli) val tx = sendToAddress(bitcoincli, address, 1.0) val outputIndex = tx.txOut.indexWhere(_.publicKeyScript == Script.write(Script.pay2wpkh(priv.publicKey))) @@ -169,7 +169,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind val blockCount = new AtomicLong() val wallet = new BitcoinCoreWallet(bitcoinrpcclient) val client = new ExtendedBitcoinClient(bitcoinrpcclient) - val watcher = system.actorOf(ZmqWatcher.props(blockCount, client)) + val watcher = system.actorOf(ZmqWatcher.props(randomBytes32, blockCount, client)) // create a chain of transactions that we don't broadcast yet val (_, priv) = getNewAddress(bitcoincli) @@ -204,7 +204,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind val blockCount = new AtomicLong() val wallet = new BitcoinCoreWallet(bitcoinrpcclient) val client = new ExtendedBitcoinClient(bitcoinrpcclient) - val watcher = system.actorOf(ZmqWatcher.props(blockCount, client)) + val watcher = system.actorOf(ZmqWatcher.props(randomBytes32, blockCount, client)) awaitCond(blockCount.get > 0) val (_, priv) = getNewAddress(bitcoincli) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdogSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdogSpec.scala new file mode 100644 index 0000000000..d16cd781fb --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockchainWatchdogSpec.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import akka.actor.testkit.typed.scaladsl.{LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit} +import akka.actor.typed.eventstream.EventStream +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.Block +import fr.acinq.eclair.blockchain.CurrentBlockCount +import org.scalatest.funsuite.AnyFunSuiteLike + +import scala.concurrent.duration.DurationInt + +class BlockchainWatchdogSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike with LogCapturing { + + test("fetch block headers from DNS on mainnet") { + val watchdog = testKit.spawn(BlockchainWatchdog(Block.LivenetGenesisBlock.hash, 10, 1 second)) + LoggingTestKit.warn("bitcoinheaders.net: we are 9 blocks late: we may be eclipsed from the bitcoin network").expect { + system.eventStream ! EventStream.Publish(CurrentBlockCount(630561)) + } + testKit.stop(watchdog) + } + + test("fetch block headers from blockstream.info on testnet") { + val watchdog = testKit.spawn(BlockchainWatchdog(Block.TestnetGenesisBlock.hash, 16, 1 second)) + LoggingTestKit.warn("blockstream.info: we are 15 blocks late: we may be eclipsed from the bitcoin network").expect { + system.eventStream ! EventStream.Publish(CurrentBlockCount(500000)) + } + testKit.stop(watchdog) + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfoSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfoSpec.scala new file mode 100644 index 0000000000..32c0d87f34 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/BlockstreamInfoSpec.scala @@ -0,0 +1,77 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.{Block, BlockHeader} +import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{BlockHeaderAt, LatestHeaders} +import fr.acinq.eclair.blockchain.watchdogs.BlockstreamInfo.CheckLatestHeaders +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.HexStringSyntax + +class BlockstreamInfoSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + test("fetch genesis block header") { + val blockstreamInfo = testKit.spawn(BlockstreamInfo(Block.LivenetGenesisBlock.hash, 0, 1)) + val sender = testKit.createTestProbe[LatestHeaders]() + blockstreamInfo ! CheckLatestHeaders(sender.ref) + sender.expectMessage(LatestHeaders(0, Set(BlockHeaderAt(0, Block.LivenetGenesisBlock.header)), BlockstreamInfo.Source)) + } + + test("fetch first 3 block headers") { + val blockstreamInfo = testKit.spawn(BlockstreamInfo(Block.LivenetGenesisBlock.hash, 0, 3)) + val sender = testKit.createTestProbe[LatestHeaders]() + blockstreamInfo ! CheckLatestHeaders(sender.ref) + val expectedHeaders = Set( + BlockHeaderAt(0, Block.LivenetGenesisBlock.header), + BlockHeaderAt(1, BlockHeader.read(hex"010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e36299".toArray)), + BlockHeaderAt(2, BlockHeader.read(hex"010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd61".toArray)), + ) + sender.expectMessage(LatestHeaders(0, expectedHeaders, BlockstreamInfo.Source)) + } + + test("fetch some block headers") { + val blockstreamInfo = testKit.spawn(BlockstreamInfo(Block.LivenetGenesisBlock.hash, 630450, 13)) + val sender = testKit.createTestProbe[LatestHeaders]() + blockstreamInfo ! CheckLatestHeaders(sender.ref) + val expectedHeaders = Set( + BlockHeaderAt(630450, BlockHeader.read(hex"0000802069ed618b1e0f87ce804e2624a34f507c13322c8f28b3010000000000000000000e370896b5a57abf5426bc51e82a3fcb626a5af32c9eb8147fe9c513efbb1701235abe5e397a111728cf439c".toArray)), + BlockHeaderAt(630451, BlockHeader.read(hex"00e0ff3f45367792598312e4cdbbf1caaeceafe38aef17eed8340d00000000000000000047e447403a779461d013581137f66425c609fac6ed426bdba4f83a35f00786f3db5cbe5e397a11173e155226".toArray)), + BlockHeaderAt(630452, BlockHeader.read(hex"000000200fce94e6ace8c26d3e9bb1fc7e9b85dc9c94ad51d8b00f000000000000000000ca0a5ba5b2cdba07b8926809e4909278c392587e7c8362dd6bd2a6669affff32715ebe5e397a1117efc39f19".toArray)), + BlockHeaderAt(630453, BlockHeader.read(hex"00000020db608b10a44b0b60b556f40775edcfb29d5f39eeeb7f0b000000000000000000312b2dece5824b0cb3153a8ef7a06af70ae68fc2e45a694936cbbd609c747aa56762be5e397a1117fba42eac".toArray)), + BlockHeaderAt(630454, BlockHeader.read(hex"0000c0200951bd9340565493bc25101f4f7cbad1d094614ea8010e000000000000000000fd0377e4c6753830fe345657921aface6159e41a57c09be4a6658ca9e2704ff0c665be5e397a11172d2ee501".toArray)), + BlockHeaderAt(630455, BlockHeader.read(hex"000000204119a86146d81a66ac2670f5f36e0508d1312385f75d0200000000000000000075871a6f838207ac1f55d201d5f4a306cb37e52b2be8006d7ac4cc3114ac6e9aba6abe5e397a11173085c79c".toArray)), + BlockHeaderAt(630456, BlockHeader.read(hex"00000020b87220b7dd743fe4f1ee09d4fd4fd1d70608544ffaf0010000000000000000009de701d6bd397e1be047ccc5fe30ff92f6d67bd71c526f9f67c1413c8a4c65cc5070be5e397a111706c44dd8".toArray)), + BlockHeaderAt(630457, BlockHeader.read(hex"0000002028a1a12e1de0d77e1f58e1d90de97dd52aebcd0a8b570500000000000000000093669292cc29488c77d159fd5f153b66d689c0edaa284f556a587dc77585f4422c76be5e397a11171c47271b".toArray)), + BlockHeaderAt(630458, BlockHeader.read(hex"00000020ff4dc9eaac1ba97f16bd7287a2a05ecb4f2b013cfd640e0000000000000000000a8ada0e68ebb19b68be274938df178a68de89031ff5bfc1c7612e82a83830b18780be5e397a11178e24035e".toArray)), + BlockHeaderAt(630459, BlockHeader.read(hex"0000002030b5d50005fb4b56c1dbe12bc70e951779ab0d07e4ad02000000000000000000a73e505c3943437b0fc732ee69bfda59b48c58709b4543c7b62af5134200684deb8fbe5e397a111745743f34".toArray)), + BlockHeaderAt(630460, BlockHeader.read(hex"000040206e390ff12d1568de2700470ff3417e3ef153c2960df4030000000000000000008638741f5e6870335fb5a129fa22e7dcb295ea8e1daa06fac8a1230c75143c8eee8fbe5e397a11170d01e68e".toArray)), + BlockHeaderAt(630461, BlockHeader.read(hex"0000402090c95977a62b4f32bec3ed16642dbd33512df20d986400000000000000000000a3ab107298a7191e3aa0d49168db2d6798ec173d19522bec32c393cb31762468eb92be5e397a111719708a4b".toArray)), + BlockHeaderAt(630462, BlockHeader.read(hex"00e0ff2705182872bc51f1a9896a72aad2ed3c0404386bd2071c09000000000000000000a2af76bc9352b01b6510dd1520542f2898fe3af97ba7a804ee496a00aa27b38f0e93be5e397a1117eb36157a".toArray)), + ) + sender.expectMessage(LatestHeaders(630450, expectedHeaders, BlockstreamInfo.Source)) + } + + test("fetch future block headers") { + val blockstreamInfo = testKit.spawn(BlockstreamInfo(Block.LivenetGenesisBlock.hash, 60000000, 2)) + val sender = testKit.createTestProbe[LatestHeaders]() + blockstreamInfo ! CheckLatestHeaders(sender.ref) + sender.expectMessage(LatestHeaders(60000000, Set.empty, BlockstreamInfo.Source)) + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDnsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDnsSpec.scala new file mode 100644 index 0000000000..51ec302d76 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/watchdogs/HeadersOverDnsSpec.scala @@ -0,0 +1,78 @@ +/* + * Copyright 2020 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.blockchain.watchdogs + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import com.typesafe.config.ConfigFactory +import fr.acinq.bitcoin.{Block, BlockHeader} +import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{BlockHeaderAt, LatestHeaders} +import fr.acinq.eclair.blockchain.watchdogs.HeadersOverDns.CheckLatestHeaders +import org.scalatest.funsuite.AnyFunSuiteLike +import scodec.bits.HexStringSyntax + +import scala.concurrent.duration.DurationInt + +class HeadersOverDnsSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + test("fetch genesis block header") { + val headersOverDns = testKit.spawn(HeadersOverDns(Block.LivenetGenesisBlock.hash, 0, 1)) + val sender = testKit.createTestProbe[LatestHeaders]() + headersOverDns ! CheckLatestHeaders(sender.ref) + sender.expectMessage(LatestHeaders(0, Set(BlockHeaderAt(0, Block.LivenetGenesisBlock.header)), HeadersOverDns.Source)) + } + + test("fetch first 3 block headers") { + val headersOverDns = testKit.spawn(HeadersOverDns(Block.LivenetGenesisBlock.hash, 0, 3)) + val sender = testKit.createTestProbe[LatestHeaders]() + headersOverDns ! CheckLatestHeaders(sender.ref) + val expectedHeaders = Set( + BlockHeaderAt(0, Block.LivenetGenesisBlock.header), + BlockHeaderAt(1, BlockHeader.read(hex"010000006fe28c0ab6f1b372c1a6a246ae63f74f931e8365e15a089c68d6190000000000982051fd1e4ba744bbbe680e1fee14677ba1a3c3540bf7b1cdb606e857233e0e61bc6649ffff001d01e36299".toArray)), + BlockHeaderAt(2, BlockHeader.read(hex"010000004860eb18bf1b1620e37e9490fc8a427514416fd75159ab86688e9a8300000000d5fdcc541e25de1c7a5addedf24858b8bb665c9f36ef744ee42c316022c90f9bb0bc6649ffff001d08d2bd61".toArray)), + ) + sender.expectMessage(LatestHeaders(0, expectedHeaders, HeadersOverDns.Source)) + } + + test("fetch some block headers") { + val headersOverDns = testKit.spawn(HeadersOverDns(Block.LivenetGenesisBlock.hash, 630450, 5)) + val sender = testKit.createTestProbe[LatestHeaders]() + headersOverDns ! CheckLatestHeaders(sender.ref) + val expectedHeaders = Set( + BlockHeaderAt(630450, BlockHeader.read(hex"0000802069ed618b1e0f87ce804e2624a34f507c13322c8f28b3010000000000000000000e370896b5a57abf5426bc51e82a3fcb626a5af32c9eb8147fe9c513efbb1701235abe5e397a111728cf439c".toArray)), + BlockHeaderAt(630451, BlockHeader.read(hex"00e0ff3f45367792598312e4cdbbf1caaeceafe38aef17eed8340d00000000000000000047e447403a779461d013581137f66425c609fac6ed426bdba4f83a35f00786f3db5cbe5e397a11173e155226".toArray)), + BlockHeaderAt(630452, BlockHeader.read(hex"000000200fce94e6ace8c26d3e9bb1fc7e9b85dc9c94ad51d8b00f000000000000000000ca0a5ba5b2cdba07b8926809e4909278c392587e7c8362dd6bd2a6669affff32715ebe5e397a1117efc39f19".toArray)), + BlockHeaderAt(630453, BlockHeader.read(hex"00000020db608b10a44b0b60b556f40775edcfb29d5f39eeeb7f0b000000000000000000312b2dece5824b0cb3153a8ef7a06af70ae68fc2e45a694936cbbd609c747aa56762be5e397a1117fba42eac".toArray)), + BlockHeaderAt(630454, BlockHeader.read(hex"0000c0200951bd9340565493bc25101f4f7cbad1d094614ea8010e000000000000000000fd0377e4c6753830fe345657921aface6159e41a57c09be4a6658ca9e2704ff0c665be5e397a11172d2ee501".toArray)), + ) + sender.expectMessage(LatestHeaders(630450, expectedHeaders, HeadersOverDns.Source)) + } + + test("fetch future block headers") { + val headersOverDns = testKit.spawn(HeadersOverDns(Block.LivenetGenesisBlock.hash, 60000000, 2)) + val sender = testKit.createTestProbe[LatestHeaders]() + headersOverDns ! CheckLatestHeaders(sender.ref) + sender.expectMessage(LatestHeaders(60000000, Set.empty, HeadersOverDns.Source)) + } + + test("ignore testnet requests") { + val headersOverDns = testKit.spawn(HeadersOverDns(Block.TestnetGenesisBlock.hash, 0, 1)) + val sender = testKit.createTestProbe[LatestHeaders]() + headersOverDns ! CheckLatestHeaders(sender.ref) + sender.expectNoMessage(1 second) + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala index 2ddaf86d5c..d1340d84e5 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/ThroughputSpec.scala @@ -39,7 +39,7 @@ class ThroughputSpec extends AnyFunSuite { implicit val system = ActorSystem("test") val pipe = system.actorOf(Props[Pipe], "pipe") val blockCount = new AtomicLong() - val blockchain = system.actorOf(ZmqWatcher.props(blockCount, new TestBitcoinClient()), "blockchain") + val blockchain = system.actorOf(ZmqWatcher.props(randomBytes32, blockCount, new TestBitcoinClient()), "blockchain") val paymentHandler = system.actorOf(Props(new Actor() { val random = new Random()