Skip to content

Commit

Permalink
hashsync: add minhash probing
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan4th committed May 3, 2024
1 parent 06f89dc commit 34a1943
Show file tree
Hide file tree
Showing 7 changed files with 574 additions and 132 deletions.
103 changes: 83 additions & 20 deletions hashsync/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,14 @@ func (c *wireConduit) NextMessage() (SyncMessage, error) {
return nil, err
}
return &m, nil
case MessageTypeQuery:
var m QueryMessage
case MessageTypeProbe:
var m ProbeMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err
}
return &m, nil
case MessageTypeProbeResponse:
var m ProbeResponseMessage
if _, err := codec.DecodeFrom(c.stream, &m); err != nil {
return nil, err
}
Expand All @@ -146,6 +152,7 @@ func (c *wireConduit) NextMessage() (SyncMessage, error) {
}

func (c *wireConduit) send(m sendable) error {
// fmt.Fprintf(os.Stderr, "QQQQQ: send: %s: %#v\n", m.Type(), m)
var stream io.Writer
if c.initReqBuf != nil {
stream = c.initReqBuf
Expand Down Expand Up @@ -219,15 +226,46 @@ func (c *wireConduit) SendDone() error {
return c.send(&DoneMessage{})
}

func (c *wireConduit) SendQuery(x, y Ordered) error {
func (c *wireConduit) SendProbe(x, y Ordered, fingerprint any, sampleSize int) error {
m := &ProbeMessage{
RangeFingerprint: fingerprint.(types.Hash12),
SampleSize: uint32(sampleSize),
}
if x == nil && y == nil {
return c.send(&QueryMessage{})
return c.send(m)
} else if x == nil || y == nil {
panic("BUG: SendQuery: bad range: just one of the bounds is nil")
panic("BUG: SendProbe: bad range: just one of the bounds is nil")
}
xh := x.(types.Hash32)
yh := y.(types.Hash32)
return c.send(&QueryMessage{RangeX: &xh, RangeY: &yh})
m.RangeX = &xh
m.RangeY = &yh
return c.send(m)
}

func (c *wireConduit) SendProbeResponse(x, y Ordered, fingerprint any, count, sampleSize int, it Iterator) error {
m := &ProbeResponseMessage{
RangeFingerprint: fingerprint.(types.Hash12),
NumItems: uint32(count),
Sample: make([]MinhashSampleItem, sampleSize),
}
// fmt.Fprintf(os.Stderr, "QQQQQ: begin sending items\n")
for n := 0; n < sampleSize; n++ {
m.Sample[n] = MinhashSampleItemFromHash32(it.Key().(types.Hash32))
// fmt.Fprintf(os.Stderr, "QQQQQ: m.Sample[%d] = %s\n", n, m.Sample[n])
it.Next()
}
// fmt.Fprintf(os.Stderr, "QQQQQ: end sending items\n")
if x == nil && y == nil {
return c.send(m)
} else if x == nil || y == nil {
panic("BUG: SendProbe: bad range: just one of the bounds is nil")
}
xh := x.(types.Hash32)
yh := y.(types.Hash32)
m.RangeX = &xh
m.RangeY = &yh
return c.send(m)
}

func (c *wireConduit) withInitialRequest(toCall func(Conduit) error) ([]byte, error) {
Expand All @@ -252,6 +290,11 @@ func (c *wireConduit) handleStream(stream io.ReadWriter, rsr *RangeSetReconciler
}
}

// ShortenKey implements Conduit.
func (c *wireConduit) ShortenKey(k Ordered) Ordered {
return MinhashSampleItemFromHash32(k.(types.Hash32))
}

func MakeServerHandler(is ItemStore, opts ...Option) server.StreamHandler {
return func(ctx context.Context, req []byte, stream io.ReadWriter) error {
c := wireConduit{newValue: is.New}
Expand Down Expand Up @@ -299,43 +342,63 @@ func syncStore(ctx context.Context, r requester, peer p2p.Peer, is ItemStore, x,
})
}

func Probe(ctx context.Context, r requester, peer p2p.Peer, opts ...Option) (fp any, count int, err error) {
return boundedProbe(ctx, r, peer, nil, nil, opts)
func Probe(ctx context.Context, r requester, peer p2p.Peer, is ItemStore, opts ...Option) (ProbeResult, error) {
return boundedProbe(ctx, r, peer, is, nil, nil, opts)
}

func BoundedProbe(ctx context.Context, r requester, peer p2p.Peer, x, y types.Hash32, opts ...Option) (fp any, count int, err error) {
return boundedProbe(ctx, r, peer, &x, &y, opts)
func BoundedProbe(
ctx context.Context,
r requester,
peer p2p.Peer,
is ItemStore,
x, y types.Hash32,
opts ...Option,
) (ProbeResult, error) {
return boundedProbe(ctx, r, peer, is, &x, &y, opts)
}

func boundedProbe(ctx context.Context, r requester, peer p2p.Peer, x, y *types.Hash32, opts []Option) (fp any, count int, err error) {
func boundedProbe(
ctx context.Context,
r requester,
peer p2p.Peer,
is ItemStore,
x, y *types.Hash32,
opts []Option,
) (ProbeResult, error) {
var (
err error
initReq []byte
info RangeInfo
pr ProbeResult
)
c := wireConduit{
newValue: func() any { return nil }, // not used
}
rsr := NewRangeSetReconciler(nil, opts...)
// c.rmmePrint = true
var initReq []byte
rsr := NewRangeSetReconciler(is, opts...)
if x == nil {
initReq, err = c.withInitialRequest(func(c Conduit) error {
return rsr.InitiateProbe(c)
info, err = rsr.InitiateProbe(c)
return err
})
} else {
initReq, err = c.withInitialRequest(func(c Conduit) error {
return rsr.InitiateBoundedProbe(c, *x, *y)
info, err = rsr.InitiateBoundedProbe(c, *x, *y)
return err
})
}
if err != nil {
return nil, 0, err
return ProbeResult{}, err
}
err = r.StreamRequest(ctx, peer, initReq, func(ctx context.Context, stream io.ReadWriter) error {
c.stream = stream
var err error
fp, count, err = rsr.HandleProbeResponse(&c)
pr, err = rsr.HandleProbeResponse(&c, info)
return err
})
if err != nil {
return nil, 0, err
return ProbeResult{}, err
}
return fp, count, nil
return pr, nil
}

// TODO: request duration
Expand Down
45 changes: 24 additions & 21 deletions hashsync/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,12 @@ func TestWireConduit(t *testing.T) {
type getRequesterFunc func(name string, handler server.StreamHandler, peers ...requester) (requester, p2p.Peer)

func withClientServer(
storeA, storeB ItemStore,
store ItemStore,
getRequester getRequesterFunc,
opts []Option,
toCall func(ctx context.Context, client requester, srvPeerID p2p.Peer),
) {
srvHandler := MakeServerHandler(storeA, opts...)
srvHandler := MakeServerHandler(store, opts...)
srv, srvPeerID := getRequester("srv", srvHandler)
var eg errgroup.Group
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -383,7 +383,7 @@ func withClientServer(
toCall(ctx, client, srvPeerID)
}

func fakeRequesterGetter(t *testing.T) getRequesterFunc {
func fakeRequesterGetter() getRequesterFunc {
return func(name string, handler server.StreamHandler, peers ...requester) (requester, p2p.Peer) {
pid := p2p.Peer(name)
return newFakeRequester(pid, handler, peers...), pid
Expand Down Expand Up @@ -428,7 +428,7 @@ func testWireSync(t *testing.T, getRequester getRequesterFunc) requester {
var client requester
verifyXORSync(t, cfg, func(storeA, storeB ItemStore, numSpecific int, opts []Option) bool {
withClientServer(
storeA, storeB, getRequester, opts,
storeA, getRequester, opts,
func(ctx context.Context, client requester, srvPeerID p2p.Peer) {
err := SyncStore(ctx, client, srvPeerID, storeB, opts...)
require.NoError(t, err)
Expand All @@ -445,7 +445,7 @@ func testWireSync(t *testing.T, getRequester getRequesterFunc) requester {

func TestWireSync(t *testing.T) {
t.Run("fake requester", func(t *testing.T) {
testWireSync(t, fakeRequesterGetter(t))
testWireSync(t, fakeRequesterGetter())
})
t.Run("p2p", func(t *testing.T) {
testWireSync(t, p2pRequesterGetter(t))
Expand All @@ -455,33 +455,36 @@ func TestWireSync(t *testing.T) {
func testWireProbe(t *testing.T, getRequester getRequesterFunc) requester {
cfg := xorSyncTestConfig{
maxSendRange: 1,
numTestHashes: 32,
minNumSpecificA: 4,
maxNumSpecificA: 4,
minNumSpecificB: 4,
maxNumSpecificB: 4,
numTestHashes: 10000,
minNumSpecificA: 130,
maxNumSpecificA: 130,
minNumSpecificB: 130,
maxNumSpecificB: 130,
}
var client requester
verifyXORSync(t, cfg, func(storeA, storeB ItemStore, numSpecific int, opts []Option) bool {
withClientServer(
storeA, storeB, getRequester, opts,
storeA, getRequester, opts,
func(ctx context.Context, client requester, srvPeerID p2p.Peer) {
minA := storeA.Min().Key()
infoA := storeA.GetRangeInfo(nil, minA, minA, -1)
fpA, countA, err := Probe(ctx, client, srvPeerID, opts...)
prA, err := Probe(ctx, client, srvPeerID, storeB, opts...)
require.NoError(t, err)
require.Equal(t, infoA.Fingerprint, fpA)
require.Equal(t, infoA.Count, countA)
require.Equal(t, infoA.Fingerprint, prA.FP)
require.Equal(t, infoA.Count, prA.Count)
require.InDelta(t, 0.98, prA.Sim, 0.05, "sim")

minA = storeA.Min().Key()
partInfoA := storeA.GetRangeInfo(nil, minA, minA, infoA.Count/2)
x := partInfoA.Start.Key().(types.Hash32)
y := partInfoA.End.Key().(types.Hash32)
// partInfoA = storeA.GetRangeInfo(nil, x, y, -1)
fpA, countA, err = BoundedProbe(ctx, client, srvPeerID, x, y, opts...)
prA, err = BoundedProbe(ctx, client, srvPeerID, storeB, x, y, opts...)
require.NoError(t, err)
require.Equal(t, partInfoA.Fingerprint, fpA)
require.Equal(t, partInfoA.Count, countA)
require.Equal(t, partInfoA.Fingerprint, prA.FP)
require.Equal(t, partInfoA.Count, prA.Count)
require.InDelta(t, 0.98, prA.Sim, 0.1, "sim")
// QQQQQ: TBD: check prA.Sim and prB.Sim values
})
return false
})
Expand All @@ -490,11 +493,11 @@ func testWireProbe(t *testing.T, getRequester getRequesterFunc) requester {

func TestWireProbe(t *testing.T) {
t.Run("fake requester", func(t *testing.T) {
testWireProbe(t, fakeRequesterGetter(t))
})
t.Run("p2p", func(t *testing.T) {
testWireProbe(t, p2pRequesterGetter(t))
testWireProbe(t, fakeRequesterGetter())
})
// t.Run("p2p", func(t *testing.T) {
// testWireProbe(t, p2pRequesterGetter(t))
// })
}

// TODO: test bounded sync
Expand Down
Loading

0 comments on commit 34a1943

Please sign in to comment.