Skip to content

Commit

Permalink
Initial full sync impl
Browse files Browse the repository at this point in the history
  • Loading branch information
yglukhov committed Feb 18, 2019
1 parent 0e9cc20 commit 64f5a5b
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 30 deletions.
7 changes: 7 additions & 0 deletions beacon_chain/beacon_chain_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,10 @@ proc getBlock*(db: BeaconChainDB, hash: Eth2Digest): BeaconBlock =
if not db.getBlock(hash, result):
raise newException(Exception, "Block not found")

proc getBlock*(db: BeaconChainDB, slot: uint64, output: var BeaconBlock): bool =
let h = db.backend.get(slotToBlockHashKey(slot))
if h.len == sizeof(Eth2Digest):
var hash: Eth2Digest
hash.data[0 .. ^1] = h
result = db.getBlock(hash, output)

66 changes: 38 additions & 28 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import
chronos, chronicles, confutils, eth/[p2p, keys],
spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time,
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
mainchain_monitor, sync_protocol, gossipsub_protocol, trusted_state_snapshots,
mainchain_monitor, gossipsub_protocol, trusted_state_snapshots,
eth/trie/db, eth/trie/backends/rocksdb_backend

type
Expand All @@ -18,7 +18,7 @@ type
mainchainMonitor: MainchainMonitor
lastScheduledEpoch: uint64
headBlock: BeaconBlock
headBlockRoot: Eth2Digest
headBlockRoot*: Eth2Digest
blocksChildren: Table[Eth2Digest, seq[Eth2Digest]]

const
Expand All @@ -30,6 +30,9 @@ const

stateStoragePeriod = EPOCH_LENGTH.uint64 * 10 # Save states once per this number of slots. TODO: Find a good number.

proc processBlock*(node: BeaconNode, newBlock: BeaconBlock)

import sync_protocol

func shortHash(x: auto): string =
($x)[0..7]
Expand Down Expand Up @@ -68,6 +71,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): T =
address.udpPort = Port(conf.udpPort)

result.network = newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1)
let state = result.network.protocolState(BeaconSync)
state.node = result
state.db = result.db

writeFile(string(conf.dataDir) / "beacon_node.address",
$result.network.listeningAddress)
Expand Down Expand Up @@ -96,30 +102,31 @@ proc sync*(node: BeaconNode): Future[bool] {.async.} =
node.beaconState = await obtainTrustedStateSnapshot(node.db)
else:
node.beaconState = persistedState
var targetSlot = node.beaconState.getSlotFromTime()

let t = now()
if t < node.beaconState.genesisTime * 1000:
await sleepAsync int(node.beaconState.genesisTime * 1000 - t)

# TODO: change this to a full sync / block download
info "Syncing state from remote peers",
finalized_epoch = humaneEpochNum(node.beaconState.finalized_epoch),
target_slot_epoch = humaneEpochNum(targetSlot.slot_to_epoch)

while node.beaconState.finalized_epoch < targetSlot.slot_to_epoch:
var (peer, changeLog) = await node.network.getValidatorChangeLog(
node.beaconState.validator_registry_delta_chain_tip)

if peer == nil:
error "Failed to sync with any peer"
return false

if applyValidatorChangeLog(changeLog, node.beaconState):
node.db.persistState(node.beaconState)
node.db.persistBlock(changeLog.signedBlock)
else:
warn "Ignoring invalid validator change log", sentFrom = peer
await node.fullSync()
# var targetSlot = node.beaconState.getSlotFromTime()
#
# let t = now()
# if t < node.beaconState.genesisTime * 1000:
# await sleepAsync int(node.beaconState.genesisTime * 1000 - t)
#
# # TODO: change this to a full sync / block download
# info "Syncing state from remote peers",
# finalized_epoch = humaneEpochNum(node.beaconState.finalized_epoch),
# target_slot_epoch = humaneEpochNum(targetSlot.slot_to_epoch)
#
# while node.beaconState.finalized_epoch < targetSlot.slot_to_epoch:
# var (peer, changeLog) = await node.network.getValidatorChangeLog(
# node.beaconState.validator_registry_delta_chain_tip)
#
# if peer == nil:
# error "Failed to sync with any peer"
# return false
#
# if applyValidatorChangeLog(changeLog, node.beaconState):
# node.db.persistState(node.beaconState)
# node.db.persistBlock(changeLog.signedBlock)
# else:
# warn "Ignoring invalid validator change log", sentFrom = peer

return true

Expand Down Expand Up @@ -340,8 +347,7 @@ proc stateNeedsSaving(s: BeaconState): bool =
# TODO: Come up with a better predicate logic
s.slot mod stateStoragePeriod == 0

proc processBlocks*(node: BeaconNode) =
node.network.subscribe(topicBeaconBlocks) do (newBlock: BeaconBlock):
proc processBlock*(node: BeaconNode, newBlock: BeaconBlock) =
let stateSlot = node.beaconState.slot
info "Block received", slot = humaneSlotNum(newBlock.slot),
stateRoot = shortHash(newBlock.state_root),
Expand Down Expand Up @@ -386,6 +392,10 @@ proc processBlocks*(node: BeaconNode) =
# 3. Peform block processing / state recalculation / etc
#


proc processBlocks*(node: BeaconNode) =
node.network.subscribe(topicBeaconBlocks) do (newBlock: BeaconBlock):
node.processBlock(newBlock)
let epoch = newBlock.slot.epoch
if epoch != node.lastScheduledEpoch:
node.scheduleEpochActions(epoch)
Expand Down
66 changes: 64 additions & 2 deletions beacon_chain/sync_protocol.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import
options,
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx,
spec/[datatypes, crypto, digest]
spec/[datatypes, crypto, digest],
beacon_node, beacon_chain_db, time

type
ValidatorChangeLogEntry* = object
Expand All @@ -13,8 +14,19 @@ type

ValidatorSet = seq[Validator]

BeaconSyncState* = ref object
node*: BeaconNode
db*: BeaconChainDB

const maxBlocksInRequest = 50

p2pProtocol BeaconSync(version = 1,
shortName = "bcs"):
shortName = "bcs",
networkState = BeaconSyncState):
# proc status(protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest,
# latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64) =
# discard

requestResponse:
proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) =
var bb: BeaconBlock
Expand All @@ -29,11 +41,61 @@ p2pProtocol BeaconSync(version = 1,
removed: openarray[uint32],
order: seq[byte])

requestResponse:
proc getBlocks(peer: Peer, fromHash: Eth2Digest, num: int = 1) =
let step = if num < 0: -1 else: 1
let num = abs(num)
if num > maxBlocksInRequest or num == 0:
# TODO: drop this peer
assert(false)

let db = peer.network.protocolState(BeaconSync).db
var blk: BeaconBlock
var response = newSeqOfCap[BeaconBlock](num)

if db.getBlock(fromHash, blk):
response.add(blk)
var slot = int64(blk.slot)
let targetSlot = slot + step * (num - 1)
while slot != targetSlot:
if slot < 0 or not db.getBlock(uint64(slot), blk):
break
response.add(blk)
slot += step

await peer.blocks(reqId, response)

proc blocks(peer: Peer, blocks: openarray[BeaconBlock])

type
# A bit shorter names for convenience
ChangeLog = BeaconSync.validatorChangeLog
ChangeLogEntry = ValidatorChangeLogEntry

proc applyBlocks(node: BeaconNode, blocks: openarray[BeaconBlock]) =
debug "sync blocks received", count = blocks.len
for b in blocks:
node.processBlock(b)

proc fullSync*(node: BeaconNode) {.async.} =
while true:
let curSlot = node.beaconState.slot
var targetSlot = node.beaconState.getSlotFromTime()
debug "Syncing", curSlot, targetSlot
assert(targetSlot >= curSlot)

let numBlocksToDownload = min(maxBlocksInRequest.uint64, targetSlot - curSlot)
if numBlocksToDownload == 0:
info "Full sync complete"
break

var p = node.network.randomPeerWith(BeaconSync)
assert(not p.isNil)

let blks = await p.getBlocks(node.headBlockRoot, numBlocksToDownload.int)
if blks.isSome:
node.applyBlocks(blks.get.blocks)

func validate*(log: ChangeLog): bool =
# TODO:
# Assert that the number of raised bits in log.order (a.k.a population count)
Expand Down

0 comments on commit 64f5a5b

Please sign in to comment.