Skip to content

Commit

Permalink
Clean up DHT test helpers (#928)
Browse files Browse the repository at this point in the history
* Implement GetValue

* Add failing TestGetValueOnePeer test

* Unexport methods
  • Loading branch information
iand authored Sep 19, 2023
1 parent e86381e commit 83329a4
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 163 deletions.
15 changes: 4 additions & 11 deletions v2/dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,9 @@ func TestNew(t *testing.T) {
func TestAddAddresses(t *testing.T) {
ctx := kadtest.CtxShort(t)

localCfg := DefaultConfig()
rn := coord.NewBufferedRoutingNotifier()
localCfg.Kademlia.RoutingNotifier = rn

local := newClientDht(t, localCfg)

remote := newServerDht(t, nil)

// Populate entries in remote's routing table so it passes a connectivity check
fillRoutingTable(t, remote, 1)
top := NewTopology(t)
local := top.AddClient(nil)
remote := top.AddServer(nil)

// local routing table should not contain the node
_, err := local.kad.GetNode(ctx, kadt.PeerID(remote.host.ID()))
Expand All @@ -103,7 +96,7 @@ func TestAddAddresses(t *testing.T) {
require.NoError(t, err)

// the include state machine runs in the background and eventually should add the node to routing table
_, err = rn.Expect(ctx, &coord.EventRoutingUpdated{})
_, err = top.ExpectRoutingUpdated(ctx, local, remote.host.ID())
require.NoError(t, err)

// the routing table should now contain the node
Expand Down
13 changes: 4 additions & 9 deletions v2/notifee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
"github.com/libp2p/go-libp2p/core/network"

"github.com/libp2p/go-libp2p/core/event"
Expand Down Expand Up @@ -74,12 +72,9 @@ func TestDHT_consumeNetworkEvents_onEvtLocalReachabilityChanged(t *testing.T) {
func TestDHT_consumeNetworkEvents_onEvtPeerIdentificationCompleted(t *testing.T) {
ctx := kadtest.CtxShort(t)

cfg1 := DefaultConfig()
rn1 := coord.NewBufferedRoutingNotifier()
cfg1.Kademlia.RoutingNotifier = rn1
d1 := newServerDht(t, cfg1)

d2 := newServerDht(t, nil)
top := NewTopology(t)
d1 := top.AddServer(nil)
d2 := top.AddServer(nil)

// make sure d1 has the address of d2 in its peerstore
d1.host.Peerstore().AddAddrs(d2.host.ID(), d2.host.Addrs(), time.Minute)
Expand All @@ -89,6 +84,6 @@ func TestDHT_consumeNetworkEvents_onEvtPeerIdentificationCompleted(t *testing.T)
Peer: d2.host.ID(),
})

_, err := rn1.ExpectRoutingUpdated(ctx, kadt.PeerID(d2.host.ID()))
_, err := top.ExpectRoutingUpdated(ctx, d1, d2.host.ID())
require.NoError(t, err)
}
142 changes: 12 additions & 130 deletions v2/query_test.go
Original file line number Diff line number Diff line change
@@ -1,135 +1,24 @@
package dht

import (
"context"
"testing"
"time"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/libp2p/go-libp2p-kad-dht/v2/coord"
"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
"github.com/libp2p/go-libp2p-kad-dht/v2/kadt"
)

func newServerHost(t testing.TB) host.Host {
listenAddr := libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0")

h, err := libp2p.New(listenAddr)
require.NoError(t, err)

t.Cleanup(func() {
if err = h.Close(); err != nil {
t.Logf("unexpected error when closing host: %s", err)
}
})

return h
}

func newClientHost(t testing.TB) host.Host {
h, err := libp2p.New(libp2p.NoListenAddrs)
require.NoError(t, err)

t.Cleanup(func() {
if err = h.Close(); err != nil {
t.Logf("unexpected error when closing host: %s", err)
}
})

return h
}

func newServerDht(t testing.TB, cfg *Config) *DHT {
h := newServerHost(t)

var err error
if cfg == nil {
cfg = DefaultConfig()
}
cfg.Mode = ModeOptServer

d, err := New(h, cfg)
require.NoError(t, err)

// add at least 1 entry in the routing table so the server will pass connectivity checks
fillRoutingTable(t, d, 1)
require.NotEmpty(t, d.rt.NearestNodes(kadt.PeerID(d.host.ID()).Key(), 1))

t.Cleanup(func() {
if err = d.Close(); err != nil {
t.Logf("unexpected error when closing dht: %s", err)
}
})
return d
}

func newClientDht(t testing.TB, cfg *Config) *DHT {
h := newClientHost(t)

var err error
if cfg == nil {
cfg = DefaultConfig()
}
cfg.Mode = ModeOptClient
d, err := New(h, cfg)
require.NoError(t, err)

t.Cleanup(func() {
if err = d.Close(); err != nil {
t.Logf("unexpected error when closing dht: %s", err)
}
})
return d
}

func connect(t *testing.T, ctx context.Context, a, b *DHT, arn *coord.BufferedRoutingNotifier) {
t.Helper()

remoteAddrInfo := peer.AddrInfo{
ID: b.host.ID(),
Addrs: b.host.Addrs(),
}

// Add b's addresss to a
err := a.AddAddresses(ctx, []peer.AddrInfo{remoteAddrInfo}, time.Minute)
require.NoError(t, err)

// the include state machine runs in the background for a and eventually should add the node to routing table
_, err = arn.ExpectRoutingUpdated(ctx, kadt.PeerID(b.host.ID()))
require.NoError(t, err)

// the routing table should now contain the node
_, err = a.kad.GetNode(ctx, kadt.PeerID(b.host.ID()))
require.NoError(t, err)
}

func TestRTAdditionOnSuccessfulQuery(t *testing.T) {
ctx := kadtest.CtxShort(t)

// create dhts and associated routing notifiers so we can inspect routing events
cfg1 := DefaultConfig()
rn1 := coord.NewBufferedRoutingNotifier()
cfg1.Kademlia.RoutingNotifier = rn1
d1 := newServerDht(t, cfg1)

cfg2 := DefaultConfig()
rn2 := coord.NewBufferedRoutingNotifier()
cfg2.Kademlia.RoutingNotifier = rn2
d2 := newServerDht(t, cfg2)
top := NewTopology(t)
d1 := top.AddServer(nil)
d2 := top.AddServer(nil)
d3 := top.AddServer(nil)

cfg3 := DefaultConfig()
rn3 := coord.NewBufferedRoutingNotifier()
cfg3.Kademlia.RoutingNotifier = rn3
d3 := newServerDht(t, cfg3)

connect(t, ctx, d1, d2, rn1)
connect(t, ctx, d2, d1, rn2)
connect(t, ctx, d2, d3, rn2)
connect(t, ctx, d3, d2, rn3)
top.ConnectChain(ctx, d1, d2, d3)

// d3 does not know about d1
_, err := d3.kad.GetNode(ctx, kadt.PeerID(d1.host.ID()))
Expand All @@ -144,15 +33,15 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) {
// ignore the error

// d3 should update its routing table to include d1 during the query
_, err = rn3.ExpectRoutingUpdated(ctx, kadt.PeerID(d1.host.ID()))
_, err = top.ExpectRoutingUpdated(ctx, d3, d1.host.ID())
require.NoError(t, err)

// d3 now has d1 in its routing table
_, err = d3.kad.GetNode(ctx, kadt.PeerID(d1.host.ID()))
require.NoError(t, err)

// d1 should update its routing table to include d3 during the query
_, err = rn1.ExpectRoutingUpdated(ctx, kadt.PeerID(d3.host.ID()))
_, err = top.ExpectRoutingUpdated(ctx, d1, d3.host.ID())
require.NoError(t, err)

// d1 now has d3 in its routing table
Expand All @@ -163,18 +52,11 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) {
func TestRTEvictionOnFailedQuery(t *testing.T) {
ctx := kadtest.CtxShort(t)

cfg1 := DefaultConfig()
rn1 := coord.NewBufferedRoutingNotifier()
cfg1.Kademlia.RoutingNotifier = rn1
d1 := newServerDht(t, cfg1)

cfg2 := DefaultConfig()
rn2 := coord.NewBufferedRoutingNotifier()
cfg2.Kademlia.RoutingNotifier = rn2
d2 := newServerDht(t, cfg2)
top := NewTopology(t)
d1 := top.AddServer(nil)
d2 := top.AddServer(nil)

connect(t, ctx, d1, d2, rn1)
connect(t, ctx, d2, d1, rn2)
top.Connect(ctx, d1, d2)

// close both hosts so query fails
require.NoError(t, d1.host.Close())
Expand All @@ -195,6 +77,6 @@ func TestRTEvictionOnFailedQuery(t *testing.T) {
_, _ = d1.FindPeer(ctx, "test")

// d1 should update its routing table to remove d2 because of the failure
_, err = rn1.ExpectRoutingRemoved(ctx, kadt.PeerID(d2.host.ID()))
_, err = top.ExpectRoutingRemoved(ctx, d1, d2.host.ID())
require.NoError(t, err)
}
48 changes: 35 additions & 13 deletions v2/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,18 @@ func (d *DHT) PutValue(ctx context.Context, key string, value []byte, option ...
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValue")
defer span.End()

if err := d.putValueLocal(ctx, key, value); err != nil {
return fmt.Errorf("put value locally: %w", err)
}

panic("implement me")
}

// putValueLocal stores a value in the local datastore without querying the network.
func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValueLocal")
defer span.End()

ns, path, err := record.SplitKey(key)
if err != nil {
return fmt.Errorf("splitting key: %w", err)
Expand All @@ -131,14 +143,29 @@ func (d *DHT) PutValue(ctx context.Context, key string, value []byte, option ...
return fmt.Errorf("store record locally: %w", err)
}

// TODO reach out to Zikade
panic("implement me")
return nil
}

func (d *DHT) GetValue(ctx context.Context, key string, option ...routing.Option) ([]byte, error) {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.GetValue")
defer span.End()

v, err := d.getValueLocal(ctx, key)
if err != nil {
return v, nil
}
if !errors.Is(err, ds.ErrNotFound) {
return nil, fmt.Errorf("put value locally: %w", err)
}

panic("implement me")
}

// getValueLocal retrieves a value from the local datastore without querying the network.
func (d *DHT) getValueLocal(ctx context.Context, key string) ([]byte, error) {
ctx, span := d.tele.Tracer.Start(ctx, "DHT.GetValueLocal")
defer span.End()

ns, path, err := record.SplitKey(key)
if err != nil {
return nil, fmt.Errorf("splitting key: %w", err)
Expand All @@ -151,19 +178,14 @@ func (d *DHT) GetValue(ctx context.Context, key string, option ...routing.Option

val, err := b.Fetch(ctx, path)
if err != nil {
if !errors.Is(err, ds.ErrNotFound) {
return nil, fmt.Errorf("fetch value locally: %w", err)
}
} else {
rec, ok := val.(*recpb.Record)
if !ok {
return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val)
}
return rec.GetValue(), nil
return nil, fmt.Errorf("fetch from backend: %w", err)
}

// TODO reach out to Zikade
panic("implement me")
rec, ok := val.(*recpb.Record)
if !ok {
return nil, fmt.Errorf("expected *recpb.Record from backend, got: %T", val)
}
return rec.GetValue(), nil
}

func (d *DHT) SearchValue(ctx context.Context, s string, option ...routing.Option) (<-chan []byte, error) {
Expand Down
67 changes: 67 additions & 0 deletions v2/routing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package dht

import (
"fmt"
"testing"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
)

func makePkKeyValue(t *testing.T) (string, []byte) {
t.Helper()

_, pub, _ := crypto.GenerateEd25519Key(rng)
v, err := crypto.MarshalPublicKey(pub)
require.NoError(t, err)

id, err := peer.IDFromPublicKey(pub)
require.NoError(t, err)

key := fmt.Sprintf("/pk/%s", string(id))

return key, v
}

func TestGetSetValueLocal(t *testing.T) {
ctx := kadtest.CtxShort(t)

top := NewTopology(t)
d := top.AddServer(nil)

key, v := makePkKeyValue(t)

err := d.putValueLocal(ctx, key, v)
require.NoError(t, err)

val, err := d.getValueLocal(ctx, key)
require.NoError(t, err)

require.Equal(t, v, val)
}

func TestGetValueOnePeer(t *testing.T) {
t.Skip("not implemented yet")

ctx := kadtest.CtxShort(t)
top := NewTopology(t)
local := top.AddServer(nil)
remote := top.AddServer(nil)

// store the value on the remote DHT
key, v := makePkKeyValue(t)
err := remote.putValueLocal(ctx, key, v)
require.NoError(t, err)

// connect the two DHTs
top.Connect(ctx, local, remote)

// ask the local DHT to find the value
val, err := local.GetValue(ctx, key)
require.NoError(t, err)

require.Equal(t, v, val)
}
Loading

0 comments on commit 83329a4

Please sign in to comment.