Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan4th committed Apr 17, 2024
1 parent 06f89dc commit cf6c052
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 26 deletions.
9 changes: 8 additions & 1 deletion hashsync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,9 @@ func (c *wireConduit) SendDone() error {
return c.send(&DoneMessage{})
}

func (c *wireConduit) SendQuery(x, y Ordered) error {
func (c *wireConduit) SendQuery(x, y Ordered, fingerprint any, sampleSize int) error {
panic("TBD: incluide fingerprint and sampleSize")
panic("TBD: re-generate scale")
if x == nil && y == nil {
return c.send(&QueryMessage{})
} else if x == nil || y == nil {
Expand All @@ -230,6 +232,11 @@ func (c *wireConduit) SendQuery(x, y Ordered) error {
return c.send(&QueryMessage{RangeX: &xh, RangeY: &yh})
}

// SendQueryResponse has the same signature as Conduit.SendQueryResponse but is
// not implemented for the wire conduit yet: every call panics.
//
// TODO: implement.
// TODO: use lower 32 bits (with better variability) for the sample.
func (c *wireConduit) SendQueryResponse(x, y Ordered, fingerprint any, count, sampleSize int, it Iterator) error {
	// A single panic: anything placed after it would be unreachable.
	panic("TBD: implement")
}

func (c *wireConduit) withInitialRequest(toCall func(Conduit) error) ([]byte, error) {
c.initReqBuf = new(bytes.Buffer)
defer func() { c.initReqBuf = nil }()
Expand Down
102 changes: 83 additions & 19 deletions hashsync/rangesync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"errors"
"fmt"
"reflect"
"slices"
"strings"
)

const (
	// defaultMaxSendRange is the maxSendRange a RangeSetReconciler gets from
	// NewRangeSetReconciler before any options are applied.
	defaultMaxSendRange = 16
	// defaultItemChunkSize is the itemChunkSize a RangeSetReconciler gets from
	// NewRangeSetReconciler before any options are applied.
	defaultItemChunkSize = 16
	// defaultSampleSize is the sampleSize a RangeSetReconciler gets from
	// NewRangeSetReconciler before any options are applied; sampleSize is the
	// value InitiateBoundedProbe passes to Conduit.SendQuery.
	defaultSampleSize = 200
	// maxSampleSize is the largest sample size accepted in an incoming query;
	// larger requests are rejected with an error. It must be kept in sync with
	// the scale "max" tag on QueryResponseMessage.Sample.
	maxSampleSize = 1000
)

type MessageType byte
Expand All @@ -23,6 +26,7 @@ const (
MessageTypeRangeContents
MessageTypeItemBatch
MessageTypeQuery
MessageTypeQueryResponse
)

var messageTypes = []string{
Expand Down Expand Up @@ -105,9 +109,14 @@ type Conduit interface {
// SendDone sends a message that notifies the peer that sync is finished
SendDone() error
// SendQuery sends a message requesting fingerprint and count of the
// whole range or part of the range. The response will never contain any
// actual data items
SendQuery(x, y Ordered) error
// whole range or part of the range. If fingerprint is provided and
// it doesn't match the fingerprint on the query handler side,
// the handler must send a sample subset of its items for MinHash
// calculation.
SendQuery(x, y Ordered, fingerprint any, sampleSize int) error
// SendQueryResponse sends query response. If 'it' is not nil,
// the corresponding items are included in the sample
SendQueryResponse(x, y Ordered, fingerprint any, count, sampleSize int, it Iterator) error
}

type Option func(r *RangeSetReconciler)
Expand All @@ -124,6 +133,12 @@ func WithItemChunkSize(n int) Option {
}
}

// WithSampleSize sets the sample size that the reconciler passes to
// Conduit.SendQuery when initiating a probe.
//
// The value is clamped to [0, maxSampleSize]: a peer rejects queries asking
// for more than maxSampleSize items, and the size is carried as an unsigned
// value in QueryMessage, so negative sizes cannot be represented.
func WithSampleSize(s int) Option {
	switch {
	case s < 0:
		s = 0
	case s > maxSampleSize:
		s = maxSampleSize
	}
	return func(r *RangeSetReconciler) {
		r.sampleSize = s
	}
}

// Iterator points to an item in ItemStore
type Iterator interface {
// Equal returns true if this iterator is equal to another Iterator
Expand Down Expand Up @@ -165,13 +180,15 @@ type RangeSetReconciler struct {
is ItemStore
maxSendRange int
itemChunkSize int
sampleSize int
}

func NewRangeSetReconciler(is ItemStore, opts ...Option) *RangeSetReconciler {
rsr := &RangeSetReconciler{
is: is,
maxSendRange: defaultMaxSendRange,
itemChunkSize: defaultItemChunkSize,
sampleSize: defaultSampleSize,
}
for _, opt := range opts {
opt(rsr)
Expand Down Expand Up @@ -268,7 +285,19 @@ func (rsr *RangeSetReconciler) handleMessage(c Conduit, preceding Iterator, msg
}
}
case msg.Type() == MessageTypeQuery:
if err := c.SendFingerprint(x, y, info.Fingerprint, info.Count); err != nil {
sampleSize := msg.Count()
if sampleSize > maxSampleSize {
return nil, false, fmt.Errorf("bad minhash sample size %d (max %d)",
msg.Count(), maxSampleSize)
} else if sampleSize > info.Count {
sampleSize = info.Count
}
it := info.Start
if fingerprintEqual(msg.Fingerprint(), info.Fingerprint) {
// no need to send MinHash items if fingerprints match
it = nil
}
if err := c.SendQueryResponse(x, y, info.Fingerprint, info.Count, sampleSize, it); err != nil {
return nil, false, err
}
return nil, true, nil
Expand Down Expand Up @@ -381,47 +410,82 @@ func (rsr *RangeSetReconciler) getMessages(c Conduit) (msgs []SyncMessage, done
}
}

func (rsr *RangeSetReconciler) InitiateProbe(c Conduit) error {
func (rsr *RangeSetReconciler) InitiateProbe(c Conduit) (RangeInfo, error) {
return rsr.InitiateBoundedProbe(c, nil, nil)
}

func (rsr *RangeSetReconciler) InitiateBoundedProbe(c Conduit, x, y Ordered) error {
if err := c.SendQuery(x, y); err != nil {
return err
func (rsr *RangeSetReconciler) InitiateBoundedProbe(c Conduit, x, y Ordered) (RangeInfo, error) {
info := rsr.is.GetRangeInfo(nil, x, y, -1)
if err := c.SendQuery(x, y, info.Fingerprint, rsr.sampleSize); err != nil {
return RangeInfo{}, err
}
if err := c.SendEndRound(); err != nil {
return err
return RangeInfo{}, err
}
return nil
return info, nil
}

// calcSim estimates how similar the local range described by info is to the
// peer's range, given the sample items and the fingerprint fp received from
// the peer. The result is always within [0, 1].
//
// It returns 1 when the fingerprints are equal, and 0 when there is nothing to
// compare (no local start iterator or an empty sample). Otherwise it sorts
// items in place, walks the local range from info.Start until info.End in
// step with the sorted sample, and counts the keys present on both sides. The
// count is divided by the larger of len(items) and the local sample size
// (info.Count capped at rsr.sampleSize), so that neither a small set nor an
// oversized sample from the peer can skew the estimate.
//
// NOTE(review): the local iterator is advanced in place, so info.Start is
// presumably not reusable afterwards — confirm against the Iterator contract.
// NOTE(review): it.Key() is compared directly against the sample items; this
// assumes both sides have the same concrete Ordered type — confirm.
func (rsr *RangeSetReconciler) calcSim(info RangeInfo, items []Ordered, fp any) float64 {
	if fingerprintEqual(info.Fingerprint, fp) {
		return 1
	}
	if info.Start == nil || len(items) == 0 {
		// Fingerprints differ and one side has no items to match against.
		return 0
	}
	slices.SortFunc(items, func(a, b Ordered) int { return a.Compare(b) })
	it := info.Start
	n := 0
	numEq := 0
	// Merge-style walk over two sorted sequences.
	for !it.Equal(info.End) && n < len(items) {
		c := it.Key().Compare(items[n])
		switch {
		case c < 0:
			it.Next()
		case c == 0:
			numEq++
			it.Next()
			n++
		default:
			n++
		}
	}
	// Normalize by the larger effective sample size. len(items) >= 1 here, so
	// the denominator is never zero and numEq can never exceed it.
	denom := info.Count
	if denom > rsr.sampleSize {
		denom = rsr.sampleSize
	}
	if denom < len(items) {
		denom = len(items)
	}
	return float64(numEq) / float64(denom)
}

func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit) (fp any, count int, err error) {
func (rsr *RangeSetReconciler) HandleProbeResponse(c Conduit, info RangeInfo) (fp any, count int, sim float64, err error) {
gotRange := false
for {
msg, err := c.NextMessage()
switch {
case err != nil:
return nil, 0, err
return nil, 0, 0, err
case msg == nil:
return nil, 0, errors.New("no end round marker")
return nil, 0, 0, errors.New("no end round marker")
default:
switch mt := msg.Type(); mt {
case MessageTypeEndRound:
return nil, 0, errors.New("non-final round in response to a probe")
return nil, 0, 0, errors.New("non-final round in response to a probe")
case MessageTypeDone:
// the peer is not expecting any new messages
return fp, count, nil
case MessageTypeFingerprint:
if !gotRange {
return nil, 0, 0, errors.New("no range info received during probe")
}
return fp, count, sim, nil
case MessageTypeQueryResponse:
if gotRange {
return nil, 0, 0, errors.New("single range message expected")
}
sim = rsr.calcSim(info, msg.Keys(), msg.Fingerprint())
fp = msg.Fingerprint()
count = msg.Count()
fallthrough
gotRange = true
case MessageTypeEmptySet, MessageTypeEmptyRange:
if gotRange {
return nil, 0, errors.New("single range message expected")
return nil, 0, 0, errors.New("single range message expected")
}
if info.Count == 0 {
sim = 1
}
gotRange = true
default:
return nil, 0, fmt.Errorf("unexpected message type: %v", msg.Type())
return nil, 0, 0, fmt.Errorf("unexpected message type: %v", msg.Type())
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions hashsync/rangesync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,16 @@ func (fc *fakeConduit) SendDone() error {
return nil
}

func (fc *fakeConduit) SendQuery(x, y Ordered) error {
fc.sendMsg(MessageTypeQuery, x, y, nil, 0)
func (fc *fakeConduit) SendQuery(x, y Ordered, fingerprint any, sampleSize int) error {
fc.sendMsg(MessageTypeQuery, x, y, fingerprint, sampleSize)
return nil
}

// SendQueryResponse has the same signature as Conduit.SendQueryResponse but is
// not implemented for the fake conduit yet: every call panics.
//
// TODO: implement.
// TODO: use lower 32 bits (with better variability) for the sample -
// MinhashSampleItemFromHash32.
func (fc *fakeConduit) SendQueryResponse(x, y Ordered, fingerprint any, count, sampleSize int, it Iterator) error {
	// A single panic: anything placed after it would be unreachable.
	panic("TBD: implement")
}

type dumbStoreIterator struct {
ds *dumbStore
n int
Expand Down
66 changes: 62 additions & 4 deletions hashsync/wire_types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package hashsync

import (
"cmp"

"github.com/spacemeshos/go-spacemesh/common/types"
)

Expand Down Expand Up @@ -101,9 +103,12 @@ type ItemBatchMessage struct {

func (m *ItemBatchMessage) Type() MessageType { return MessageTypeItemBatch }

// QueryMessage requests bounded range fingerprint and count from the peer
// QueryMessage requests bounded range fingerprint and count from the peer,
// along with a minhash sample if fingerprints differ
type QueryMessage struct {
RangeX, RangeY *types.Hash32
RangeX, RangeY *types.Hash32
RangeFingerprint types.Hash12
SampleSize uint32
}

var _ SyncMessage = &QueryMessage{}
Expand All @@ -115,15 +120,68 @@ func (m *QueryMessage) X() Ordered {
}
return *m.RangeX
}

// Y returns the value RangeY points to, or an untyped nil when RangeY is not
// set.
func (m *QueryMessage) Y() Ordered {
	if bound := m.RangeY; bound != nil {
		return *bound
	}
	// Return a literal nil so that the resulting interface is nil as well.
	return nil
}
func (m *QueryMessage) Fingerprint() any { return nil }
func (m *QueryMessage) Count() int { return 0 }
func (m *QueryMessage) Fingerprint() any { return m.RangeFingerprint }
func (m *QueryMessage) Count() int { return int(m.SampleSize) }
// Keys always returns nil for a QueryMessage.
func (m *QueryMessage) Keys() []Ordered { return nil }
// Values always returns nil for a QueryMessage.
func (m *QueryMessage) Values() []any { return nil }

// MinhashSampleItem is one element of a minhash sample subset, stored as a
// 32-bit unsigned value.
type MinhashSampleItem uint32

// Compile-time check that MinhashSampleItem satisfies Ordered.
var _ Ordered = MinhashSampleItem(0)

// Compare implements Ordered. It returns -1, 0 or +1 depending on whether m is
// less than, equal to or greater than other. other must hold a
// MinhashSampleItem; any other dynamic type makes the type assertion panic.
func (m MinhashSampleItem) Compare(other any) int {
	rhs := other.(MinhashSampleItem)
	return cmp.Compare(m, rhs)
}

// MinhashSampleItemFromHash32 builds a MinhashSampleItem from the last four
// bytes of h (indices 28 through 31), with h[28] as the most significant byte.
func MinhashSampleItemFromHash32(h types.Hash32) MinhashSampleItem {
	// The shifted bytes occupy disjoint bit ranges, so OR-ing them yields the
	// same value as adding them.
	upper := uint32(h[28])<<24 | uint32(h[29])<<16
	lower := uint32(h[30])<<8 | uint32(h[31])
	return MinhashSampleItem(upper | lower)
}

// QueryResponseMessage is a response to QueryMessage
type QueryResponseMessage struct {
	// RangeX and RangeY are optional range bounds; the X() and Y() accessors
	// return nil for a bound that is not set.
	RangeX, RangeY *types.Hash32
	// RangeFingerprint is the value returned by Fingerprint().
	RangeFingerprint types.Hash12
	// SampleSize is the value returned (as int) by Count().
	SampleSize uint32
	// Sample holds the minhash sample items exposed through Keys().
	// NOTE: max must be in sync with maxSampleSize in hashsync/rangesync.go
	Sample []MinhashSampleItem `scale:"max=1000"`
}

var _ SyncMessage = &QueryResponseMessage{}

// Type returns MessageTypeQueryResponse, the type that probe response handling
// dispatches on; it must differ from the MessageTypeQuery of the request.
func (m *QueryResponseMessage) Type() MessageType { return MessageTypeQueryResponse }
// X returns the value RangeX points to, or an untyped nil when RangeX is not
// set.
func (m *QueryResponseMessage) X() Ordered {
	if bound := m.RangeX; bound != nil {
		return *bound
	}
	// Return a literal nil so that the resulting interface is nil as well.
	return nil
}

// Y returns the value RangeY points to, or an untyped nil when RangeY is not
// set.
func (m *QueryResponseMessage) Y() Ordered {
	switch bound := m.RangeY; bound {
	case nil:
		// Return a literal nil so that the resulting interface is nil as well.
		return nil
	default:
		return *bound
	}
}
// Fingerprint returns the RangeFingerprint field.
func (m *QueryResponseMessage) Fingerprint() any { return m.RangeFingerprint }
// Count returns the SampleSize field converted to int.
// NOTE(review): HandleProbeResponse stores msg.Count() as the peer's item
// count, while this returns the sample size and the message has no separate
// count field — confirm which value is intended.
func (m *QueryResponseMessage) Count() int { return int(m.SampleSize) }
// Values always returns nil for a QueryResponseMessage.
func (m *QueryResponseMessage) Values() []any { return nil }

// Keys returns the items of Sample as a newly allocated []Ordered, in the same
// order. The result is non-nil even when Sample is empty.
func (m *QueryResponseMessage) Keys() []Ordered {
	keys := make([]Ordered, 0, len(m.Sample))
	for i := range m.Sample {
		keys = append(keys, m.Sample[i])
	}
	return keys
}

// TODO: don't do scalegen for empty types

0 comments on commit cf6c052

Please sign in to comment.