Skip to content

Commit

Permalink
Add blockchain watchdog
Browse files Browse the repository at this point in the history
Introduce a blockchain watchdog that fetches headers from multiple sources
in order to detect whether we're being eclipsed.

Use blockchainheaders.net and blockstream.info as first sources of blockchain
data. More blockchain sources will be added in future commits.
  • Loading branch information
t-bast committed Oct 9, 2020
1 parent ed61750 commit e509818
Show file tree
Hide file tree
Showing 12 changed files with 590 additions and 13 deletions.
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class Setup(datadir: File,
case Bitcoind(bitcoinClient) =>
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqblock"), Some(zmqBlockConnected))), "zmqblock", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(Props(new ZMQActor(config.getString("bitcoind.zmqtx"), Some(zmqTxConnected))), "zmqtx", SupervisorStrategy.Restart))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume))
system.actorOf(SimpleSupervisor.props(ZmqWatcher.props(nodeParams.chainHash, blockCount, new ExtendedBitcoinClient(new BatchingBitcoinJsonRPCClient(bitcoinClient))), "watcher", SupervisorStrategy.Resume))
case Electrum(electrumClient) =>
zmqBlockConnected.success(Done)
zmqTxConnected.success(Done)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import fr.acinq.bitcoin.{Block, Transaction}
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw

/**
* Created by PM on 24/08/2016.
*/
* Created by PM on 24/08/2016.
*/

trait BlockchainEvent
sealed trait BlockchainEvent

case class NewBlock(block: Block) extends BlockchainEvent

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ package fr.acinq.eclair.blockchain.bitcoind
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicLong

import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps
import akka.actor.{Actor, ActorLogging, Cancellable, Props, Terminated}
import akka.pattern.pipe
import fr.acinq.bitcoin._
import fr.acinq.eclair.KamonExt
import fr.acinq.eclair.blockchain.Monitoring.Metrics
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog
import fr.acinq.eclair.channel.BITCOIN_PARENT_TX_CONFIRMED
import fr.acinq.eclair.transactions.Scripts
import org.json4s.JsonAST.JDecimal
Expand All @@ -42,24 +46,26 @@ import scala.util.Try
* - also uses bitcoin-core rpc api, most notably for tx confirmation count and blockcount (because reorgs)
* Created by PM on 21/02/2016.
*/
class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging {
class ZmqWatcher(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) extends Actor with ActorLogging {

import ZmqWatcher._

context.system.eventStream.subscribe(self, classOf[NewBlock])
context.system.eventStream.subscribe(self, classOf[NewTransaction])
context.system.eventStream.subscribe(self, classOf[CurrentBlockCount])

private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(chainHash, 10, 60 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog")

// this is to initialize block count
self ! TickNewBlock

// @formatter: off
// @formatter:off
private case class TriggerEvent(w: Watch, e: WatchEvent)

private sealed trait AddWatchResult
private case object Keep extends AddWatchResult
private case object Ignore extends AddWatchResult
// @formatter: on
// @formatter:on

def receive: Receive = watching(Set(), Map(), SortedMap(), None)

Expand Down Expand Up @@ -249,7 +255,7 @@ class ZmqWatcher(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit

object ZmqWatcher {

def props(blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) = Props(new ZmqWatcher(blockCount, client)(ec))
def props(chainHash: ByteVector32, blockCount: AtomicLong, client: ExtendedBitcoinClient)(implicit ec: ExecutionContext = ExecutionContext.global) = Props(new ZmqWatcher(chainHash, blockCount, client)(ec))

case object TickNewBlock

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2020 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.blockchain.watchdogs

import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.bitcoin.{BlockHeader, ByteVector32}
import fr.acinq.eclair.blockchain.CurrentBlockCount
import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags}

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.Random

/**
* Created by t-bast on 29/09/2020.
*/

/** Monitor secondary blockchain sources to detect when we're being eclipsed. */
object BlockchainWatchdog {

case class BlockHeaderAt(blockCount: Long, blockHeader: BlockHeader)

// @formatter:off
sealed trait Command
case class LatestHeaders(currentBlockCount: Long, blockHeaders: Set[BlockHeaderAt], source: String) extends Command
private case class WrappedCurrentBlockCount(currentBlockCount: Long) extends Command
private case class CheckLatestHeaders(currentBlockCount: Long) extends Command
// @formatter:on

/**
* @param blockCountDelta number of future blocks to try fetching from secondary blockchain sources.
* @param maxRandomDelay to avoid the herd effect whenever a block is created, we add a random delay before we query
* secondary blockchain sources. This parameter specifies the maximum delay we'll allow.
*/
def apply(chainHash: ByteVector32, blockCountDelta: Int, maxRandomDelay: FiniteDuration): Behavior[Command] = {
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockCount](cbc => WrappedCurrentBlockCount(cbc.blockCount)))
Behaviors.withTimers { timers =>
Behaviors.receiveMessage {
case WrappedCurrentBlockCount(blockCount) =>
val delay = Random.nextInt(maxRandomDelay.toSeconds.toInt).seconds
timers.startSingleTimer(CheckLatestHeaders(blockCount), delay)
Behaviors.same
case CheckLatestHeaders(blockCount) =>
context.spawn(HeadersOverDns(chainHash, blockCount, blockCountDelta), HeadersOverDns.Source) ! HeadersOverDns.CheckLatestHeaders(context.self)
context.spawn(BlockstreamInfo(chainHash, blockCount, blockCountDelta), BlockstreamInfo.Source) ! BlockstreamInfo.CheckLatestHeaders(context.self)
Behaviors.same
case LatestHeaders(blockCount, blockHeaders, source) =>
val missingBlocks = blockHeaders match {
case h if h.isEmpty => 0
case _ => blockHeaders.map(_.blockCount).max - blockCount
}
if (missingBlocks >= 6) {
context.log.warn("{}: we are {} blocks late: we may be eclipsed from the bitcoin network", source, missingBlocks)
} else {
context.log.info("{}: we are {} blocks late", source, missingBlocks)
}
Metrics.BitcoinBlocksSkew.withTag(Tags.Source, source).update(missingBlocks)
Behaviors.same
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2020 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.blockchain.watchdogs

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import com.softwaremill.sttp.json4s.asJson
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
import com.softwaremill.sttp.{StatusCodes, SttpBackend, Uri, UriContext, sttp}
import fr.acinq.bitcoin.{Block, BlockHeader, ByteVector32}
import fr.acinq.eclair.blockchain.watchdogs.BlockchainWatchdog.{BlockHeaderAt, LatestHeaders}
import fr.acinq.eclair.blockchain.watchdogs.Monitoring.{Metrics, Tags}
import org.json4s.JsonAST.{JArray, JInt}
import org.json4s.jackson.Serialization
import org.json4s.{DefaultFormats, Serialization}

import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

/**
* Created by t-bast on 30/09/2020.
*/

/** This actor queries https://blockstream.info/ to fetch block headers. See https://github.com/Blockstream/esplora/blob/master/API.md. */
object BlockstreamInfo {

implicit val formats: DefaultFormats = DefaultFormats
implicit val serialization: Serialization = Serialization
implicit val sttpBackend: SttpBackend[Future, Nothing] = OkHttpFutureBackend()

val Source = "blockstream.info"

// @formatter:off
sealed trait Command
case class CheckLatestHeaders(replyTo: ActorRef[BlockchainWatchdog.LatestHeaders]) extends Command
private case class WrappedLatestHeaders(replyTo: ActorRef[LatestHeaders], headers: LatestHeaders) extends Command
private case class WrappedFailure(e: Throwable) extends Command
// @formatter:on

def apply(chainHash: ByteVector32, currentBlockCount: Long, blockCountDelta: Int): Behavior[Command] = {
Behaviors.setup { context =>
implicit val executionContext: ExecutionContext = context.executionContext
Behaviors.receiveMessage {
case CheckLatestHeaders(replyTo) =>
val uri_opt = chainHash match {
case Block.LivenetGenesisBlock.hash => Some(uri"https://blockstream.info/api")
case Block.TestnetGenesisBlock.hash => Some(uri"https://blockstream.info/testnet/api")
case _ => None
}
uri_opt match {
case Some(uri) =>
context.pipeToSelf(getHeaders(uri, currentBlockCount, currentBlockCount + blockCountDelta - 1)) {
case Success(headers) => WrappedLatestHeaders(replyTo, headers)
case Failure(e) => WrappedFailure(e)
}
Behaviors.same
case None =>
Behaviors.stopped
}

case WrappedLatestHeaders(replyTo, headers) =>
replyTo ! headers
Behaviors.stopped

case WrappedFailure(e) =>
context.log.error("blockstream.info failed: ", e)
Metrics.WatchdogError.withTag(Tags.Source, Source).increment()
Behaviors.stopped
}
}
}

private def getHeaders(baseUri: Uri, from: Long, to: Long)(implicit ec: ExecutionContext): Future[LatestHeaders] = {
val chunks = (0 to (to - from).toInt / 10).map(i => getChunk(baseUri, to - 10 * i))
Future.sequence(chunks).map(headers => LatestHeaders(from, headers.flatten.filter(_.blockCount >= from).toSet, Source))
}

/** blockstream.info returns chunks of 10 headers between ]startHeight-10; startHeight] */
private def getChunk(baseUri: Uri, startHeight: Long)(implicit ec: ExecutionContext): Future[Seq[BlockHeaderAt]] = for {
headers <- sttp.readTimeout(10 seconds).get(baseUri.path(baseUri.path :+ "blocks" :+ startHeight.toString))
.response(asJson[JArray])
.send()
.map(r => r.code match {
// HTTP 404 is a "normal" error: we're trying to lookup future blocks that haven't been mined.
case StatusCodes.NotFound => Seq.empty
case _ => r.unsafeBody.arr
})
.map(blocks => blocks.map(block => {
val JInt(height) = block \ "height"
val JInt(version) = block \ "version"
val JInt(time) = block \ "timestamp"
val JInt(bits) = block \ "bits"
val JInt(nonce) = block \ "nonce"
val previousBlockHash = (block \ "previousblockhash").extractOpt[String].map(ByteVector32.fromValidHex(_).reverse).getOrElse(ByteVector32.Zeroes)
val merkleRoot = (block \ "merkle_root").extractOpt[String].map(ByteVector32.fromValidHex(_).reverse).getOrElse(ByteVector32.Zeroes)
val header = BlockHeader(version.toLong, previousBlockHash, merkleRoot, time.toLong, bits.toLong, nonce.toLong)
BlockHeaderAt(height.toLong, header)
}))
} yield headers

}
Loading

0 comments on commit e509818

Please sign in to comment.