Skip to content

Commit

Permalink
Fix data leak between mux.cool connections (XTLS#3718)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmmray authored and leninalive committed Oct 29, 2024
1 parent bdca0d1 commit 4a088d1
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 1 deletion.
5 changes: 4 additions & 1 deletion common/mux/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu
}

func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
// deep-clone outbounds because it is going to be mutated concurrently
// (Target and OriginalTarget)
ctx = session.ContextCloneOutbounds(ctx)
errors.LogInfo(ctx, "received request for ", meta.Target)
{
msg := &log.AccessMessage{
Expand Down Expand Up @@ -170,7 +173,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
b.Release()
mb = nil
}
errors.LogInfoInner(ctx, err,"XUDP hit ", meta.GlobalID)
errors.LogInfoInner(ctx, err, "XUDP hit ", meta.GlobalID)
}
if mb != nil {
ctx = session.ContextWithTimeoutOnly(ctx, true)
Expand Down
124 changes: 124 additions & 0 deletions common/mux/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package mux_test

import (
"context"
"testing"

"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/mux"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/pipe"
)

func newLinkPair() (*transport.Link, *transport.Link) {
opt := pipe.WithoutSizeLimit()
uplinkReader, uplinkWriter := pipe.New(opt)
downlinkReader, downlinkWriter := pipe.New(opt)

uplink := &transport.Link{
Reader: uplinkReader,
Writer: downlinkWriter,
}

downlink := &transport.Link{
Reader: downlinkReader,
Writer: uplinkWriter,
}

return uplink, downlink
}

type TestDispatcher struct {
OnDispatch func(ctx context.Context, dest net.Destination) (*transport.Link, error)
}

func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) {
return d.OnDispatch(ctx, dest)
}

func (d *TestDispatcher) DispatchLink(ctx context.Context, destination net.Destination, outbound *transport.Link) error {
return nil
}

func (d *TestDispatcher) Start() error {
return nil
}

func (d *TestDispatcher) Close() error {
return nil
}

func (*TestDispatcher) Type() interface{} {
return routing.DispatcherType()
}

func TestRegressionOutboundLeak(t *testing.T) {
originalOutbounds := []*session.Outbound{{}}
serverCtx := session.ContextWithOutbounds(context.Background(), originalOutbounds)

websiteUplink, websiteDownlink := newLinkPair()

dispatcher := TestDispatcher{
OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) {
// emulate what DefaultRouter.Dispatch does, and mutate something on the context
ob := session.OutboundsFromContext(ctx)[0]
ob.Target = dest
return websiteDownlink, nil
},
}

muxServerUplink, muxServerDownlink := newLinkPair()
_, err := mux.NewServerWorker(serverCtx, &dispatcher, muxServerUplink)
common.Must(err)

client, err := mux.NewClientWorker(*muxServerDownlink, mux.ClientStrategy{})
common.Must(err)

clientCtx := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{
Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80),
}})

muxClientUplink, muxClientDownlink := newLinkPair()

ok := client.Dispatch(clientCtx, muxClientUplink)
if !ok {
t.Error("failed to dispatch")
}

{
b := buf.FromBytes([]byte("hello"))
common.Must(muxClientDownlink.Writer.WriteMultiBuffer(buf.MultiBuffer{b}))
}

resMb, err := websiteUplink.Reader.ReadMultiBuffer()
common.Must(err)
res := resMb.String()
if res != "hello" {
t.Error("upload: ", res)
}

{
b := buf.FromBytes([]byte("world"))
common.Must(websiteUplink.Writer.WriteMultiBuffer(buf.MultiBuffer{b}))
}

resMb, err = muxClientDownlink.Reader.ReadMultiBuffer()
common.Must(err)
res = resMb.String()
if res != "world" {
t.Error("download: ", res)
}

outbounds := session.OutboundsFromContext(serverCtx)
if outbounds[0] != originalOutbounds[0] {
t.Error("outbound got reassigned: ", outbounds[0])
}

if outbounds[0].Target.Address != nil {
t.Error("outbound target got leaked: ", outbounds[0].Target.String())
}
}
16 changes: 16 additions & 0 deletions common/session/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,22 @@ func ContextWithOutbounds(ctx context.Context, outbounds []*Outbound) context.Co
return context.WithValue(ctx, outboundSessionKey, outbounds)
}

func ContextCloneOutbounds(ctx context.Context) context.Context {
outbounds := OutboundsFromContext(ctx)
newOutbounds := make([]*Outbound, len(outbounds))
for i, ob := range outbounds {
if ob == nil {
continue
}

// copy outbound by value
v := *ob
newOutbounds[i] = &v
}

return ContextWithOutbounds(ctx, newOutbounds)
}

func OutboundsFromContext(ctx context.Context) []*Outbound {
if outbounds, ok := ctx.Value(outboundSessionKey).([]*Outbound); ok {
return outbounds
Expand Down

0 comments on commit 4a088d1

Please sign in to comment.