From b2def188150292065b5bfb5ed221bed4e13ae0a8 Mon Sep 17 00:00:00 2001 From: Dennis Trautwein Date: Thu, 28 Sep 2023 14:28:50 +0200 Subject: [PATCH] Add RoutingDHT --- v2/internal/coord/coordinator.go | 2 +- v2/query_test.go | 4 +- v2/routing.go | 42 ++++++++---- v2/routing_test.go | 114 +++++++++++++++---------------- 4 files changed, 89 insertions(+), 73 deletions(-) diff --git a/v2/internal/coord/coordinator.go b/v2/internal/coord/coordinator.go index f932f164..f0d13568 100644 --- a/v2/internal/coord/coordinator.go +++ b/v2/internal/coord/coordinator.go @@ -383,7 +383,7 @@ func (c *Coordinator) QueryMessage(ctx context.Context, msg *pb.Message, fn coor ctx, span := c.tele.Tracer.Start(ctx, "Coordinator.QueryMessage") defer span.End() if msg == nil { - return coordt.QueryStats{}, fmt.Errorf("no message supplied for query") + return nil, coordt.QueryStats{}, fmt.Errorf("no message supplied for query") } c.cfg.Logger.Debug("starting query with message", tele.LogAttrKey(msg.Target()), slog.String("type", msg.Type.String())) diff --git a/v2/query_test.go b/v2/query_test.go index bb628a0c..b6c5e94c 100644 --- a/v2/query_test.go +++ b/v2/query_test.go @@ -29,7 +29,7 @@ func TestRTAdditionOnSuccessfulQuery(t *testing.T) { require.ErrorIs(t, err, coordt.ErrNodeNotFound) // // but when d3 queries d2, d1 and d3 discover each other - _, _ = d3.FindPeer(ctx, "something") + _, _ = NewRouting(d3).FindPeer(ctx, "something") // ignore the error // d3 should update its routing table to include d1 during the query @@ -74,7 +74,7 @@ func TestRTEvictionOnFailedQuery(t *testing.T) { require.NoError(t, err) // failed queries should remove the queried peers from the routing table - _, _ = d1.FindPeer(ctx, "test") + _, _ = NewRouting(d1).FindPeer(ctx, "test") // d1 should update its routing table to remove d2 because of the failure _, err = top.ExpectRoutingRemoved(ctx, d1, d2.host.ID()) diff --git a/v2/routing.go b/v2/routing.go index a0745945..cd9cf7de 100644 --- a/v2/routing.go +++ b/v2/routing.go @@ -24,9 +24,25 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/v2/pb" ) -var _ routing.Routing = (*DHT)(nil) +// RoutingDHT is a wrapper around the [DHT] struct that implements the +// [routing.Routing] interface. As people have raised concerns about the +// interface, we decided to not "pollute" the DHTs public API surface with +// interface methods that we can already foresee will eventually change. +// Use the [NewRouting] convenience method to create a new RoutingDHT. +type RoutingDHT struct { + *DHT // the wrapped DHT +} + +var _ routing.Routing = (*RoutingDHT)(nil) + +// NewRouting wraps the given [DHT] in a [RoutingDHT] that implements the +// [routing.Routing] interface. See [RoutingDHT]'s documentation for more +// information. +func NewRouting(d *DHT) *RoutingDHT { + return &RoutingDHT{DHT: d} +} -func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { +func (d *RoutingDHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { ctx, span := d.tele.Tracer.Start(ctx, "DHT.FindPeer") defer span.End() @@ -64,7 +80,7 @@ func (d *DHT) FindPeer(ctx context.Context, id peer.ID) (peer.AddrInfo, error) { return d.host.Peerstore().PeerInfo(foundPeer), nil } -func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { +func (d *RoutingDHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { ctx, span := d.tele.Tracer.Start(ctx, "DHT.Provide", otel.WithAttributes(attribute.String("cid", c.String()))) defer span.End() @@ -109,13 +125,13 @@ func (d *DHT) Provide(ctx context.Context, c cid.Cid, brdcst bool) error { return d.kad.BroadcastRecord(ctx, msg) } -func (d *DHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { +func (d *RoutingDHT) FindProvidersAsync(ctx context.Context, c cid.Cid, count int) <-chan peer.AddrInfo { peerOut := make(chan peer.AddrInfo) go d.findProvidersAsyncRoutine(ctx, c, count, peerOut) return peerOut } -func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan<- peer.AddrInfo) { +func (d *RoutingDHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count int, out chan<- peer.AddrInfo) { _, span := d.tele.Tracer.Start(ctx, "DHT.findProvidersAsyncRoutine", otel.WithAttributes(attribute.String("cid", c.String()), attribute.Int("count", count))) defer span.End() @@ -216,7 +232,7 @@ func (d *DHT) findProvidersAsyncRoutine(ctx context.Context, c cid.Cid, count in // format `/$namespace/$binary_id`. Namespace examples are `pk` or `ipns`. To // identify the closest peers to keyStr, that complete string will be SHA256 // hashed. -func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error { +func (d *RoutingDHT) PutValue(ctx context.Context, keyStr string, value []byte, opts ...routing.Option) error { ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValue") defer span.End() @@ -255,7 +271,7 @@ func (d *DHT) PutValue(ctx context.Context, keyStr string, value []byte, opts .. // putValueLocal stores a value in the local datastore without reaching out to // the network. -func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error { +func (d *RoutingDHT) putValueLocal(ctx context.Context, key string, value []byte) error { ctx, span := d.tele.Tracer.Start(ctx, "DHT.PutValueLocal") defer span.End() @@ -280,7 +296,7 @@ func (d *DHT) putValueLocal(ctx context.Context, key string, value []byte) error return nil } -func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { +func (d *RoutingDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { ctx, span := d.tele.Tracer.Start(ctx, "DHT.GetValue") defer span.End() @@ -307,7 +323,7 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) // SearchValue will search in the DHT for keyStr. keyStr must have the form // `/$namespace/$binary_id` -func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) { +func (d *RoutingDHT) SearchValue(ctx context.Context, keyStr string, options ...routing.Option) (<-chan []byte, error) { _, span := d.tele.Tracer.Start(ctx, "DHT.SearchValue") defer span.End() @@ -363,7 +379,7 @@ func (d *DHT) SearchValue(ctx context.Context, keyStr string, options ...routing return out, nil } -func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) { +func (d *RoutingDHT) searchValueRoutine(ctx context.Context, backend Backend, ns string, path string, ropt *routing.Options, out chan<- []byte) { _, span := d.tele.Tracer.Start(ctx, "DHT.searchValueRoutine") defer span.End() defer close(out) @@ -438,7 +454,7 @@ func (d *DHT) searchValueRoutine(ctx context.Context, backend Backend, ns string _, _, err := d.kad.QueryMessage(ctx, req, fn, d.cfg.BucketSize) if err != nil { - d.logErr(err, "Search value query failed") + d.warnErr(err, "Search value query failed") return } @@ -487,7 +503,7 @@ func RoutingQuorum(n int) routing.Option { // getQuorum extracts the quorum value from the given routing options and // returns [Config.DefaultQuorum] if no quorum value is present. -func (d *DHT) getQuorum(opts *routing.Options) int { +func (d *RoutingDHT) getQuorum(opts *routing.Options) int { quorum, ok := opts.Other[quorumOptionKey{}].(int) if !ok { quorum = d.cfg.Query.DefaultQuorum @@ -496,7 +512,7 @@ func (d *DHT) getQuorum(opts *routing.Options) int { return quorum } -func (d *DHT) Bootstrap(ctx context.Context) error { +func (d *RoutingDHT) Bootstrap(ctx context.Context) error { ctx, span := d.tele.Tracer.Start(ctx, "DHT.Bootstrap") defer span.End() d.log.Info("Starting bootstrap") diff --git a/v2/routing_test.go b/v2/routing_test.go index 50d77895..739001c2 100644 --- a/v2/routing_test.go +++ b/v2/routing_test.go @@ -61,7 +61,7 @@ func TestDHT_FindPeer_happy_path(t *testing.T) { d4 := top.AddServer(nil) top.ConnectChain(ctx, d1, d2, d3, d4) - addrInfo, err := d1.FindPeer(ctx, d4.host.ID()) + addrInfo, err := NewRouting(d1).FindPeer(ctx, d4.host.ID()) require.NoError(t, err) assert.Equal(t, d4.host.ID(), addrInfo.ID) } @@ -76,7 +76,7 @@ func TestDHT_FindPeer_not_found(t *testing.T) { d4 := top.AddServer(nil) top.ConnectChain(ctx, d1, d2, d3) - _, err := d1.FindPeer(ctx, d4.host.ID()) + _, err := NewRouting(d1).FindPeer(ctx, d4.host.ID()) assert.Error(t, err) } @@ -96,7 +96,7 @@ func TestDHT_FindPeer_already_connected(t *testing.T) { }) require.NoError(t, err) - _, err = d1.FindPeer(ctx, d4.host.ID()) + _, err = NewRouting(d1).FindPeer(ctx, d4.host.ID()) assert.NoError(t, err) } @@ -112,7 +112,7 @@ func TestDHT_PutValue_happy_path(t *testing.T) { k, v := makePkKeyValue(t) - err := d1.PutValue(ctx, k, v) + err := NewRouting(d1).PutValue(ctx, k, v) require.NoError(t, err) deadline, hasDeadline := ctx.Deadline() @@ -126,7 +126,7 @@ func TestDHT_PutValue_happy_path(t *testing.T) { // guaranteed. The data will be flushed at this point, but the remote might // not have handled it yet. Therefore, we use "EventuallyWithT" here. assert.EventuallyWithT(t, func(t *assert.CollectT) { - val, err := d2.GetValue(ctx, k, routing.Offline) + val, err := NewRouting(d2).GetValue(ctx, k, routing.Offline) assert.NoError(t, err) assert.Equal(t, v, val) }, time.Until(deadline), 10*time.Millisecond) @@ -140,7 +140,7 @@ func TestDHT_PutValue_local_only(t *testing.T) { key, v := makePkKeyValue(t) - err := d.PutValue(ctx, key, v, routing.Offline) + err := NewRouting(d).PutValue(ctx, key, v, routing.Offline) require.NoError(t, err) } @@ -153,12 +153,12 @@ func TestDHT_PutValue_invalid_key(t *testing.T) { _, v := makePkKeyValue(t) t.Run("unknown namespace", func(t *testing.T) { - err := d.PutValue(ctx, "/unknown/some_key", v) + err := NewRouting(d).PutValue(ctx, "/unknown/some_key", v) assert.ErrorIs(t, err, routing.ErrNotSupported) }) t.Run("no namespace", func(t *testing.T) { - err := d.PutValue(ctx, "no namespace", v) + err := NewRouting(d).PutValue(ctx, "no namespace", v) assert.ErrorContains(t, err, "splitting key") }) } @@ -171,7 +171,7 @@ func TestDHT_PutValue_routing_option_returns_error(t *testing.T) { return fmt.Errorf("some error") } - err := d.PutValue(ctx, "/ipns/some-key", []byte("some value"), errOption) + err := NewRouting(d).PutValue(ctx, "/ipns/some-key", []byte("some value"), errOption) assert.ErrorContains(t, err, "routing options") } @@ -183,14 +183,14 @@ func TestGetValueOnePeer(t *testing.T) { // store the value on the remote DHT key, v := makePkKeyValue(t) - err := remote.putValueLocal(ctx, key, v) + err := NewRouting(remote).putValueLocal(ctx, key, v) require.NoError(t, err) // connect the two DHTs top.Connect(ctx, local, remote) // ask the local DHT to find the value - val, err := local.GetValue(ctx, key) + val, err := NewRouting(local).GetValue(ctx, key) require.NoError(t, err) require.Equal(t, v, val) @@ -201,7 +201,7 @@ func TestDHT_Provide_no_providers_backend_registered(t *testing.T) { d := newTestDHT(t) delete(d.backends, namespaceProviders) - err := d.Provide(ctx, newRandomContent(t), true) + err := NewRouting(d).Provide(ctx, newRandomContent(t), true) assert.ErrorIs(t, err, routing.ErrNotSupported) } @@ -209,7 +209,7 @@ func TestDHT_Provide_undefined_cid(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - err := d.Provide(ctx, cid.Cid{}, true) + err := NewRouting(d).Provide(ctx, cid.Cid{}, true) assert.ErrorContains(t, err, "invalid cid") } @@ -232,7 +232,7 @@ func TestDHT_Provide_erroneous_datastore(t *testing.T) { be.datastore = dstore - err = d.Provide(ctx, newRandomContent(t), true) + err = NewRouting(d).Provide(ctx, newRandomContent(t), true) assert.ErrorIs(t, err, testErr) } @@ -241,7 +241,7 @@ func TestDHT_Provide_does_nothing_if_broadcast_is_false(t *testing.T) { d := newTestDHT(t) // unconnected DHT c := newRandomContent(t) - err := d.Provide(ctx, c, false) + err := NewRouting(d).Provide(ctx, c, false) assert.NoError(t, err) // still stored locally @@ -260,7 +260,7 @@ func TestDHT_Provide_fails_if_routing_table_is_empty(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - err := d.Provide(ctx, newRandomContent(t), true) + err := NewRouting(d).Provide(ctx, newRandomContent(t), true) assert.Error(t, err) } @@ -269,7 +269,7 @@ func TestDHT_FindProvidersAsync_empty_routing_table(t *testing.T) { d := newTestDHT(t) c := newRandomContent(t) - out := d.FindProvidersAsync(ctx, c, 1) + out := NewRouting(d).FindProvidersAsync(ctx, c, 1) assertClosed(t, ctx, out) } @@ -280,7 +280,7 @@ func TestDHT_FindProvidersAsync_dht_does_not_support_providers(t *testing.T) { delete(d.backends, namespaceProviders) - out := d.FindProvidersAsync(ctx, newRandomContent(t), 1) + out := NewRouting(d).FindProvidersAsync(ctx, newRandomContent(t), 1) assertClosed(t, ctx, out) } @@ -294,7 +294,7 @@ func TestDHT_FindProvidersAsync_providers_stored_locally(t *testing.T) { _, err := d.backends[namespaceProviders].Store(ctx, string(c.Hash()), provider) require.NoError(t, err) - out := d.FindProvidersAsync(ctx, c, 1) + out := NewRouting(d).FindProvidersAsync(ctx, c, 1) val := readItem(t, ctx, out) assert.Equal(t, provider.ID, val.ID) @@ -321,7 +321,7 @@ func TestDHT_FindProvidersAsync_returns_only_count_from_local_store(t *testing.T require.NoError(t, err) } - out := d.FindProvidersAsync(ctx, c, requestedCount) + out := NewRouting(d).FindProvidersAsync(ctx, c, requestedCount) returnedCount := 0 LOOP: @@ -355,7 +355,7 @@ func TestDHT_FindProvidersAsync_queries_other_peers(t *testing.T) { _, err := d3.backends[namespaceProviders].Store(ctx, string(c.Hash()), provider) require.NoError(t, err) - out := d1.FindProvidersAsync(ctx, c, 1) + out := NewRouting(d1).FindProvidersAsync(ctx, c, 1) val := readItem(t, ctx, out) assert.Equal(t, provider.ID, val.ID) @@ -388,7 +388,7 @@ func TestDHT_FindProvidersAsync_respects_cancelled_context_for_local_query(t *te cancelledCtx, cancel := context.WithCancel(ctx) cancel() - out := d.FindProvidersAsync(cancelledCtx, c, 0) + out := NewRouting(d).FindProvidersAsync(cancelledCtx, c, 0) returnedCount := 0 LOOP: @@ -438,7 +438,7 @@ func TestDHT_FindProvidersAsync_does_not_return_same_record_twice(t *testing.T) _, err = d2.backends[namespaceProviders].Store(ctx, string(c.Hash()), provider2) require.NoError(t, err) - out := d1.FindProvidersAsync(ctx, c, 0) + out := NewRouting(d1).FindProvidersAsync(ctx, c, 0) count := 0 LOOP: for { @@ -473,7 +473,7 @@ func TestDHT_FindProvidersAsync_datastore_error(t *testing.T) { be.datastore = dstore - out := d.FindProvidersAsync(ctx, newRandomContent(t), 0) + out := NewRouting(d).FindProvidersAsync(ctx, newRandomContent(t), 0) assertClosed(t, ctx, out) } @@ -481,7 +481,7 @@ func TestDHT_FindProvidersAsync_invalid_key(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - out := d.FindProvidersAsync(ctx, cid.Cid{}, 0) + out := NewRouting(d).FindProvidersAsync(ctx, cid.Cid{}, 0) assertClosed(t, ctx, out) } @@ -510,16 +510,16 @@ func TestDHT_GetValue_happy_path(t *testing.T) { top.ConnectChain(ctx, d1, d2, d3, d4, d5) - err = d3.putValueLocal(ctx, key, validValue) + err = NewRouting(d3).putValueLocal(ctx, key, validValue) require.NoError(t, err) - err = d4.putValueLocal(ctx, key, worseValue) + err = NewRouting(d4).putValueLocal(ctx, key, worseValue) require.NoError(t, err) - err = d5.putValueLocal(ctx, key, betterValue) + err = NewRouting(d5).putValueLocal(ctx, key, betterValue) require.NoError(t, err) - val, err := d1.GetValue(ctx, key) + val, err := NewRouting(d1).GetValue(ctx, key) assert.NoError(t, err) assert.Equal(t, betterValue, val) } @@ -531,7 +531,7 @@ func TestDHT_GetValue_returns_context_error(t *testing.T) { cancelledCtx, cancel := context.WithCancel(ctx) cancel() - _, err := d.GetValue(cancelledCtx, "/"+namespaceIPNS+"/some-key") + _, err := NewRouting(d).GetValue(cancelledCtx, "/"+namespaceIPNS+"/some-key") assert.ErrorIs(t, err, context.Canceled) } @@ -539,7 +539,7 @@ func TestDHT_GetValue_returns_not_found_error(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - valueChan, err := d.GetValue(ctx, "/"+namespaceIPNS+"/some-key") + valueChan, err := NewRouting(d).GetValue(ctx, "/"+namespaceIPNS+"/some-key") assert.ErrorIs(t, err, routing.ErrNotFound) assert.Nil(t, valueChan) } @@ -583,10 +583,10 @@ func TestDHT_SearchValue_simple(t *testing.T) { top.Connect(ctx, d1, d2) - err := d2.putValueLocal(ctx, key, v) + err := NewRouting(d2).putValueLocal(ctx, key, v) require.NoError(t, err) - valChan, err := d1.SearchValue(ctx, key) + valChan, err := NewRouting(d1).SearchValue(ctx, key) require.NoError(t, err) val := readItem(t, ctx, valChan) @@ -626,16 +626,16 @@ func TestDHT_SearchValue_returns_best_values(t *testing.T) { top.ConnectChain(ctx, d1, d2, d3, d4, d5) - err = d3.putValueLocal(ctx, key, validValue) + err = NewRouting(d3).putValueLocal(ctx, key, validValue) require.NoError(t, err) - err = d4.putValueLocal(ctx, key, worseValue) + err = NewRouting(d4).putValueLocal(ctx, key, worseValue) require.NoError(t, err) - err = d5.putValueLocal(ctx, key, betterValue) + err = NewRouting(d5).putValueLocal(ctx, key, betterValue) require.NoError(t, err) - valChan, err := d1.SearchValue(ctx, key) + valChan, err := NewRouting(d1).SearchValue(ctx, key) require.NoError(t, err) val := readItem(t, ctx, valChan) @@ -716,25 +716,25 @@ func (suite *SearchValueQuorumTestSuite) SetupTest() { // The first four DHT servers hold a valid but old value for i := 1; i < 4; i++ { - err = suite.servers[i].putValueLocal(ctx, suite.key, suite.validValue) + err = NewRouting(suite.servers[i]).putValueLocal(ctx, suite.key, suite.validValue) require.NoError(t, err) } // The remaining six DHT servers hold a valid and newer record for i := 4; i < 10; i++ { - err = suite.servers[i].putValueLocal(ctx, suite.key, suite.betterValue) + err = NewRouting(suite.servers[i]).putValueLocal(ctx, suite.key, suite.betterValue) require.NoError(t, err) } // one of the remaining returns and old record again - err = suite.servers[8].putValueLocal(ctx, suite.key, suite.betterValue) + err = NewRouting(suite.servers[8]).putValueLocal(ctx, suite.key, suite.betterValue) require.NoError(t, err) } func (suite *SearchValueQuorumTestSuite) TestQuorumReachedPrematurely() { t := suite.T() ctx := kadtest.CtxShort(t) - out, err := suite.d.SearchValue(ctx, suite.key, RoutingQuorum(3)) + out, err := NewRouting(suite.d).SearchValue(ctx, suite.key, RoutingQuorum(3)) require.NoError(t, err) val := readItem(t, ctx, out) @@ -746,7 +746,7 @@ func (suite *SearchValueQuorumTestSuite) TestQuorumReachedPrematurely() { func (suite *SearchValueQuorumTestSuite) TestQuorumReachedAfterDiscoveryOfBetter() { t := suite.T() ctx := kadtest.CtxShort(t) - out, err := suite.d.SearchValue(ctx, suite.key, RoutingQuorum(5)) + out, err := NewRouting(suite.d).SearchValue(ctx, suite.key, RoutingQuorum(5)) require.NoError(t, err) val := readItem(t, ctx, out) @@ -763,7 +763,7 @@ func (suite *SearchValueQuorumTestSuite) TestQuorumZero() { ctx := kadtest.CtxShort(t) // search until query exhausted - out, err := suite.d.SearchValue(ctx, suite.key, RoutingQuorum(0)) + out, err := NewRouting(suite.d).SearchValue(ctx, suite.key, RoutingQuorum(0)) require.NoError(t, err) val := readItem(t, ctx, out) @@ -780,7 +780,7 @@ func (suite *SearchValueQuorumTestSuite) TestQuorumUnspecified() { ctx := kadtest.CtxShort(t) // search until query exhausted - out, err := suite.d.SearchValue(ctx, suite.key) + out, err := NewRouting(suite.d).SearchValue(ctx, suite.key) require.NoError(t, err) val := readItem(t, ctx, out) @@ -800,7 +800,7 @@ func TestDHT_SearchValue_routing_option_returns_error(t *testing.T) { return fmt.Errorf("some error") } - valueChan, err := d.SearchValue(ctx, "/ipns/some-key", errOption) + valueChan, err := NewRouting(d).SearchValue(ctx, "/ipns/some-key", errOption) assert.ErrorContains(t, err, "routing options") assert.Nil(t, valueChan) } @@ -809,7 +809,7 @@ func TestDHT_SearchValue_quorum_negative(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - out, err := d.SearchValue(ctx, "/"+namespaceIPNS+"/some-key", RoutingQuorum(-1)) + out, err := NewRouting(d).SearchValue(ctx, "/"+namespaceIPNS+"/some-key", RoutingQuorum(-1)) assert.ErrorContains(t, err, "quorum must not be negative") assert.Nil(t, out) } @@ -818,7 +818,7 @@ func TestDHT_SearchValue_invalid_key(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - valueChan, err := d.SearchValue(ctx, "invalid-key") + valueChan, err := NewRouting(d).SearchValue(ctx, "invalid-key") assert.ErrorContains(t, err, "splitting key") assert.Nil(t, valueChan) } @@ -827,7 +827,7 @@ func TestDHT_SearchValue_key_for_unsupported_namespace(t *testing.T) { ctx := kadtest.CtxShort(t) d := newTestDHT(t) - valueChan, err := d.SearchValue(ctx, "/unsupported/key") + valueChan, err := NewRouting(d).SearchValue(ctx, "/unsupported/key") assert.ErrorIs(t, err, routing.ErrNotSupported) assert.Nil(t, valueChan) } @@ -843,7 +843,7 @@ func TestDHT_SearchValue_stops_with_cancelled_context(t *testing.T) { d2 := top.AddServer(nil) top.Connect(ctx, d1, d2) - valueChan, err := d1.SearchValue(cancelledCtx, "/"+namespaceIPNS+"/some-key") + valueChan, err := NewRouting(d1).SearchValue(cancelledCtx, "/"+namespaceIPNS+"/some-key") assert.NoError(t, err) assertClosed(t, ctx, valueChan) } @@ -864,13 +864,13 @@ func TestDHT_SearchValue_has_record_locally(t *testing.T) { top.Connect(ctx, d1, d2) - err := d1.putValueLocal(ctx, key, validValue) + err := NewRouting(d1).putValueLocal(ctx, key, validValue) require.NoError(t, err) - err = d2.putValueLocal(ctx, key, betterValue) + err = NewRouting(d2).putValueLocal(ctx, key, betterValue) require.NoError(t, err) - valChan, err := d1.SearchValue(ctx, key) + valChan, err := NewRouting(d1).SearchValue(ctx, key) require.NoError(t, err) val := readItem(t, ctx, valChan) // from local store @@ -889,10 +889,10 @@ func TestDHT_SearchValue_offline(t *testing.T) { d := newTestDHT(t) key, v := makePkKeyValue(t) - err := d.putValueLocal(ctx, key, v) + err := NewRouting(d).putValueLocal(ctx, key, v) require.NoError(t, err) - valChan, err := d.SearchValue(ctx, key, routing.Offline) + valChan, err := NewRouting(d).SearchValue(ctx, key, routing.Offline) require.NoError(t, err) val := readItem(t, ctx, valChan) @@ -915,10 +915,10 @@ func TestDHT_SearchValue_offline_not_found_locally(t *testing.T) { top.Connect(ctx, d1, d2) - err := d2.putValueLocal(ctx, key, v) + err := NewRouting(d2).putValueLocal(ctx, key, v) require.NoError(t, err) - valChan, err := d1.SearchValue(ctx, key, routing.Offline) + valChan, err := NewRouting(d1).SearchValue(ctx, key, routing.Offline) assert.ErrorIs(t, err, routing.ErrNotFound) assert.Nil(t, valChan) } @@ -937,7 +937,7 @@ func TestDHT_Bootstrap_no_peers_configured(t *testing.T) { {ID: d3.host.ID(), Addrs: d3.host.Addrs()}, } - err := d1.Bootstrap(ctx) + err := NewRouting(d1).Bootstrap(ctx) assert.NoError(t, err) deadline, hasDeadline := ctx.Deadline()