Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Respect mux.ErrReset #10

Merged
merged 6 commits into from
Mar 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sm_yamux

import (
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-yamux"
)

// conn implements mux.MuxedConn over yamux.Session.
type conn yamux.Session

// Close closes underlying yamux
func (c *conn) Close() error {
return c.yamux().Close()
}

// IsClosed checks if yamux.Session is in closed state.
func (c *conn) IsClosed() bool {
return c.yamux().IsClosed()
}

// OpenStream creates a new stream.
func (c *conn) OpenStream() (mux.MuxedStream, error) {
s, err := c.yamux().OpenStream()
if err != nil {
return nil, err
}

return (*stream)(s), nil
}

// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (mux.MuxedStream, error) {
s, err := c.yamux().AcceptStream()
return (*stream)(s), err
}

func (c *conn) yamux() *yamux.Session {
return (*yamux.Session)(c)
}

var _ mux.MuxedConn = &conn{}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
module github.com/libp2p/go-libp2p-yamux

go 1.13

require (
github.com/libp2p/go-libp2p-core v0.3.0
github.com/libp2p/go-libp2p-testing v0.1.1
Expand Down
48 changes: 48 additions & 0 deletions mux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package sm_yamux

import (
"io/ioutil"
"net"

mux "github.com/libp2p/go-libp2p-core/mux"
yamux "github.com/libp2p/go-yamux"
)

var DefaultTransport *Multiplexer
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

func init() {
config := yamux.DefaultConfig()
// We've bumped this to 16MiB as this critically limits throughput.
//
// 1MiB means a best case of 10MiB/s (83.89Mbps) on a connection with
// 100ms latency. The default gave us 2.4MiB *best case* which was
// totally unacceptable.
config.MaxStreamWindowSize = uint32(16 * 1024 * 1024)
// don't spam
config.LogOutput = ioutil.Discard
// We always run over a security transport that buffers internally
// (i.e., uses a block cipher).
config.ReadBufSize = 0
DefaultTransport = (*Multiplexer)(config)
}

// Multiplexer implements mux.Multiplexer that constructs
// yamux-backed muxed connections.
type Multiplexer yamux.Config
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

func (t *Multiplexer) NewConn(nc net.Conn, isServer bool) (mux.MuxedConn, error) {
var s *yamux.Session
var err error
if isServer {
s, err = yamux.Server(nc, t.Config())
} else {
s, err = yamux.Client(nc, t.Config())
}
return (*conn)(s), err
}

func (t *Multiplexer) Config() *yamux.Config {
return (*yamux.Config)(t)
}

var _ mux.Multiplexer = &Multiplexer{}
2 changes: 1 addition & 1 deletion yamux_test.go → mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ import (
tmux "github.com/libp2p/go-libp2p-testing/suites/mux"
)

func TestYamuxTransport(t *testing.T) {
func TestDefaultMultiplexer(t *testing.T) {
tmux.SubtestAll(t, DefaultTransport)
}
55 changes: 55 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package sm_yamux

import (
"time"

"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-yamux"
)

// stream implements mux.MuxedStream over yamux.Stream.
type stream yamux.Stream

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
if err == yamux.ErrConnectionReset {
err = mux.ErrReset
}

return
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.yamux().Write(b)
if err == yamux.ErrConnectionReset {
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
err = mux.ErrReset
}

return
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *stream) Close() error {
return s.yamux().Close()
}

func (s *stream) Reset() error {
return s.yamux().Reset()
}

func (s *stream) SetDeadline(t time.Time) error {
return s.yamux().SetDeadline(t)
}

func (s *stream) SetReadDeadline(t time.Time) error {
return s.yamux().SetReadDeadline(t)
}

func (s *stream) SetWriteDeadline(t time.Time) error {
return s.yamux().SetWriteDeadline(t)
}

func (s *stream) yamux() *yamux.Stream {
return (*yamux.Stream)(s)
}

var _ mux.MuxedStream = &stream{}
77 changes: 0 additions & 77 deletions yamux.go

This file was deleted.