From 951281274024e24dd45290831963fe5a32d11e81 Mon Sep 17 00:00:00 2001
From: Gus Eggert
Date: Wed, 8 Mar 2023 15:48:56 -0500
Subject: [PATCH 1/3] test: port legacy DHT tests to Go

---
 test/cli/dht_legacy_test.go       | 137 ++++++++++++++++++++++++++++++
 test/cli/harness/harness.go       |   2 +-
 test/cli/harness/node.go          |  44 +++++++---
 test/cli/harness/nodes.go         |  16 ++++
 test/cli/harness/run.go           |   8 +-
 test/sharness/t0170-legacy-dht.sh | 121 --------------------------
 6 files changed, 192 insertions(+), 136 deletions(-)
 create mode 100644 test/cli/dht_legacy_test.go
 delete mode 100755 test/sharness/t0170-legacy-dht.sh

diff --git a/test/cli/dht_legacy_test.go b/test/cli/dht_legacy_test.go
new file mode 100644
index 00000000000..437b62ae4ea
--- /dev/null
+++ b/test/cli/dht_legacy_test.go
@@ -0,0 +1,137 @@
+package cli
+
+import (
+	"sort"
+	"sync"
+	"testing"
+
+	"github.com/ipfs/kubo/test/cli/harness"
+	"github.com/ipfs/kubo/test/cli/testutils"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLegacyDHT(t *testing.T) {
+	nodes := harness.NewT(t).NewNodes(5).Init()
+	nodes.ForEachPar(func(node *harness.Node) {
+		node.IPFS("config", "Routing.Type", "dht")
+	})
+	nodes.StartDaemons().Connect()
+
+	t.Run("ipfs dht findpeer", func(t *testing.T) {
+		t.Parallel()
+		res := nodes[1].RunIPFS("dht", "findpeer", nodes[0].PeerID().String())
+		assert.Equal(t, 0, res.ExitCode())
+
+		swarmAddr := nodes[0].SwarmAddrsWithoutPeerIDs()[0]
+		require.Equal(t, swarmAddr.String(), res.Stdout.Trimmed())
+	})
+
+	t.Run("ipfs dht get <key>", func(t *testing.T) {
+		t.Parallel()
+		hash := nodes[2].IPFSAddStr("hello world")
+		nodes[2].IPFS("name", "publish", "/ipfs/"+hash)
+
+		res := nodes[1].IPFS("dht", "get", "/ipns/"+nodes[2].PeerID().String())
+		assert.Contains(t, res.Stdout.String(), "/ipfs/"+hash)
+
+		t.Run("put round trips (#3124)", func(t *testing.T) {
+			t.Parallel()
+			nodes[0].WriteBytes("get_result", res.Stdout.Bytes())
+			res := nodes[0].IPFS("dht", "put", "/ipns/"+nodes[2].PeerID().String(), "get_result")
+			assert.Greater(t, len(res.Stdout.Lines()), 0, "should put to at least one node")
+		})
+
+		t.Run("put with bad keys fails (issue #5113, #4611)", func(t *testing.T) {
+			t.Parallel()
+			keys := []string{"foo", "/pk/foo", "/ipns/foo"}
+			for _, key := range keys {
+				key := key
+				t.Run(key, func(t *testing.T) {
+					t.Parallel()
+					res := nodes[0].RunIPFS("dht", "put", key)
+					assert.Equal(t, 1, res.ExitCode())
+					assert.Contains(t, res.Stderr.String(), "invalid")
+					assert.Empty(t, res.Stdout.String())
+				})
+			}
+		})
+
+		t.Run("get with bad keys (issue #4611)", func(t *testing.T) {
+			for _, key := range []string{"foo", "/pk/foo"} {
+				key := key
+				t.Run(key, func(t *testing.T) {
+					t.Parallel()
+					res := nodes[0].RunIPFS("dht", "get", key)
+					assert.Equal(t, 1, res.ExitCode())
+					assert.Contains(t, res.Stderr.String(), "invalid")
+					assert.Empty(t, res.Stdout.String())
+				})
+			}
+		})
+	})
+
+	t.Run("ipfs dht findprovs", func(t *testing.T) {
+		t.Parallel()
+		hash := nodes[3].IPFSAddStr("some stuff")
+		res := nodes[4].IPFS("dht", "findprovs", hash)
+		assert.Equal(t, nodes[3].PeerID().String(), res.Stdout.Trimmed())
+	})
+
+	t.Run("ipfs dht query <peerID>", func(t *testing.T) {
+		t.Parallel()
+		t.Run("normal DHT configuration", func(t *testing.T) {
+			t.Parallel()
+			hash := nodes[0].IPFSAddStr("some other stuff")
+			peerCounts := map[string]int{}
+			peerCountsMut := sync.Mutex{}
+			harness.Nodes(nodes).ForEachPar(func(node *harness.Node) {
+				res := node.IPFS("dht", "query", hash)
+				closestPeer := res.Stdout.Lines()[0]
+				// check that it's a valid peer ID
+				_, err := peer.Decode(closestPeer)
+				require.NoError(t, err)
+
+				peerCountsMut.Lock()
+				peerCounts[closestPeer]++
+				peerCountsMut.Unlock()
+			})
+			// 4 nodes should see the same peer ID
+			// 1 node (the closest) should see a different one
+			var counts []int
+			for _, count := range peerCounts {
+				counts = append(counts, count)
+			}
+			sort.IntSlice(counts).Sort()
+			assert.Equal(t, []int{1, 4}, counts)
+		})
+
+	})
+
+	t.Run("dht commands fail when offline", func(t *testing.T) {
+		t.Parallel()
+		node := harness.NewT(t).NewNode().Init()
+
+		// these cannot be run in parallel due to repo locking (seems like a bug)
+
+		t.Run("dht findprovs", func(t *testing.T) {
+			res := node.RunIPFS("dht", "findprovs", testutils.CIDEmptyDir)
+			assert.Equal(t, 1, res.ExitCode())
+			assert.Contains(t, res.Stderr.String(), "this command must be run in online mode")
+		})
+
+		t.Run("dht findpeer", func(t *testing.T) {
+			res := node.RunIPFS("dht", "findpeer", testutils.CIDEmptyDir)
+			assert.Equal(t, 1, res.ExitCode())
+			assert.Contains(t, res.Stderr.String(), "this command must be run in online mode")
+		})
+
+		t.Run("dht put", func(t *testing.T) {
+			node.WriteBytes("foo", []byte("foo"))
+			res := node.RunIPFS("dht", "put", "/ipns/"+node.PeerID().String(), "foo")
+			assert.Equal(t, 1, res.ExitCode())
+			assert.Contains(t, res.Stderr.String(), "this action must be run in online mode")
+		})
+	})
+}
diff --git a/test/cli/harness/harness.go b/test/cli/harness/harness.go
index de962e1c120..a35fead3512 100644
--- a/test/cli/harness/harness.go
+++ b/test/cli/harness/harness.go
@@ -171,7 +171,7 @@ func (h *Harness) Mkdirs(paths ...string) {
 	}
 }
 
-func (h *Harness) Sh(expr string) RunResult {
+func (h *Harness) Sh(expr string) *RunResult {
 	return h.Runner.Run(RunRequest{
 		Path: "bash",
 		Args: []string{"-c", expr},
diff --git a/test/cli/harness/node.go b/test/cli/harness/node.go
index 0d0295307ff..181fca99bc4 100644
--- a/test/cli/harness/node.go
+++ b/test/cli/harness/node.go
@@ -129,23 +129,23 @@ func (n *Node) UpdateConfigAndUserSuppliedResourceManagerOverrides(f func(cfg *c
 	n.WriteUserSuppliedResourceOverrides(overrides)
 }
 
-func (n *Node) IPFS(args ...string) RunResult {
+func (n *Node) IPFS(args ...string) *RunResult {
 	res := n.RunIPFS(args...)
 	n.Runner.AssertNoError(res)
 	return res
 }
 
-func (n *Node) PipeStrToIPFS(s string, args ...string) RunResult {
+func (n *Node) PipeStrToIPFS(s string, args ...string) *RunResult {
 	return n.PipeToIPFS(strings.NewReader(s), args...)
 }
 
-func (n *Node) PipeToIPFS(reader io.Reader, args ...string) RunResult {
+func (n *Node) PipeToIPFS(reader io.Reader, args ...string) *RunResult {
 	res := n.RunPipeToIPFS(reader, args...)
 	n.Runner.AssertNoError(res)
 	return res
 }
 
-func (n *Node) RunPipeToIPFS(reader io.Reader, args ...string) RunResult {
+func (n *Node) RunPipeToIPFS(reader io.Reader, args ...string) *RunResult {
 	return n.Runner.Run(RunRequest{
 		Path:    n.IPFSBin,
 		Args:    args,
@@ -153,7 +153,7 @@ func (n *Node) RunPipeToIPFS(reader io.Reader, args ...string) RunResult {
 	})
 }
 
-func (n *Node) RunIPFS(args ...string) RunResult {
+func (n *Node) RunIPFS(args ...string) *RunResult {
 	return n.Runner.Run(RunRequest{
 		Path: n.IPFSBin,
 		Args: args,
@@ -216,7 +216,7 @@ func (n *Node) StartDaemon(ipfsArgs ...string) *Node {
 		RunFunc: (*exec.Cmd).Start,
 	})
 
-	n.Daemon = &res
+	n.Daemon = res
 
 	log.Debugf("node %d started, checking API", n.ID)
 	n.WaitOnAPI()
@@ -399,8 +399,6 @@ func (n *Node) SwarmAddrs() []multiaddr.Multiaddr {
 		Path: n.IPFSBin,
 		Args: []string{"swarm", "addrs", "local"},
 	})
-	ipfsProtocol := multiaddr.ProtocolWithCode(multiaddr.P_IPFS).Name
-	peerID := n.PeerID()
 	out := strings.TrimSpace(res.Stdout.String())
 	outLines := strings.Split(out, "\n")
 	var addrs []multiaddr.Multiaddr
@@ -409,9 +407,18 @@ func (n *Node) SwarmAddrs() []multiaddr.Multiaddr {
 		if err != nil {
 			panic(err)
 		}
+		addrs = append(addrs, ma)
+	}
+	return addrs
+}
 
+func (n *Node) SwarmAddrsWithPeerIDs() []multiaddr.Multiaddr {
+	ipfsProtocol := multiaddr.ProtocolWithCode(multiaddr.P_IPFS).Name
+	peerID := n.PeerID()
+	var addrs []multiaddr.Multiaddr
+	for _, ma := range n.SwarmAddrs() {
 		// add the peer ID to the multiaddr if it doesn't have it
-		_, err = ma.ValueForProtocol(multiaddr.P_IPFS)
+		_, err := ma.ValueForProtocol(multiaddr.P_IPFS)
 		if errors.Is(err, multiaddr.ErrProtocolNotFound) {
 			comp, err := multiaddr.NewComponent(ipfsProtocol, peerID.String())
 			if err != nil {
@@ -424,10 +431,27 @@ func (n *Node) SwarmAddrs() []multiaddr.Multiaddr {
 	return addrs
 }
 
+func (n *Node) SwarmAddrsWithoutPeerIDs() []multiaddr.Multiaddr {
+	var addrs []multiaddr.Multiaddr
+	for _, ma := range n.SwarmAddrs() {
+		var components []multiaddr.Multiaddr
+		multiaddr.ForEach(ma, func(c multiaddr.Component) bool {
+			if c.Protocol().Code == multiaddr.P_IPFS {
+				return true
+			}
+			components = append(components, &c)
+			return true
+		})
+		ma = multiaddr.Join(components...)
+		addrs = append(addrs, ma)
+	}
+	return addrs
+}
+
 func (n *Node) Connect(other *Node) *Node {
 	n.Runner.MustRun(RunRequest{
 		Path: n.IPFSBin,
-		Args: []string{"swarm", "connect", other.SwarmAddrs()[0].String()},
+		Args: []string{"swarm", "connect", other.SwarmAddrsWithPeerIDs()[0].String()},
 	})
 	return n
 }
diff --git a/test/cli/harness/nodes.go b/test/cli/harness/nodes.go
index dbc7de16ba1..872d7767913 100644
--- a/test/cli/harness/nodes.go
+++ b/test/cli/harness/nodes.go
@@ -4,6 +4,7 @@ import (
 	"sync"
 
 	"github.com/multiformats/go-multiaddr"
+	"golang.org/x/sync/errgroup"
 )
 
 // Nodes is a collection of Kubo nodes along with operations on groups of nodes.
@@ -16,6 +17,21 @@ func (n Nodes) Init(args ...string) Nodes {
 	return n
 }
 
+func (n Nodes) ForEachPar(f func(*Node)) {
+	group := &errgroup.Group{}
+	for _, node := range n {
+		node := node
+		group.Go(func() error {
+			f(node)
+			return nil
+		})
+	}
+	err := group.Wait()
+	if err != nil {
+		panic(err)
+	}
+}
+
 func (n Nodes) Connect() Nodes {
 	wg := sync.WaitGroup{}
 	for i, node := range n {
diff --git a/test/cli/harness/run.go b/test/cli/harness/run.go
index 9cbb871bc10..c2a3662be8b 100644
--- a/test/cli/harness/run.go
+++ b/test/cli/harness/run.go
@@ -51,7 +51,7 @@ func environToMap(environ []string) map[string]string {
 	return m
 }
 
-func (r *Runner) Run(req RunRequest) RunResult {
+func (r *Runner) Run(req RunRequest) *RunResult {
 	cmd := exec.Command(req.Path, req.Args...)
 	stdout := &Buffer{}
 	stderr := &Buffer{}
@@ -86,17 +86,17 @@ func (r *Runner) Run(req RunRequest) RunResult {
 		result.ExitErr = exitErr
 	}
 
-	return result
+	return &result
 }
 
 // MustRun runs the command and fails the test if the command fails.
-func (r *Runner) MustRun(req RunRequest) RunResult {
+func (r *Runner) MustRun(req RunRequest) *RunResult {
 	result := r.Run(req)
 	r.AssertNoError(result)
 	return result
 }
 
-func (r *Runner) AssertNoError(result RunResult) {
+func (r *Runner) AssertNoError(result *RunResult) {
 	if result.ExitErr != nil {
 		log.Panicf("'%s' returned error, code: %d, err: %s\nstdout:%s\nstderr:%s\n",
 			result.Cmd.Args, result.ExitErr.ExitCode(), result.ExitErr.Error(), result.Stdout.String(), result.Stderr.String())
diff --git a/test/sharness/t0170-legacy-dht.sh b/test/sharness/t0170-legacy-dht.sh
deleted file mode 100755
index fc11b9044dd..00000000000
--- a/test/sharness/t0170-legacy-dht.sh
+++ /dev/null
@@ -1,121 +0,0 @@
-#!/usr/bin/env bash
-
-# Legacy / deprecated, see: t0170-routing-dht.sh
-test_description="Test dht command"
-
-. lib/test-lib.sh
-
-test_dht() {
-  NUM_NODES=5
-
-  test_expect_success 'init iptb' '
-    rm -rf .iptb/ &&
-    iptb testbed create -type localipfs -count $NUM_NODES -init
-  '
-
-  test_expect_success 'DHT-only routing' '
-    iptb run -- ipfs config Routing.Type dht
-  '
-
-  startup_cluster $NUM_NODES $@
-
-  test_expect_success 'peer ids' '
-    PEERID_0=$(iptb attr get 0 id) &&
-    PEERID_2=$(iptb attr get 2 id)
-  '
-
-  # ipfs dht findpeer <peerID>
-  test_expect_success 'findpeer' '
-    ipfsi 1 dht findpeer $PEERID_0 | sort >actual &&
-    ipfsi 0 id -f "<addrs>\n" | cut -d / -f 1-5 | sort >expected &&
-    test_cmp actual expected
-  '
-
-  # ipfs dht get <key>
-  test_expect_success 'get with good keys works' '
-    HASH="$(echo "hello world" | ipfsi 2 add -q)" &&
-    ipfsi 2 name publish "/ipfs/$HASH" &&
-    ipfsi 1 dht get "/ipns/$PEERID_2" >get_result
-  '
-
-  test_expect_success 'get with good keys contains the right value' '
-    cat get_result | grep -aq "/ipfs/$HASH"
-  '
-
-  test_expect_success 'put round trips (#3124)' '
-    ipfsi 0 dht put "/ipns/$PEERID_2" get_result | sort >putted &&
-    [ -s putted ] ||
-    test_fsh cat putted
-  '
-
-  test_expect_success 'put with bad keys fails (issue #5113)' '
-    ipfsi 0 dht put "foo" <<<bar >putted
-    ipfsi 0 dht put "/pk/foo" <<<bar >>putted
-    ipfsi 0 dht put "/ipns/foo" <<<bar >>putted
-    [ ! -s putted ] ||
-    test_fsh cat putted
-  '
-
-  test_expect_success 'put with bad keys returns error (issue #4611)' '
-    test_must_fail ipfsi 0 dht put "foo" <<<bar &&
-    test_must_fail ipfsi 0 dht put "/pk/foo" <<<bar &&
-    test_must_fail ipfsi 0 dht put "/ipns/foo" <<<bar
-  '
-
-  test_expect_success 'get with bad keys (issue #4611)' '
-    test_must_fail ipfsi 0 dht get "foo" &&
-    test_must_fail ipfsi 0 dht get "/pk/foo"
-  '
-
-  test_expect_success "add a ref so we can find providers for it" '
-    echo "some stuff" > afile &&
-    HASH=$(ipfsi 3 add -q afile)
-  '
-
-  # ipfs dht findprovs
-  test_expect_success 'findprovs' '
-    ipfsi 4 dht findprovs $HASH > provs &&
-    iptb attr get 3 id > expected &&
-    test_cmp provs expected
-  '
-
-
-  # ipfs dht query <peerID>
-  #
-  # We test all nodes. 4 nodes should see the same peer ID, one node (the
-  # closest) should see a different one.
-
-  for i in $(test_seq 0 4); do
-    test_expect_success "query from $i" '
-      ipfsi "$i" dht query "$HASH" | head -1 >closest-$i
-    '
-  done
-
-  test_expect_success "collecting results" '
-    cat closest-* | sort | uniq -c | sed -e "s/ *\([0-9]\+\) .*/\1/g" | sort -g > actual &&
-    echo 1 > expected &&
-    echo 4 >> expected
-  '
-
-  test_expect_success "checking results" '
-    test_cmp actual expected
-  '
-
-  test_expect_success 'stop iptb' '
-    iptb stop
-  '
-
-  test_expect_success "dht commands fail when offline" '
-    test_must_fail ipfsi 0 dht findprovs "$HASH" 2>err_findprovs &&
-    test_must_fail ipfsi 0 dht findpeer "$HASH" 2>err_findpeer &&
-    test_must_fail ipfsi 0 dht put "/ipns/$PEERID_2" "get_result" 2>err_put &&
-    test_should_contain "this command must be run in online mode" err_findprovs &&
-    test_should_contain "this command must be run in online mode" err_findpeer &&
-    test_should_contain "this action must be run in online mode" err_put
-  '
-}
-
-test_dht
-test_dht --enable-pubsub-experiment --enable-namesys-pubsub
-
-test_done

From 983530cdb8bee03108751ae983ba2227fc0c65c2 Mon Sep 17 00:00:00 2001
From: Gus Eggert
Date: Wed, 8 Mar 2023 16:25:45 -0500
Subject: [PATCH 2/3] test: parallelize more of rcmgr Go tests

---
 test/cli/harness/node.go |  8 ----
 test/cli/rcmgr_test.go   | 82 ++++++++++++++++++++++++----------------
 2 files changed, 50 insertions(+), 40 deletions(-)

diff --git a/test/cli/harness/node.go b/test/cli/harness/node.go
index 181fca99bc4..cc251e11b0f 100644
--- a/test/cli/harness/node.go
+++ b/test/cli/harness/node.go
@@ -121,14 +121,6 @@ func (n *Node) UpdateUserSuppliedResourceManagerOverrides(f func(overrides *rcmg
 	n.WriteUserSuppliedResourceOverrides(overrides)
 }
 
-func (n *Node) UpdateConfigAndUserSuppliedResourceManagerOverrides(f func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig)) {
-	overrides := n.ReadUserResourceOverrides()
-	cfg := n.ReadConfig()
-	f(cfg, overrides)
-	n.WriteConfig(cfg)
-	n.WriteUserSuppliedResourceOverrides(overrides)
-}
-
 func (n *Node) IPFS(args ...string) *RunResult {
 	res := n.RunIPFS(args...)
 	n.Runner.AssertNoError(res)
diff --git a/test/cli/rcmgr_test.go b/test/cli/rcmgr_test.go
index fb644e1a746..51b2b0452e3 100644
--- a/test/cli/rcmgr_test.go
+++ b/test/cli/rcmgr_test.go
@@ -49,6 +49,7 @@ func TestRcmgr(t *testing.T) {
 	})
 
 	t.Run("Very high connmgr highwater", func(t *testing.T) {
+		t.Parallel()
 		node := harness.NewT(t).NewNode().Init()
 		node.UpdateConfig(func(cfg *config.Config) {
 			cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(1000)
@@ -74,6 +75,7 @@
 		node.StartDaemon()
 
 		t.Run("conns and streams are above 800 for default connmgr settings", func(t *testing.T) {
+			t.Parallel()
 			res := node.RunIPFS("swarm", "resources", "--enc=json")
 			require.Equal(t, 0, res.ExitCode())
 			limits := unmarshalLimits(t, res.Stdout.Bytes())
@@ -87,6 +89,7 @@
 	})
 
 	t.Run("limits should succeed", func(t *testing.T) {
+		t.Parallel()
 		res := node.RunIPFS("swarm", "resources", "--enc=json")
 		assert.Equal(t, 0, res.ExitCode())
 
@@ -106,6 +109,7 @@
 	})
 
 	t.Run("swarm stats works", func(t *testing.T) {
+		t.Parallel()
 		res := node.RunIPFS("swarm", "resources", "--enc=json")
 		require.Equal(t, 0, res.ExitCode())
 
@@ -123,6 +127,7 @@
 	})
 
 	t.Run("smoke test transient scope", func(t *testing.T) {
+		t.Parallel()
 		node := harness.NewT(t).NewNode().Init()
 		node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 			overrides.Transient.Memory = 88888
@@ -135,6 +140,7 @@
 	})
 
 	t.Run("smoke test service scope", func(t *testing.T) {
+		t.Parallel()
 		node := harness.NewT(t).NewNode().Init()
 		node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 			overrides.Service = map[string]rcmgr.ResourceLimits{"foo": {Memory: 77777}}
@@ -147,6 +153,7 @@
 	})
 
 	t.Run("smoke test protocol scope", func(t *testing.T) {
+		t.Parallel()
 		node := harness.NewT(t).NewNode().Init()
 		node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 			overrides.Protocol = map[protocol.ID]rcmgr.ResourceLimits{"foo": {Memory: 66666}}
@@ -159,6 +166,7 @@
 	})
 
 	t.Run("smoke test peer scope", func(t *testing.T) {
+		t.Parallel()
 		validPeerID, err := peer.Decode("QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN")
 		assert.NoError(t, err)
 		node := harness.NewT(t).NewNode().Init()
@@ -172,13 +180,17 @@
 		assert.Equal(t, rcmgr.LimitVal64(55555), limits.Peers[validPeerID].Memory)
 	})
 
-	t.Run("", func(t *testing.T) {
+	t.Run("blocking and allowlists", func(t *testing.T) {
+		t.Parallel()
 		nodes := harness.NewT(t).NewNodes(3).Init()
 		node0, node1, node2 := nodes[0], nodes[1], nodes[2]
-		// peerID0, peerID1, peerID2 := node0.PeerID(), node1.PeerID(), node2.PeerID()
 		peerID1, peerID2 := node1.PeerID().String(), node2.PeerID().String()
 
-		node0.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+		node0.UpdateConfig(func(cfg *config.Config) {
+			cfg.Swarm.ResourceMgr.Enabled = config.True
+			cfg.Swarm.ResourceMgr.Allowlist = []string{"/ip4/0.0.0.0/ipcidr/0/p2p/" + peerID2}
+		})
+		node0.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 			*overrides = rcmgr.PartialLimitConfig{
 				System: rcmgr.ResourceLimits{
 					Conns:         rcmgr.BlockAllLimit,
 					ConnsInbound:  rcmgr.BlockAllLimit,
 					ConnsOutbound: rcmgr.BlockAllLimit,
 				},
 			}
-			cfg.Swarm.ResourceMgr.Enabled = config.True
-			cfg.Swarm.ResourceMgr.Allowlist = []string{"/ip4/0.0.0.0/ipcidr/0/p2p/" + peerID2}
 		})
 
 		nodes.StartDaemons()
 
-		t.Parallel()
-		t.Run("node 0 should fail to connect to node 1", func(t *testing.T) {
+		t.Run("node 0 should fail to connect to and ping node 1", func(t *testing.T) {
+			t.Parallel()
 			res := node0.Runner.Run(harness.RunRequest{
 				Path: node0.IPFSBin,
-				Args: []string{"swarm", "connect", node1.SwarmAddrs()[0].String()},
+				Args: []string{"swarm", "connect", node1.SwarmAddrsWithPeerIDs()[0].String()},
 			})
 			assert.Equal(t, 1, res.ExitCode())
 			assert.Contains(t, res.Stderr.String(), "failed to find any peer in table")
+
+			res = node0.RunIPFS("ping", "-n2", peerID1)
+			assert.Equal(t, 1, res.ExitCode())
+			assert.Contains(t, res.Stderr.String(), "Error: ping failed")
 		})
 
-		t.Run("node 0 should connect to node 2 since it is allowlisted", func(t *testing.T) {
+		t.Run("node 0 should connect to and ping node 2 since it is allowlisted", func(t *testing.T) {
+			t.Parallel()
 			res := node0.Runner.Run(harness.RunRequest{
 				Path: node0.IPFSBin,
-				Args: []string{"swarm", "connect", node2.SwarmAddrs()[0].String()},
+				Args: []string{"swarm", "connect", node2.SwarmAddrsWithPeerIDs()[0].String()},
 			})
 			assert.Equal(t, 0, res.ExitCode())
-		})
 
-		t.Run("node 0 should fail to ping node 1", func(t *testing.T) {
-			res := node0.RunIPFS("ping", "-n2", peerID1)
-			assert.Equal(t, 1, res.ExitCode())
-			assert.Contains(t, res.Stderr.String(), "Error: ping failed")
-		})
-
-		t.Run("node 0 should be able to ping node 2", func(t *testing.T) {
-			res := node0.RunIPFS("ping", "-n2", peerID2)
+			res = node0.RunIPFS("ping", "-n2", peerID2)
 			assert.Equal(t, 0, res.ExitCode())
 		})
 	})
 
 	t.Run("daemon should refuse to start if connmgr.highwater < resources inbound", func(t *testing.T) {
-		t.Parallel()
 		t.Run("system conns", func(t *testing.T) {
+			t.Parallel()
 			node := harness.NewT(t).NewNode().Init()
-			node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			node.UpdateConfig(func(cfg *config.Config) {
+				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
+				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
+			})
+			node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 				*overrides = rcmgr.PartialLimitConfig{
 					System: rcmgr.ResourceLimits{Conns: 128},
 				}
-				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
-				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 			})
 			res := node.RunIPFS("daemon")
 			assert.Equal(t, 1, res.ExitCode())
 		})
 
 		t.Run("system conns inbound", func(t *testing.T) {
+			t.Parallel()
 			node := harness.NewT(t).NewNode().Init()
-			node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			node.UpdateConfig(func(cfg *config.Config) {
+				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
+				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
+			})
+			node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 				*overrides = rcmgr.PartialLimitConfig{
 					System: rcmgr.ResourceLimits{ConnsInbound: 128},
 				}
-				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
-				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 			})
 			res := node.RunIPFS("daemon")
 			assert.Equal(t, 1, res.ExitCode())
 		})
 
 		t.Run("system streams", func(t *testing.T) {
+			t.Parallel()
 			node := harness.NewT(t).NewNode().Init()
-			node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			node.UpdateConfig(func(cfg *config.Config) {
+				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
+				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
+			})
+			node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 				*overrides = rcmgr.PartialLimitConfig{
 					System: rcmgr.ResourceLimits{Streams: 128},
 				}
-				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
-				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 			})
 			res := node.RunIPFS("daemon")
 			assert.Equal(t, 1, res.ExitCode())
 		})
 
 		t.Run("system streams inbound", func(t *testing.T) {
+			t.Parallel()
 			node := harness.NewT(t).NewNode().Init()
-			node.UpdateConfigAndUserSuppliedResourceManagerOverrides(func(cfg *config.Config, overrides *rcmgr.PartialLimitConfig) {
+			node.UpdateConfig(func(cfg *config.Config) {
+				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
+				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
+			})
+			node.UpdateUserSuppliedResourceManagerOverrides(func(overrides *rcmgr.PartialLimitConfig) {
 				*overrides = rcmgr.PartialLimitConfig{
 					System: rcmgr.ResourceLimits{StreamsInbound: 128},
 				}
-				cfg.Swarm.ConnMgr.HighWater = config.NewOptionalInteger(128)
-				cfg.Swarm.ConnMgr.LowWater = config.NewOptionalInteger(64)
 			})
 			res := node.RunIPFS("daemon")
 			assert.Equal(t, 1, res.ExitCode())

From e9e41c4023efb0dee3135619d5c4f6212e27c5f5 Mon Sep 17 00:00:00 2001
From: Gus Eggert
Date: Wed, 8 Mar 2023 16:26:47 -0500
Subject: [PATCH 3/3] feat: add "autoclient" routing type

This routing type is the same as "auto" but it creates the DHT in
"client" mode and hence does not start a DHT server.
---
 cmd/ipfs/daemon.go                      | 61 ++++++++++++++-----------
 config/routing.go                       |  2 +-
 core/node/libp2p/routingopt.go          |  5 +-
 docs/changelogs/v0.19.md                |  6 +++
 docs/config.md                          |  6 ++-
 test/cli/delegated_routing_http_test.go |  2 +-
 test/cli/dht_autoclient_test.go         | 39 ++++++++++++++++
 7 files changed, 88 insertions(+), 33 deletions(-)
 create mode 100644 test/cli/dht_autoclient_test.go

diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go
index 21495c498fa..880d26b0e42 100644
--- a/cmd/ipfs/daemon.go
+++ b/cmd/ipfs/daemon.go
@@ -45,32 +45,33 @@ import (
 )
 
 const (
-	adjustFDLimitKwd          = "manage-fdlimit"
-	enableGCKwd               = "enable-gc"
-	initOptionKwd             = "init"
-	initConfigOptionKwd       = "init-config"
-	initProfileOptionKwd      = "init-profile"
-	ipfsMountKwd              = "mount-ipfs"
-	ipnsMountKwd              = "mount-ipns"
-	migrateKwd                = "migrate"
-	mountKwd                  = "mount"
-	offlineKwd                = "offline" // global option
-	routingOptionKwd          = "routing"
-	routingOptionSupernodeKwd = "supernode"
-	routingOptionDHTClientKwd = "dhtclient"
-	routingOptionDHTKwd       = "dht"
-	routingOptionDHTServerKwd = "dhtserver"
-	routingOptionNoneKwd      = "none"
-	routingOptionCustomKwd    = "custom"
-	routingOptionDefaultKwd   = "default"
-	routingOptionAutoKwd      = "auto"
-	unencryptTransportKwd     = "disable-transport-encryption"
-	unrestrictedAPIAccessKwd  = "unrestricted-api"
-	writableKwd               = "writable"
-	enablePubSubKwd           = "enable-pubsub-experiment"
-	enableIPNSPubSubKwd       = "enable-namesys-pubsub"
-	enableMultiplexKwd        = "enable-mplex-experiment"
-	agentVersionSuffix        = "agent-version-suffix"
+	adjustFDLimitKwd           = "manage-fdlimit"
+	enableGCKwd                = "enable-gc"
+	initOptionKwd              = "init"
+	initConfigOptionKwd        = "init-config"
+	initProfileOptionKwd       = "init-profile"
+	ipfsMountKwd               = "mount-ipfs"
+	ipnsMountKwd               = "mount-ipns"
+	migrateKwd                 = "migrate"
+	mountKwd                   = "mount"
+	offlineKwd                 = "offline" // global option
+	routingOptionKwd           = "routing"
+	routingOptionSupernodeKwd  = "supernode"
+	routingOptionDHTClientKwd  = "dhtclient"
+	routingOptionDHTKwd        = "dht"
+	routingOptionDHTServerKwd  = "dhtserver"
+	routingOptionNoneKwd       = "none"
+	routingOptionCustomKwd     = "custom"
+	routingOptionDefaultKwd    = "default"
+	routingOptionAutoKwd       = "auto"
+	routingOptionAutoClientKwd = "autoclient"
+	unencryptTransportKwd      = "disable-transport-encryption"
+	unrestrictedAPIAccessKwd   = "unrestricted-api"
+	writableKwd                = "writable"
+	enablePubSubKwd            = "enable-pubsub-experiment"
+	enableIPNSPubSubKwd        = "enable-namesys-pubsub"
+	enableMultiplexKwd         = "enable-mplex-experiment"
+	agentVersionSuffix         = "agent-version-suffix"
 	// apiAddrKwd = "address-api"
 	// swarmAddrKwd = "address-swarm"
 )
@@ -416,6 +417,14 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
 			cfg.Identity.PeerID,
 			cfg.Addresses.Swarm,
 			cfg.Identity.PrivKey,
+			libp2p.DHTOption,
+		)
+	case routingOptionAutoClientKwd:
+		ncfg.Routing = libp2p.ConstructDefaultRouting(
+			cfg.Identity.PeerID,
+			cfg.Addresses.Swarm,
+			cfg.Identity.PrivKey,
+			libp2p.DHTClientOption,
 		)
 	case routingOptionDHTClientKwd:
 		ncfg.Routing = libp2p.DHTClientOption
diff --git a/config/routing.go b/config/routing.go
index f19414ff308..1210bb3cecc 100644
--- a/config/routing.go
+++ b/config/routing.go
@@ -10,7 +10,7 @@ import (
 type Routing struct {
 	// Type sets default daemon routing mode.
 	//
-	// Can be one of "auto", "dht", "dhtclient", "dhtserver", "none", or "custom".
+	// Can be one of "auto", "autoclient", "dht", "dhtclient", "dhtserver", "none", or "custom".
 	// When unset or set to "auto", DHT and implicit routers are used.
 	// When "custom" is set, user-provided Routing.Routers is used.
 	Type *OptionalString `json:",omitempty"`
diff --git a/core/node/libp2p/routingopt.go b/core/node/libp2p/routingopt.go
index bfb45971cc9..d54f37acc5c 100644
--- a/core/node/libp2p/routingopt.go
+++ b/core/node/libp2p/routingopt.go
@@ -40,7 +40,7 @@ func init() {
 }
 
 // ConstructDefaultRouting returns routers used when Routing.Type is unset or set to "auto"
-func ConstructDefaultRouting(peerID string, addrs []string, privKey string) func(
+func ConstructDefaultRouting(peerID string, addrs []string, privKey string, routingOpt RoutingOption) func(
 	ctx context.Context,
 	host host.Host,
 	dstore datastore.Batching,
@@ -58,8 +58,7 @@ func ConstructDefaultRouting(peerID string, addrs []string, privKey string) func
 	// Different trade-offs can be made by setting Routing.Type = "custom" with own Routing.Routers
 	var routers []*routinghelpers.ParallelRouter
 
-	// Run the default DHT routing (same as Routing.Type = "dht")
-	dhtRouting, err := DHTOption(ctx, host, dstore, validator, bootstrapPeers...)
+	dhtRouting, err := routingOpt(ctx, host, dstore, validator, bootstrapPeers...)
 	if err != nil {
 		return nil, err
 	}
diff --git a/docs/changelogs/v0.19.md b/docs/changelogs/v0.19.md
index 7663308a6db..fede7a4547f 100644
--- a/docs/changelogs/v0.19.md
+++ b/docs/changelogs/v0.19.md
@@ -7,6 +7,7 @@
 - [Overview](#overview)
 - [🔦 Highlights](#-highlights)
   - [Improving the libp2p resource management integration](#improving-the-libp2p-resource-management-integration)
+  - [Addition of "autoclient" router type](#addition-of-autoclient-router-type)
 - [📝 Changelog](#-changelog)
 - [👨‍👩‍👧‍👦 Contributors](#-contributors)
 
@@ -22,6 +23,11 @@ and [0.18.1](https://github.com/ipfs/kubo/blob/master/docs/changelogs/v0.18.md#i
   - Note: we don't expect most users to need these capabilities, but they are there if so.
 1. [Doc updates](https://github.com/ipfs/kubo/blob/master/docs/libp2p-resource-management.md).
 
+#### Addition of "autoclient" router type
+A new routing type "autoclient" has been added. This mode is similar to "auto", in that it is a hybrid of content routers (including Kademlia and HTTP routers), but it does not run a DHT server. This is similar to the difference between "dhtclient" and "dht" router types.
+
+See the [Routing.Type documentation](https://github.com/ipfs/kubo/blob/master/docs/config.md#routingtype) for more information.
+
 ### 📝 Changelog
 
 ### 👨‍👩‍👧‍👦 Contributors
diff --git a/docs/config.md b/docs/config.md
index adf956ce01d..68666191802 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -1349,11 +1349,13 @@ Contains options for content, peer, and IPNS routing mechanisms.
 
 ### `Routing.Type`
 
-There are multiple routing options: "auto", "none", "dht" and "custom".
+There are multiple routing options: "auto", "autoclient", "none", "dht", "dhtclient", and "custom".
 
 * **DEFAULT:** If unset, or set to "auto", your node will use the IPFS DHT and parallel HTTP routers listed below for additional speed.
 
+* If set to "autoclient", your node will behave as in "auto" but without running a DHT server.
+
 * If set to "none", your node will use _no_ routing system. You'll have to explicitly connect to peers that have the content you're looking for.
 
@@ -1379,7 +1381,7 @@ To force a specific DHT-only mode, client or server, set `Routing.Type` to
 `dhtclient` or `dhtserver` respectively. Please do not set this to `dhtserver`
 unless you're sure your node is reachable from the public network.
 
-When `Routing.Type` is set to `auto` your node will accelerate some types of routing
+When `Routing.Type` is set to `auto` or `autoclient` your node will accelerate some types of routing
 by leveraging HTTP endpoints compatible with [IPIP-337](https://github.com/ipfs/specs/pull/337)
 in addition to the IPFS DHT.
 By default, an instance of [IPNI](https://github.com/ipni/specs/blob/main/IPNI.md#readme)
diff --git a/test/cli/delegated_routing_http_test.go b/test/cli/delegated_routing_http_test.go
index 0b39a9b12e6..446ea515049 100644
--- a/test/cli/delegated_routing_http_test.go
+++ b/test/cli/delegated_routing_http_test.go
@@ -94,7 +94,7 @@ func TestHTTPDelegatedRouting(t *testing.T) {
 	}))
 	t.Cleanup(server.Close)
 
-	node.IPFS("config", "Routing.Type", "--json", `"custom"`)
+	node.IPFS("config", "Routing.Type", "custom")
 	node.IPFS("config", "Routing.Routers.TestDelegatedRouter", "--json", ToJSONStr(JSONObj{
 		"Type": "http",
 		"Parameters": JSONObj{
diff --git a/test/cli/dht_autoclient_test.go b/test/cli/dht_autoclient_test.go
new file mode 100644
index 00000000000..749e34b348e
--- /dev/null
+++ b/test/cli/dht_autoclient_test.go
@@ -0,0 +1,39 @@
+package cli
+
+import (
+	"bytes"
+	"testing"
+
+	"github.com/ipfs/kubo/test/cli/harness"
+	"github.com/ipfs/kubo/test/cli/testutils"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestDHTAutoclient(t *testing.T) {
+	t.Parallel()
+	nodes := harness.NewT(t).NewNodes(10).Init()
+	harness.Nodes(nodes[8:]).ForEachPar(func(node *harness.Node) {
+		node.IPFS("config", "Routing.Type", "autoclient")
+	})
+	nodes.StartDaemons().Connect()
+
+	t.Run("file added on node in client mode is retrievable from node in client mode", func(t *testing.T) {
+		t.Parallel()
+		randomBytes := testutils.RandomBytes(1000)
+		hash := nodes[8].IPFSAdd(bytes.NewReader(randomBytes))
+
+		res := nodes[9].IPFS("cat", hash)
+		assert.Equal(t, randomBytes, []byte(res.Stdout.Trimmed()))
+	})
+
+	t.Run("file added on node in server mode is retrievable from all nodes", func(t *testing.T) {
+		t.Parallel()
+		randomBytes := testutils.RandomBytes(1000)
+		hash := nodes[0].IPFSAdd(bytes.NewReader(randomBytes))
+
+		for i := 0; i < 10; i++ {
+			res := nodes[i].IPFS("cat", hash)
+			assert.Equal(t, randomBytes, []byte(res.Stdout.Trimmed()))
+		}
+	})
+}
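
---

To try the new "autoclient" mode once this series is applied (a minimal
sketch, assuming a built ipfs binary and an initialized repo), either
persist it in the repo config, as the new dht_autoclient_test.go does
through the harness:

    $ ipfs config Routing.Type autoclient
    $ ipfs daemon

or select it for a single run via the daemon's --routing flag, which the
daemon.go change above wires to routingOptionAutoClientKwd:

    $ ipfs daemon --routing=autoclient

In both cases the node keeps the hybrid DHT + HTTP content routing of
"auto" but constructs the DHT with libp2p.DHTClientOption, so it never
serves DHT records to other peers.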