diff --git a/go.mod b/go.mod index 655228de67..73b53a828e 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/multiformats/go-multiaddr v0.5.0 github.com/multiformats/go-multiaddr-dns v0.3.1 github.com/multiformats/go-multihash v0.1.0 - github.com/multiformats/go-multistream v0.3.0 + github.com/multiformats/go-multistream v0.3.1 github.com/multiformats/go-varint v0.0.6 github.com/raulk/go-watchdog v1.2.0 github.com/stretchr/testify v1.7.0 diff --git a/go.sum b/go.sum index d67d449db7..9131b90117 100644 --- a/go.sum +++ b/go.sum @@ -640,8 +640,8 @@ github.com/multiformats/go-multihash v0.1.0 h1:CgAgwqk3//SVEw3T+6DqI4mWMyRuDwZtO github.com/multiformats/go-multihash v0.1.0/go.mod h1:RJlXsxt6vHGaia+S8We0ErjhojtKzPP2AH4+kYM7k84= github.com/multiformats/go-multistream v0.1.1/go.mod h1:KmHZ40hzVxiaiwlj3MEbYgK9JFk2/9UktWZAF54Du38= github.com/multiformats/go-multistream v0.2.1/go.mod h1:5GZPQZbkWOLOn3J2y4Y99vVW7vOfsAflxARk3x14o6k= -github.com/multiformats/go-multistream v0.3.0 h1:yX1v4IWseLPmr0rmnDo148wWJbNx40JxBZGmQb5fUP4= -github.com/multiformats/go-multistream v0.3.0/go.mod h1:ODRoqamLUsETKS9BNcII4gcRsJBU5VAwRIv7O39cEXg= +github.com/multiformats/go-multistream v0.3.1 h1:GQM84yyQ5EZB9l0p5+5eDwFoQgwHI2tLmYGpaWlLF/U= +github.com/multiformats/go-multistream v0.3.1/go.mod h1:ODRoqamLUsETKS9BNcII4gcRsJBU5VAwRIv7O39cEXg= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.2/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go index 6b797b13ac..5a5f039a58 100644 --- a/p2p/host/basic/basic_host.go +++ b/p2p/host/basic/basic_host.go @@ -376,7 +376,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { } } - lzc, protoID, handle, err := h.Mux().NegotiateLazy(s) + protoID, handle, err := h.Mux().Negotiate(s) took := time.Since(before) if err != nil { if err == io.EOF { @@ -392,11 +392,6 @@ func (h *BasicHost) newStreamHandler(s network.Stream) { return } - s = &streamWrapper{ - Stream: s, - rw: lzc, - } - if h.negtimeout > 0 { if err := s.SetDeadline(time.Time{}); err != nil { log.Debugf("resetting stream deadline: ", err) diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index 44bfe561df..f99d82c9db 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -3,10 +3,8 @@ package mocknet import ( "bytes" "context" - "errors" "io" "math" - "math/rand" "sync" "testing" "time" @@ -311,100 +309,6 @@ func TestStreams(t *testing.T) { } -func performPing(t *testing.T, st string, n int, s network.Stream) error { - t.Helper() - - defer s.Close() - - for i := 0; i < n; i++ { - b := make([]byte, 4+len(st)) - if _, err := s.Write([]byte("ping" + st)); err != nil { - return err - } - if _, err := io.ReadFull(s, b); err != nil { - return err - } - if !bytes.Equal(b, []byte("pong"+st)) { - return errors.New("bytes mismatch") - } - } - return nil -} - -func TestStreamsStress(t *testing.T) { - ctx := context.Background() - nnodes := 100 - if race.WithRace() { - nnodes = 30 - } - - mn, err := FullMeshConnected(nnodes) - if err != nil { - t.Fatal(err) - } - defer mn.Close() - - errs := make(chan error) - - hosts := mn.Hosts() - var wg sync.WaitGroup - for _, h := range hosts { - h.SetStreamHandler(protocol.TestingID, func(s network.Stream) { - const st = "pingpong" - defer wg.Done() - defer s.Close() - - for { - b := make([]byte, 4+len(st)) - if _, err := io.ReadFull(s, b); err != nil { - if err == io.EOF { - return - } - errs <- err - } - if !bytes.Equal(b, []byte("ping"+st)) { - errs <- errors.New("bytes mismatch") - } - if _, err := s.Write([]byte("pong" + st)); err != nil { - errs <- err - } - } - }) - } - - for i := 0; i < 1000; i++ { - wg.Add(2) - go func(i int) { - defer wg.Done() - var from, to int - for from == to { - from = rand.Intn(len(hosts)) - to = rand.Intn(len(hosts)) - } - s, err := hosts[from].NewStream(ctx, hosts[to].ID(), protocol.TestingID) - if err != nil { - log.Debugf("%d (%s) %d (%s)", from, hosts[from], to, hosts[to]) - panic(err) - } - - log.Infof("%d start pinging", i) - errs <- performPing(t, "pingpong", rand.Intn(100), s) - log.Infof("%d done pinging", i) - }(i) - } - - go func() { - wg.Wait() - close(errs) - }() - - for err := range errs { - if err != nil { - t.Fatal(err) - } - } -} - func TestAdding(t *testing.T) { mn := New() defer mn.Close()