From 63319f6cea9afc405b76f7e0f8093a6c5b960414 Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 17:55:09 +0200 Subject: [PATCH 1/6] restructure files layout --- conn.go | 41 +++++++++++++++++++ mux.go | 48 ++++++++++++++++++++++ yamux_test.go => mux_test.go | 2 +- yamux.go | 77 ------------------------------------ 4 files changed, 90 insertions(+), 78 deletions(-) create mode 100644 conn.go create mode 100644 mux.go rename yamux_test.go => mux_test.go (75%) delete mode 100644 yamux.go diff --git a/conn.go b/conn.go new file mode 100644 index 0000000..6c39cc2 --- /dev/null +++ b/conn.go @@ -0,0 +1,41 @@ +package sm_yamux + +import ( + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-yamux" +) + +// conn implements mux.MuxedConn over yamux.Session. +type conn yamux.Session + +// Close closes underlying yamux +func (c *conn) Close() error { + return c.yamux().Close() +} + +// IsClosed checks if yamux.Session is in closed state. +func (c *conn) IsClosed() bool { + return c.yamux().IsClosed() +} + +// OpenStream creates a new stream. +func (c *conn) OpenStream() (mux.MuxedStream, error) { + s, err := c.yamux().OpenStream() + if err != nil { + return nil, err + } + + return (*stream)(s), nil +} + +// AcceptStream accepts a stream opened by the other side. +func (c *conn) AcceptStream() (mux.MuxedStream, error) { + s, err := c.yamux().AcceptStream() + return (*stream)(s), err +} + +func (c *conn) yamux() *yamux.Session { + return (*yamux.Session)(c) +} + +var _ mux.MuxedConn = &conn{} diff --git a/mux.go b/mux.go new file mode 100644 index 0000000..f0c1af2 --- /dev/null +++ b/mux.go @@ -0,0 +1,48 @@ +package sm_yamux + +import ( + "io/ioutil" + "net" + + mux "github.com/libp2p/go-libp2p-core/mux" + yamux "github.com/libp2p/go-yamux" +) + +var DefaultTransport *Multiplexer + +func init() { + config := yamux.DefaultConfig() + // We've bumped this to 16MiB as this critically limits throughput. + // + // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with + // 100ms latency. The default gave us 2.4MiB *best case* which was + // totally unacceptable. + config.MaxStreamWindowSize = uint32(16 * 1024 * 1024) + // don't spam + config.LogOutput = ioutil.Discard + // We always run over a security transport that buffers internally + // (i.e., uses a block cipher). + config.ReadBufSize = 0 + DefaultTransport = (*Multiplexer)(config) +} + +// Multiplexer implements mux.Multiplexer that constructs +// yamux-backed muxed connections. +type Multiplexer yamux.Config + +func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { + var s *yamux.Session + var err error + if isServer { + s, err = yamux.Server(nc, t.Config()) + } else { + s, err = yamux.Client(nc, t.Config()) + } + return (*conn)(s), err +} + +func (t *Multiplexer) Config() *yamux.Config { + return (*yamux.Config)(t) +} + +var _ mux.Multiplexer = &Multiplexer{} diff --git a/yamux_test.go b/mux_test.go similarity index 75% rename from yamux_test.go rename to mux_test.go index 409b096..d3a9b4f 100644 --- a/yamux_test.go +++ b/mux_test.go @@ -6,6 +6,6 @@ import ( tmux "github.com/libp2p/go-libp2p-testing/suites/mux" ) -func TestYamuxTransport(t *testing.T) { +func TestDefaultMultiplexer(t *testing.T) { tmux.SubtestAll(t, DefaultTransport) } diff --git a/yamux.go b/yamux.go deleted file mode 100644 index 4875761..0000000 --- a/yamux.go +++ /dev/null @@ -1,77 +0,0 @@ -package sm_yamux - -import ( - "io/ioutil" - "net" - - mux "github.com/libp2p/go-libp2p-core/mux" - yamux "github.com/libp2p/go-yamux" -) - -// Conn is a connection to a remote peer. -type conn yamux.Session - -func (c *conn) yamuxSession() *yamux.Session { - return (*yamux.Session)(c) -} - -func (c *conn) Close() error { - return c.yamuxSession().Close() -} - -func (c *conn) IsClosed() bool { - return c.yamuxSession().IsClosed() -} - -// OpenStream creates a new stream. -func (c *conn) OpenStream() (mux.MuxedStream, error) { - s, err := c.yamuxSession().OpenStream() - if err != nil { - return nil, err - } - - return s, nil -} - -// AcceptStream accepts a stream opened by the other side. -func (c *conn) AcceptStream() (mux.MuxedStream, error) { - s, err := c.yamuxSession().AcceptStream() - return s, err -} - -// Transport is a go-peerstream transport that constructs -// yamux-backed connections. -type Transport yamux.Config - -var DefaultTransport *Transport - -func init() { - config := yamux.DefaultConfig() - // We've bumped this to 16MiB as this critically limits throughput. - // - // 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with - // 100ms latency. The default gave us 2.4MiB *best case* which was - // totally unacceptable. - config.MaxStreamWindowSize = uint32(16 * 1024 * 1024) - // don't spam - config.LogOutput = ioutil.Discard - // We always run over a security transport that buffers internally - // (i.e., uses a block cipher). - config.ReadBufSize = 0 - DefaultTransport = (*Transport)(config) -} - -func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { - var s *yamux.Session - var err error - if isServer { - s, err = yamux.Server(nc, t.Config()) - } else { - s, err = yamux.Client(nc, t.Config()) - } - return (*conn)(s), err -} - -func (t *Transport) Config() *yamux.Config { - return (*yamux.Config)(t) -} From ed2b3419d29cc05ef223779a4ea2c8143fc56de8 Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 17:56:44 +0200 Subject: [PATCH 2/6] redefine yamux stream to implement mux.MuxedStream and to respect mux.ErrReset --- stream.go | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 stream.go diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..866b781 --- /dev/null +++ b/stream.go @@ -0,0 +1,55 @@ +package sm_yamux + +import ( + "time" + + "github.com/libp2p/go-libp2p-core/mux" + "github.com/libp2p/go-yamux" +) + +// stream implements mux.MuxedStream over yamux.Stream. +type stream yamux.Stream + +func (s *stream) Read(b []byte) (n int, err error) { + n, err = s.yamux().Read(b) + if err == yamux.ErrConnectionReset { + err = mux.ErrReset + } + + return +} + +func (s *stream) Write(b []byte) (n int, err error) { + n, err = s.yamux().Write(b) + if err == yamux.ErrConnectionReset { + err = mux.ErrReset + } + + return +} + +func (s *stream) Close() error { + return s.yamux().Close() +} + +func (s *stream) Reset() error { + return s.yamux().Reset() +} + +func (s *stream) SetDeadline(t time.Time) error { + return s.yamux().SetDeadline(t) +} + +func (s *stream) SetReadDeadline(t time.Time) error { + return s.yamux().SetReadDeadline(t) +} + +func (s *stream) SetWriteDeadline(t time.Time) error { + return s.yamux().SetWriteDeadline(t) +} + +func (s *stream) yamux() *yamux.Stream { + return (*yamux.Stream)(s) +} + +var _ mux.MuxedStream = &stream{} From e85de7c4951fe90e93db2d9a50dce5925b25bb87 Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 17:57:02 +0200 Subject: [PATCH 3/6] add go version --- go.mod | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go.mod b/go.mod index ba595a0..418f594 100644 --- a/go.mod +++ b/go.mod @@ -1,5 +1,7 @@ module github.com/libp2p/go-libp2p-yamux +go 1.13 + require ( github.com/libp2p/go-libp2p-core v0.3.0 github.com/libp2p/go-libp2p-testing v0.1.1 From d2ac631fe16c5922f5d7495807dd65338c5df515 Mon Sep 17 00:00:00 2001 From: Hlib Date: Fri, 28 Feb 2020 20:12:16 +0200 Subject: [PATCH 4/6] revert Multiplexer name back to Transport --- mux.go => transport.go | 14 +++++++------- mux_test.go => transport_test.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) rename mux.go => transport.go (72%) rename mux_test.go => transport_test.go (75%) diff --git a/mux.go b/transport.go similarity index 72% rename from mux.go rename to transport.go index f0c1af2..9b33f81 100644 --- a/mux.go +++ b/transport.go @@ -8,7 +8,7 @@ import ( yamux "github.com/libp2p/go-yamux" ) -var DefaultTransport *Multiplexer +var DefaultTransport *Transport func init() { config := yamux.DefaultConfig() @@ -23,14 +23,14 @@ func init() { // We always run over a security transport that buffers internally // (i.e., uses a block cipher). config.ReadBufSize = 0 - DefaultTransport = (*Multiplexer)(config) + DefaultTransport = (*Transport)(config) } -// Multiplexer implements mux.Multiplexer that constructs +// Transport implements mux.Multiplexer that constructs // yamux-backed muxed connections. -type Multiplexer yamux.Config +type Transport yamux.Config -func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { +func (t *Transport) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) { var s *yamux.Session var err error if isServer { @@ -41,8 +41,8 @@ func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) return (*conn)(s), err } -func (t *Multiplexer) Config() *yamux.Config { +func (t *Transport) Config() *yamux.Config { return (*yamux.Config)(t) } -var _ mux.Multiplexer = &Multiplexer{} +var _ mux.Multiplexer = &Transport{} diff --git a/mux_test.go b/transport_test.go similarity index 75% rename from mux_test.go rename to transport_test.go index d3a9b4f..0499971 100644 --- a/mux_test.go +++ b/transport_test.go @@ -6,6 +6,6 @@ import ( tmux "github.com/libp2p/go-libp2p-testing/suites/mux" ) -func TestDefaultMultiplexer(t *testing.T) { +func TestDefaultTransport(t *testing.T) { tmux.SubtestAll(t, DefaultTransport) } From 2142c3d848f782c8a62677147d86151bb09ddce5 Mon Sep 17 00:00:00 2001 From: Hlib Date: Mon, 2 Mar 2020 12:32:40 +0200 Subject: [PATCH 5/6] use explicit return values --- stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 866b781..9f6e4f9 100644 --- a/stream.go +++ b/stream.go @@ -16,7 +16,7 @@ func (s *stream) Read(b []byte) (n int, err error) { err = mux.ErrReset } - return + return n, err } func (s *stream) Write(b []byte) (n int, err error) { @@ -25,7 +25,7 @@ func (s *stream) Write(b []byte) (n int, err error) { err = mux.ErrReset } - return + return n, err } func (s *stream) Close() error { From 00f83670424cbbf552d9d0b547939f8a55f12b2d Mon Sep 17 00:00:00 2001 From: Hlib Date: Tue, 3 Mar 2020 12:19:56 +0200 Subject: [PATCH 6/6] update go-yamux --- go.mod | 2 +- go.sum | 2 ++ stream.go | 4 ++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 418f594..199202a 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,5 @@ go 1.13 require ( github.com/libp2p/go-libp2p-core v0.3.0 github.com/libp2p/go-libp2p-testing v0.1.1 - github.com/libp2p/go-yamux v1.2.4 + github.com/libp2p/go-yamux v1.3.0 ) diff --git a/go.sum b/go.sum index 3060007..b0c4a13 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/libp2p/go-yamux v1.2.3 h1:xX8A36vpXb59frIzWFdEgptLMsOANMFq2K7fPRlunYI github.com/libp2p/go-yamux v1.2.3/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= github.com/libp2p/go-yamux v1.2.4 h1:tSkZdyEEwA++MeJ+r3bxIVpwsF6ygPmOs3xaIjrgJCw= github.com/libp2p/go-yamux v1.2.4/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= +github.com/libp2p/go-yamux v1.3.0 h1:FsYzT16Wq2XqUGJsBbOxoz9g+dFklvNi7jN6YFPfl7U= +github.com/libp2p/go-yamux v1.3.0/go.mod h1:FGTiPvoV/3DVdgWpX+tM0OW3tsM+W5bSE3gZwqQTcow= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 h1:2gxZ0XQIU/5z3Z3bUBu+FXuk2pFbkN6tcwi/pjyaDic= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= diff --git a/stream.go b/stream.go index 9f6e4f9..d5b865b 100644 --- a/stream.go +++ b/stream.go @@ -12,7 +12,7 @@ type stream yamux.Stream func (s *stream) Read(b []byte) (n int, err error) { n, err = s.yamux().Read(b) - if err == yamux.ErrConnectionReset { + if err == yamux.ErrStreamReset { err = mux.ErrReset } @@ -21,7 +21,7 @@ func (s *stream) Read(b []byte) (n int, err error) { func (s *stream) Write(b []byte) (n int, err error) { n, err = s.yamux().Write(b) - if err == yamux.ErrConnectionReset { + if err == yamux.ErrStreamReset { err = mux.ErrReset }