Skip to content

Commit

Permalink
Merge 890da60 into e0354b9
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiyipanuber authored Aug 21, 2023
2 parents e0354b9 + 890da60 commit ff84931
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 18 deletions.
19 changes: 17 additions & 2 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ type ChannelOptions struct {
// This is an unstable API - breaking changes are likely.
RelayTimerVerification bool

// EnableMPTCP enables MPTCP for TCP network connection to increase reliability.
// It requires underlying operating system support MPTCP.
// If EnableMPTCP is false or no MPTCP support, the connection will use normal TCP.
// It's set to false by default.
// If a Dialer is passed as option, this value will be ignored.
EnableMPTCP bool

// The reporter to use for reporting stats for this channel.
StatsReporter StatsReporter

Expand Down Expand Up @@ -184,6 +191,7 @@ type Channel struct {
relayMaxConnTimeout time.Duration
relayMaxTombs uint64
relayTimerVerify bool
enableMPTCP bool
internalHandlers *handlerMap
handler Handler
onPeerStatusChanged func(*Peer)
Expand Down Expand Up @@ -275,8 +283,12 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
return nil, err
}

// Default to dialContext if dialer is not passed in as an option
// Default to dialContext or dialMPTCPContex
// if dialer is not passed in as an option
dialCtx := dialContext
if opts.EnableMPTCP {
dialCtx = dialMPTCPContext
}
if opts.Dialer != nil {
dialCtx = func(ctx context.Context, hostPort string) (net.Conn, error) {
return opts.Dialer(ctx, "tcp", hostPort)
Expand Down Expand Up @@ -306,6 +318,7 @@ func NewChannel(serviceName string, opts *ChannelOptions) (*Channel, error) {
relayMaxConnTimeout: opts.RelayMaxConnectionTimeout,
relayMaxTombs: opts.RelayMaxTombs,
relayTimerVerify: opts.RelayTimerVerification,
enableMPTCP: opts.EnableMPTCP,
dialer: dialCtx,
connContext: opts.ConnContext,
closed: make(chan struct{}),
Expand Down Expand Up @@ -402,7 +415,9 @@ func (ch *Channel) ListenAndServe(hostPort string) error {
return errAlreadyListening
}

l, err := net.Listen("tcp", hostPort)
lc := net.ListenConfig{}
lc.SetMultipathTCP(ch.enableMPTCP)
l, err := lc.Listen(context.Background(), "tcp", hostPort)
if err != nil {
mutable.RUnlock()
return err
Expand Down
35 changes: 19 additions & 16 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,27 @@ func toMap(fields LogFields) map[string]interface{} {
}

func TestNewChannel(t *testing.T) {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
for _, mptcp := range []bool{true, false} {
ch, err := NewChannel("svc", &ChannelOptions{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
EnableMPTCP: mptcp,
})
require.NoError(t, err, "NewChannel failed")

assert.Equal(t, LocalPeerInfo{
ServiceName: "svc",
PeerInfo: PeerInfo{
ProcessName: "pname",
HostPort: ephemeralHostPort,
IsEphemeral: true,
Version: PeerVersion{
Language: "go",
LanguageVersion: strings.TrimPrefix(runtime.Version(), "go"),
TChannelVersion: VersionInfo,
},
},
},
}, ch.PeerInfo(), "Wrong local peer info")
}, ch.PeerInfo(), "Wrong local peer info")
}
}

func TestLoggers(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions dial_17.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,9 @@ func dialContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
return d.DialContext(ctx, "tcp", hostPort)
}

func dialMPTCPContext(ctx context.Context, hostPort string) (net.Conn, error) {
d := net.Dialer{}
d.SetMultipathTCP(true)
return d.DialContext(ctx, "tcp", hostPort)

Check warning on line 39 in dial_17.go

View check run for this annotation

Codecov / codecov/patch

dial_17.go#L36-L39

Added lines #L36 - L39 were not covered by tests
}

0 comments on commit ff84931

Please sign in to comment.