Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial full sync impl #117

Merged
merged 3 commits into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions beacon_chain/beacon_chain_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type
kHashToBlock
kHeadBlock # Pointer to the most recent block seen
kTailBlock # Pointer to the earliest finalized block
kSlotToBlockRoots

func subkey(kind: DbKeyKind): array[1, byte] =
result[0] = byte ord(kind)
Expand All @@ -22,6 +23,10 @@ func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
result[0] = byte ord(kind)
result[1 .. ^1] = key

func subkey(kind: DbKeyKind, key: uint64): array[sizeof(key) + 1, byte] =
result[0] = byte ord(kind)
copyMem(addr result[1], unsafeAddr key, sizeof(key))

func subkey(kind: type BeaconState, key: Eth2Digest): auto =
subkey(kHashToState, key.data)

Expand All @@ -32,6 +37,26 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB =
new result
result.backend = backend

proc toSeq(v: openarray[byte], ofType: type): seq[ofType] =
if v.len != 0:
assert(v.len mod sizeof(ofType) == 0)
let sz = v.len div sizeof(ofType)
result = newSeq[ofType](sz)
copyMem(addr result[0], unsafeAddr v[0], v.len)

proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
let slotKey = subkey(kSlotToBlockRoots, value.slot)
var blockRootsBytes = db.backend.get(slotKey)
var blockRoots = blockRootsBytes.toSeq(Eth2Digest)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically could just use ssz here no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Just... why?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for simplicity so we just have one serialization format in db (and reuse the code for flattening a seq)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I don't have a strong opinion, but a few reasons I like my approach more:

  • The serialization format is localized to the beacon_db, so not a lot to keep in mind
  • My format is pretty much memcpy which can easily be optimized to the least possible amount of allocations, thus more efficient.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was more curious than anything.. I wouldn't have bothered with coming up with a special serialization for it (and the additional unsafe code that comes with). I get the feeling the real optimization here will be to drop rocksdb at some point.

if key notin blockRoots:
db.backend.put(subkey(type value, key), ssz.serialize(value))
blockRootsBytes.setLen(blockRootsBytes.len + sizeof(key))
copyMem(addr blockRootsBytes[^sizeof(key)], unsafeAddr key, sizeof(key))
db.backend.put(slotKey, blockRootsBytes)

proc putHead*(db: BeaconChainDB, key: Eth2Digest) =
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block?

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
# TODO: prune old states
# TODO: it might be necessary to introduce the concept of a "last finalized
Expand All @@ -50,9 +75,6 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
proc putState*(db: BeaconChainDB, value: BeaconState) =
db.putState(hash_tree_root_final(value), value)

proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
db.backend.put(subkey(type value, key), ssz.serialize(value))

proc putBlock*(db: BeaconChainDB, value: BeaconBlock) =
db.putBlock(hash_tree_root_final(value), value)

Expand Down Expand Up @@ -81,6 +103,9 @@ proc getHeadBlock*(db: BeaconChainDB): Option[Eth2Digest] =
proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] =
db.get(subkey(kTailBlock), Eth2Digest)

proc getBlockRootsForSlot*(db: BeaconChainDB, slot: uint64): seq[Eth2Digest] =
db.backend.get(subkey(kSlotToBlockRoots, slot)).toSeq(Eth2Digest)

proc containsBlock*(
db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(BeaconBlock, key))
Expand Down
13 changes: 11 additions & 2 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import
spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time,
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
attestation_pool, block_pool,
mainchain_monitor, sync_protocol, gossipsub_protocol, trusted_state_snapshots,
mainchain_monitor, gossipsub_protocol, trusted_state_snapshots,
eth/trie/db, eth/trie/backends/rocksdb_backend

type
Expand All @@ -15,7 +15,7 @@ type
keys*: KeyPair
attachedValidators: ValidatorPool
blockPool: BlockPool
state: StateData
state*: StateData
attestationPool: AttestationPool
mainchainMonitor: MainchainMonitor
potentialHeads: seq[Eth2Digest]
Expand All @@ -28,6 +28,12 @@ const
topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch"


proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}

import sync_protocol


func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
($node.state.data.validator_registry[validatorIdx].pubkey)[0..7]

Expand Down Expand Up @@ -86,6 +92,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): T =

result.network =
newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1)
let state = result.network.protocolState(BeaconSync)
state.node = result
state.db = result.db

let
head = result.blockPool.get(result.db.getHeadBlock().get())
Expand Down
7 changes: 5 additions & 2 deletions beacon_chain/spec/crypto.nim
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,18 @@ proc readValue*(reader: var JsonReader, value: var ValidatorPrivKey) {.inline.}

proc newPrivKey*(): ValidatorPrivKey = SigKey.random()

# RLP serialization (TODO: remove if no longer necessary)
proc append*(writer: var RlpWriter, value: ValidatorPubKey) =
writer.append value.getBytes()

proc read*(rlp: var Rlp, T: type ValidatorPubKey): T {.inline.} =
ValidatorPubKey.init rlp.toBytes.toOpenArray
result = ValidatorPubKey.init(rlp.toBytes.toOpenArray)
rlp.skipElem()

proc append*(writer: var RlpWriter, value: ValidatorSig) =
writer.append value.getBytes()

proc read*(rlp: var Rlp, T: type ValidatorSig): T {.inline.} =
ValidatorSig.init rlp.toBytes.toOpenArray
result = ValidatorSig.init(rlp.toBytes.toOpenArray)
rlp.skipElem()

12 changes: 12 additions & 0 deletions beacon_chain/spec/datatypes.nim
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,18 @@ type

body*: BeaconBlockBody

BeaconBlockHeader* = object
## Same as BeaconBlock, except `body` is the `hash_tree_root` of the
## associated BeaconBlockBody.
# TODO: Dry it up with BeaconBlock
slot*: uint64
parent_root*: Eth2Digest
state_root*: Eth2Digest
randao_reveal*: ValidatorSig
eth1_data*: Eth1Data
signature*: ValidatorSig
body*: Eth2Digest

# https://github.com/ethereum/eth2.0-specs/blob/v0.3.0/specs/core/0_beacon-chain.md#beaconblockbody
BeaconBlockBody* = object
proposer_slashings*: seq[ProposerSlashing]
Expand Down
133 changes: 130 additions & 3 deletions beacon_chain/sync_protocol.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import
options,
options, tables,
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx,
spec/[datatypes, crypto, digest]
spec/[datatypes, crypto, digest],
beacon_node, beacon_chain_db, time, ssz

type
ValidatorChangeLogEntry* = object
Expand All @@ -13,8 +14,134 @@ type

ValidatorSet = seq[Validator]

BeaconSyncState* = ref object
node*: BeaconNode
db*: BeaconChainDB

func toHeader(b: BeaconBlock): BeaconBlockHeader =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for not using construction syntax? ie toHeader(b): .. = BeaconBlockHeader(slot: b.slot, ...)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason really. Somehow I keep forgetting about this option :)

BeaconBlockHeader(
slot: b.slot,
parent_root: b.parent_root,
state_root: b.state_root,
randao_reveal: b.randao_reveal,
eth1_data : b.eth1_data,
signature: b.signature,
body: hash_tree_root_final(b.body)
)

proc fromHeader(b: var BeaconBlock, h: BeaconBlockHeader) =
b.slot = h.slot
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

humm, this looks a bit dangerous in that it lets BeaconBlock.data keep its old value.. b = BeaconBlock(...)?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or ideally take both a body and a header, so it's impossible to forget to set data..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will do.

b.parent_root = h.parent_root
b.state_root = h.state_root
b.randao_reveal = h.randao_reveal
b.eth1_data = h.eth1_data
b.signature = h.signature

proc importBlocks(node: BeaconNode, roots: openarray[(Eth2Digest, uint64)], headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]) =
var bodyMap = initTable[Eth2Digest, int]()

for i, b in bodies:
bodyMap[hash_tree_root_final(b)] = i

var goodBlocks, badBlocks = 0
for h in headers:
let iBody = bodyMap.getOrDefault(h.body, -1)
if iBody >= 0:
var blk: BeaconBlock
blk.fromHeader(h)
blk.body = bodies[iBody]
node.onBeaconBlock(blk)
inc goodBlocks
else:
inc badBlocks

info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len


p2pProtocol BeaconSync(version = 1,
shortName = "bcs"):
shortName = "bcs",
networkState = BeaconSyncState):

onPeerConnected do(peer: Peer):
const
protocolVersion = 1 # TODO: Spec doesn't specify this yet
networkId = 1
let node = peer.networkState.node

var
latestFinalizedRoot: Eth2Digest # TODO
latestFinalizedEpoch: uint64 = node.state.data.finalized_epoch
bestRoot: Eth2Digest # TODO
bestSlot: uint64 = node.state.data.slot

await peer.status(protocolVersion, networkId, latestFinalizedRoot, latestFinalizedEpoch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the new handshake proc that gets a number of tricky details right, but I'll fix it myself in a later commit.

bestRoot, bestSlot)

let m = await peer.nextMsg(BeaconSync.status)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens here if peer sends something different, or doesn't send anything at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Different dispatchers will be called (e.g. getBeaconBlockHeaders, getBeaconBlockBodies, etc), while this coroutine will just hang forever, until the peer is disconnected, and nextMsg eventually raises.

let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
if bestDiff == 0:
# Nothing to do?
trace "Nothing to sync", peer = peer.node
else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big.
let db = peer.networkState.db

if bestDiff > 0:
# Send roots
# TODO: Currently we send all block roots in one "packet". Maybe
# they should be split to multiple packets.
type Root = (Eth2Digest, uint64)
var roots = newSeqOfCap[Root](128)
for i in m.bestSlot .. bestSlot:
for r in db.getBlockRootsForSlot(i):
roots.add((r, i))

await peer.beaconBlockRoots(roots)
else:
# Receive roots
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots)
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0)
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len)
for r in roots.roots:
bodiesRequest.add(r[0])
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies)

proc status(peer: Peer, protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest,
latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64)

proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, uint64)])

requestResponse:
proc getBeaconBlockHeaders(peer: Peer, blockRoot: Eth2Digest, slot: uint64, maxHeaders: int, skipSlots: int) =
# TODO: validate maxHeaders
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO and implement skipSlots

var s = slot
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
let db = peer.networkState.db
while headers.len < maxHeaders:
let blkRoots = db.getBlockRootsForSlot(s)
for r in blkRoots:
headers.add(db.getBlock(r).get().toHeader)
if headers.len == maxHeaders: break
inc s
await peer.beaconBlockHeaders(reqId, headers)

proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader])

requestResponse:
proc getBeaconBlockBodies(peer: Peer, blockRoots: openarray[Eth2Digest]) =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db
for r in blockRoots:
if (let blk = db.getBlock(r); blk.isSome):
bodies.add(blk.get().body)
await peer.beaconBlockBodies(reqId, bodies)

proc beaconBlockBodies(peer: Peer, blockBodies: openarray[BeaconBlockBody])


requestResponse:
proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) =
var bb: BeaconBlock
Expand Down