Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve addresses when creating a new stream #1342

Merged
merged 2 commits into from
Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,17 @@ func (h *BasicHost) RemoveStreamHandler(pid protocol.ID) {
// to create one. If ProtocolID is "", writes no header.
// (Threadsafe)
func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) {
// Ensure we have a connection, with peer addresses resolved by the routing system (#207)
// It is not sufficient to let the underlying host connect, it will most likely not have
// any addresses for the peer without any prior connections.
// If the caller wants to prevent the host from dialing, it should use the NoDial option.
if nodial, _ := network.GetNoDial(ctx); !nodial {
err := h.Connect(ctx, peer.AddrInfo{ID: p})
if err != nil {
return nil, err
}
}

s, err := h.Network().NewStream(ctx, p)
if err != nil {
return nil, err
Expand Down
46 changes: 46 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"io/ioutil"
"reflect"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -439,6 +440,51 @@ func TestNewDialOld(t *testing.T) {
require.Equal(t, s.Protocol(), protocol.ID("/testing"), "should have gotten /testing")
}

func TestNewStreamResolve(t *testing.T) {
h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Get the tcp port that h2 is listening on.
h2pi := h2.Peerstore().PeerInfo(h2.ID())
var dialAddr string
const tcpPrefix = "/ip4/127.0.0.1/tcp/"
for _, addr := range h2pi.Addrs {
addrStr := addr.String()
if strings.HasPrefix(addrStr, tcpPrefix) {
port := addrStr[len(tcpPrefix):]
dialAddr = "/dns4/localhost/tcp/" + port
break
}
}
assert.NotEqual(t, dialAddr, "")

// Add the DNS multiaddr to h1's peerstore.
maddr, err := ma.NewMultiaddr(dialAddr)
require.NoError(t, err)
h1.Peerstore().AddAddr(h2.ID(), maddr, time.Second)

connectedOn := make(chan protocol.ID)
h2.SetStreamHandler("/testing", func(s network.Stream) {
connectedOn <- s.Protocol()
s.Close()
})

// NewStream will make a new connection using the DNS address in h1's
// peerstore.
s, err := h1.NewStream(ctx, h2.ID(), "/testing/1.0.0", "/testing")
require.NoError(t, err)

// force the lazy negotiation to complete
_, err = s.Write(nil)
require.NoError(t, err)
assertWait(t, connectedOn, "/testing")
}

func TestProtoDowngrade(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down