Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sync2: add sqlstore #6405

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added dev-docs/multipeer.excalidraw.gz
Binary file not shown.
Binary file added dev-docs/multipeer.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
185 changes: 178 additions & 7 deletions dev-docs/sync2-set-reconciliation.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<!-- markdown-toc start - Don't edit this section. Run M-x markdown-toc-refresh-toc -->
**Table of Contents**

- [Set Reconciliation Protocol (sync2)](#set-reconciliation-protocol-sync2)
- [Pairwise Set Reconciliation Protocol](#pairwise-set-reconciliation-protocol)
- [Basic concepts](#basic-concepts)
- [Simplified set reconciliation example](#simplified-set-reconciliation-example)
- [Attack mitigation](#attack-mitigation)
Expand All @@ -27,18 +27,18 @@
- [Redundant ItemChunk messages](#redundant-itemchunk-messages)
- [Range checksums](#range-checksums)
- [Bloom filters for recent sync](#bloom-filters-for-recent-sync)
- [Multi-peer Reconciliation](#multi-peer-reconciliation)
- [Deciding on the sync strategy](#deciding-on-the-sync-strategy)
- [Split sync](#split-sync)
- [Full sync](#full-sync)

<!-- markdown-toc end -->

# Set Reconciliation Protocol (sync2)
# Pairwise Set Reconciliation Protocol

The recursive set reconciliation protocol described in this document is based on
The recursive set reconciliation protocol described in this section is based on
[Range-Based Set Reconciliation](https://arxiv.org/pdf/2212.13567.pdf) paper by Aljoscha Meyer.

The multi-peer reconciliation approach is loosely based on
[SREP: Out-Of-Band Sync of Transaction Pools for Large-Scale Blockchains](https://people.bu.edu/staro/2023-ICBC-Novak.pdf)
paper by Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.

## Basic concepts

The set reconciliation protocol is intended to synchronize **ordered sets**.
Expand Down Expand Up @@ -774,3 +774,174 @@ as we don't need the sets to be exactly same after the recent sync; we
just want to bring them closer to each other. That being said, a
sufficient size of the Bloom filter needs to be chosen to minimize the
number of missed elements.

# Multi-peer Reconciliation

The multi-peer reconciliation approach is loosely based on
[SREP: Out-Of-Band Sync of Transaction Pools for Large-Scale Blockchains](https://people.bu.edu/staro/2023-ICBC-Novak.pdf)
paper by Novak Boškov, Sevval Simsek, Ari Trachtenberg, and David Starobinski.

![Multi-peer set reconciliation](multipeer.png)

Due to the FPTree data structure being used, the copy operation on a
set is `O(1)`. When synchronizing the local set against the remote
peer's set, we need to make sure the set doesn't change while being
synchronized, except for the recent items being added at the beginning
of the sync. Thus, for the purpose of synchronization against each
remote peer, a separate copy of the original set is made. When new
items are being received during sync with a remote peer, these items
are passed to the fetcher which retrieves the actual data blobs from
the peers, after which the received objects are validated and stored
in the state database. The main set is refreshed from time to time to
include the items that were recently added; this doesn't affect the
derived copies currently in use for sync.

## Deciding on the sync strategy

When picking the peers for the purpose of multi-peer sync, each peer
is [probed](#minhash-based-set-difference-estimation) to determine how
many items it has in its set. The peers with substantially lower
number of items than in the local set (configurable threshold) are not
considered for sync, so as not to place additional load on the peer
which are not fully synced yet, and let them decide on their syncing
strategy on their own. Note that the sync is always bi-directional.

Synchronization against multiple peers can be done in two modes:
1. Split sync involves splitting the whole range into smaller ones,
one smaller range per peer, and limiting the sync to the
corresponding range of IDs when syncing with each peer.
2. Full sync involves syncing the full set against each peer's full
set.

The sync strategy is selected based on the set similarity between the
local peers and the peers that have been chosen for sync, as well as
on the number of items in the remote peer sets. Roughly it can be described using
the following diagram:

```mermaid
stateDiagram-v2
[*] --> Wait
Wait --> ProbePeers : Timeout
ProbePeers --> Wait : No peers / <br/> all probes failed
ProbePeers --> SplitSync : Enough peers for split + <br/> this one is too different + <br> last sync was not split
ProbePeers --> FullSync : Too few peers for split / <br/> this one is similar <br/> enough to peers
SplitSync --> Wait : Sync failed
SplitSync --> FullSync : Sync succeeded
FullSync --> Wait : Sync terminated
```

## Split sync

The split sync approach helps bringing nodes that went substantially
out of sync relatively quickly while also making sure too much load is
not placed on each of the syncing peers. It somewhat resembles
BitTorrent approach where a file is downloaded from multiple peers,
with different pieces being obtained from different peers, even if
this similarity is rather superficial, as the protocol involved is
very different. The split sync is followed by full sync against the
peers, as in some cases, as with ATXs during cycle gaps, the set might
became somewhat "outdated" while the sync was being done. Below is
a diagram describing split sync sequence:

```mermaid
sequenceDiagram
Note over A: Check how different is A <br/> from its peers
par
A ->> B: Probe
B ->> A: Sample <br/> sim=0.95 count=10001
and
A ->> C: Probe
C ->> A: Sample <br/> sim=0.94 count=10002
and
A ->> D: Probe
D ->> A: Sample <br/> sim=0.94 count=10003
and
A ->> E: Probe
E ->> A: Sample <br/> sim=0.96 count=10001
and
A ->> F: Probe
F ->> A: Sample <br/> sim=0.89 count=9000
end
Note over A: Not enough peers close to this one <br/> Enough peers eligible for split sync <br/> Peer F's count is too low <br/> Proceeding with split sync
par
A <<->> B: Sync [0x00..., 0x40...)
and
A <<->> C: Sync [0x40..., 0x80...)
and
A <<->> D: Sync [0x80..., 0xC0...)
and
A <<->> E: Sync [0xC0..., 0x00...)
end
Note over A: Full sync follows split sync <br/> Syncing against peers that are in sync <br/> is very cheap
par
A <<->> B: Sync [0x00..., 0x00...)
and
A <<->> C: Sync [0x00..., 0x00...)
and
A <<->> D: Sync [0x00..., 0x00...)
and
A <<->> E: Sync [0x00..., 0x00...)
end
Note over A: Node A is in sync with the network
```

When some of the peers are too slow, their ranges are additionally
assigned to faster peers that managed to complete their ranges
already. Synchronization against slower peers is not interrupted
though until each range is synced at least once:

```mermaid
sequenceDiagram
par
A <<->> B: Sync [0x00..., 0x40...)
and
A <<->> C: Sync [0x40..., 0x80...)
and
A <<->> D: Sync [0x80..., 0xC0...)
and
A <<->> E: Sync [0xC0..., 0x00...)
and
Note over A: Peer E being too slow
A <<->> E: Sync [0xC0..., 0x00...)
end
```

## Full sync

Full sync is used when this node's set is similar enough to its peers'
sets, or when there's not enough peers for split sync. The full sync
against each peer is more reliable than split sync against the same
peers, so after split sync completes, full sync is always done. The
diagram below illustrates the full sync sequence.

```mermaid
sequenceDiagram
Note over A: Check how different is A <br/> from its peers
par
A ->> B: Probe
B ->> A: Sample <br/> sim=0.999 count=10001
and
A ->> C: Probe
C ->> A: Sample <br/> sim=0.999 count=10002
and
A ->> D: Probe
D ->> A: Sample <br/> sim=0.999 count=10003
and
A ->> E: Probe
E ->> A: Sample <br/> sim=0.999 count=10001
and
A ->> F: Probe
F ->> A: Sample <br/> sim=0.090 count=9000
end
Note over A: Enough peers close to this one <br/> Peer F's count is too low <br/> Proceeding with full sync
par
A <<->> B: Sync [0x00..., 0x00...)
and
A <<->> C: Sync [0x00..., 0x00...)
and
A <<->> D: Sync [0x00..., 0x00...)
and
A <<->> E: Sync [0x00..., 0x00...)
end
Note over A: Node A is in sync with the network
```
4 changes: 2 additions & 2 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func NewFetch(
f.registerServer(host, hashProtocol, h.handleHashReqStream)
f.registerServer(
host, activeSetProtocol,
func(ctx context.Context, msg []byte, s io.ReadWriter) error {
func(ctx context.Context, _ p2p.Peer, msg []byte, s io.ReadWriter) error {
return h.doHandleHashReqStream(ctx, msg, s, datastore.ActiveSet)
})
f.registerServer(host, meshHashProtocol, h.handleMeshHashReqStream)
Expand All @@ -339,7 +339,7 @@ func NewFetch(
f.registerServer(host, hashProtocol, server.WrapHandler(h.handleHashReq))
f.registerServer(
host, activeSetProtocol,
server.WrapHandler(func(ctx context.Context, data []byte) ([]byte, error) {
server.WrapHandler(func(ctx context.Context, _ p2p.Peer, data []byte) ([]byte, error) {
return h.doHandleHashReq(ctx, data, datastore.ActiveSet)
}))
f.registerServer(host, meshHashProtocol, server.WrapHandler(h.handleMeshHashReq))
Expand Down
2 changes: 1 addition & 1 deletion fetch/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func TestFetch_PeerDroppedWhenMessageResultsInValidationReject(t *testing.T) {
require.Len(t, h.GetPeers(), 1)

// This handler returns a ResponseBatch with an empty response that will fail validation on the remote peer
badPeerHandler := func(_ context.Context, data []byte) ([]byte, error) {
badPeerHandler := func(_ context.Context, _ p2p.Peer, data []byte) ([]byte, error) {
var b RequestBatch
codec.Decode(data, &b)

Expand Down
21 changes: 11 additions & 10 deletions fetch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/datastore"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
"github.com/spacemeshos/go-spacemesh/p2p/server"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
Expand Down Expand Up @@ -45,7 +46,7 @@ func newHandler(
}

// handleMaliciousIDsReq returns the IDs of all known malicious nodes.
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte, error) {
func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ p2p.Peer, _ []byte) ([]byte, error) {
nodes, err := identities.AllMalicious(h.cdb)
if err != nil {
return nil, fmt.Errorf("getting malicious IDs: %w", err)
Expand All @@ -57,7 +58,7 @@ func (h *handler) handleMaliciousIDsReq(ctx context.Context, _ []byte) ([]byte,
return codec.MustEncode(malicious), nil
}

func (h *handler) handleMaliciousIDsReqStream(ctx context.Context, msg []byte, s io.ReadWriter) error {
func (h *handler) handleMaliciousIDsReqStream(ctx context.Context, _ p2p.Peer, msg []byte, s io.ReadWriter) error {
if err := h.streamIDs(ctx, s, func(cbk retrieveCallback) error {
nodeIDs, err := identities.AllMalicious(h.cdb)
if err != nil {
Expand All @@ -75,7 +76,7 @@ func (h *handler) handleMaliciousIDsReqStream(ctx context.Context, msg []byte, s
}

// handleEpochInfoReq returns the ATXs published in the specified epoch.
func (h *handler) handleEpochInfoReq(ctx context.Context, msg []byte) ([]byte, error) {
func (h *handler) handleEpochInfoReq(ctx context.Context, _ p2p.Peer, msg []byte) ([]byte, error) {
var epoch types.EpochID
if err := codec.Decode(msg, &epoch); err != nil {
return nil, err
Expand Down Expand Up @@ -103,7 +104,7 @@ func (h *handler) handleEpochInfoReq(ctx context.Context, msg []byte) ([]byte, e
}

// handleEpochInfoReq streams the ATXs published in the specified epoch.
func (h *handler) handleEpochInfoReqStream(ctx context.Context, msg []byte, s io.ReadWriter) error {
func (h *handler) handleEpochInfoReqStream(ctx context.Context, _ p2p.Peer, msg []byte, s io.ReadWriter) error {
var epoch types.EpochID
if err := codec.Decode(msg, &epoch); err != nil {
return err
Expand Down Expand Up @@ -181,7 +182,7 @@ func (h *handler) streamIDs(ctx context.Context, s io.ReadWriter, retrieve retri
}

// handleLayerDataReq returns all data in a layer, described in LayerData.
func (h *handler) handleLayerDataReq(ctx context.Context, req []byte) ([]byte, error) {
func (h *handler) handleLayerDataReq(ctx context.Context, _ p2p.Peer, req []byte) ([]byte, error) {
var (
lid types.LayerID
ld LayerData
Expand All @@ -202,7 +203,7 @@ func (h *handler) handleLayerDataReq(ctx context.Context, req []byte) ([]byte, e
return out, nil
}

func (h *handler) handleLayerOpinionsReq2(ctx context.Context, data []byte) ([]byte, error) {
func (h *handler) handleLayerOpinionsReq2(ctx context.Context, _ p2p.Peer, data []byte) ([]byte, error) {
var req OpinionRequest
if err := codec.Decode(data, &req); err != nil {
return nil, err
Expand Down Expand Up @@ -257,7 +258,7 @@ func (h *handler) handleCertReq(ctx context.Context, lid types.LayerID, bid type
return nil, err
}

func (h *handler) handleHashReq(ctx context.Context, data []byte) ([]byte, error) {
func (h *handler) handleHashReq(ctx context.Context, _ p2p.Peer, data []byte) ([]byte, error) {
return h.doHandleHashReq(ctx, data, datastore.NoHint)
}

Expand Down Expand Up @@ -327,7 +328,7 @@ func (h *handler) doHandleHashReq(ctx context.Context, data []byte, hint datasto
return bts, nil
}

func (h *handler) handleHashReqStream(ctx context.Context, msg []byte, s io.ReadWriter) error {
func (h *handler) handleHashReqStream(ctx context.Context, _ p2p.Peer, msg []byte, s io.ReadWriter) error {
return h.doHandleHashReqStream(ctx, msg, s, datastore.NoHint)
}

Expand Down Expand Up @@ -416,7 +417,7 @@ func (h *handler) doHandleHashReqStream(
return nil
}

func (h *handler) handleMeshHashReq(ctx context.Context, reqData []byte) ([]byte, error) {
func (h *handler) handleMeshHashReq(ctx context.Context, _ p2p.Peer, reqData []byte) ([]byte, error) {
var (
req MeshHashRequest
hashes []types.Hash32
Expand Down Expand Up @@ -447,7 +448,7 @@ func (h *handler) handleMeshHashReq(ctx context.Context, reqData []byte) ([]byte
return data, nil
}

func (h *handler) handleMeshHashReqStream(ctx context.Context, reqData []byte, s io.ReadWriter) error {
func (h *handler) handleMeshHashReqStream(ctx context.Context, _ p2p.Peer, reqData []byte, s io.ReadWriter) error {
var req MeshHashRequest
if err := codec.Decode(reqData, &req); err != nil {
return fmt.Errorf("%w: decoding request: %w", errBadRequest, err)
Expand Down
Loading
Loading