From 35540bb7ff39b150846d56e73c97e39e0958a4cf Mon Sep 17 00:00:00 2001 From: mmmray <142015632+mmmray@users.noreply.github.com> Date: Wed, 21 Aug 2024 14:21:51 -0500 Subject: [PATCH 1/2] Fix data leak between mux.cool connections Fix https://github.com/XTLS/Xray-core/issues/116 This issue does not appear at all if mux.cool is disabled. Maybe the issue can be avoided if routing is disabled too, but it's possible that the same issue appears elsewhere. Tests should be added. ***The mux server has no tests at all!*** Step-by-step: 1. The mux server is calling handleStatusNew with the same context multiple times. That function then calls w.dispatcher.Dispatch 2. Dispatch assigns some values specific to the connection onto the outbound here: https://github.com/XTLS/Xray-core/blob/83eef6bc1f554be84aeb799417688a070cd32ab8/app/dispatcher/default.go#L241-L242 3. Because the outbounds array on the connection object is the same across multiple connections (it's the same context), the value of outbound.Target flips back and forth between values such as "1.1.1.1" or "chat.facebook.com" (IPAddress vs DomainAddress) 4. Many layers deeper in routing, as part of this Dispatch call, this function is called: https://github.com/XTLS/Xray-core/blob/83eef6bc1f554be84aeb799417688a070cd32ab8/features/routing/session/context.go#L53-L55 It does two things, in order: 1. Checks if Target is an IPAddress (IsIP) 2. If it's an IP, call IP() Because Target keeps mutating back and forth between multiple kinds of address, it sometimes is an IP for Step 1, and a DomainAddress for Step 2 5. Calling IP() on a DomainAddress panics: panic: Calling IP() on a DomainAddress. goroutine 123018 [running]: github.com/xtls/xray-core/common/net.domainAddress.IP(...) github.com/xtls/xray-core/common/net/address.go:172 github.com/xtls/xray-core/features/routing/session.(*Context).GetTargetIPs(0xc004175428) github.com/xtls/xray-core/features/routing/session/context.go:54 +0x5f github.com/xtls/xray-core/app/router.(*MultiGeoIPMatcher).Apply(0xc0003a63e0, {0x156c770?, 0xc004175428?}) github.com/xtls/xray-core/app/router/condition.go:143 +0x3e github.com/xtls/xray-core/app/router.(*ConditionChan).Apply(0x495c9d?, {0x156c770, 0xc004175428}) github.com/xtls/xray-core/app/router/condition.go:32 +0x5c github.com/xtls/xray-core/app/router.(*Rule).Apply(...) github.com/xtls/xray-core/app/router/config.go:30 github.com/xtls/xray-core/app/router.(*Router).pickRouteInternal(0xc000367a40, {0x156c770, 0xc004175428}) github.com/xtls/xray-core/app/router/router.go:196 +0x176 github.com/xtls/xray-core/app/router.(*Router).PickRoute(0x1562810?, {0x156c770?, 0xc004175428?}) github.com/xtls/xray-core/app/router/router.go:84 +0x25 github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).routedDispatch(0xc000398fc0, {0x1562810, 0xc0050cd350}, 0xc00418dea0, {{0x15625a8, 0xc005974c10}, 0x1bb, 0x2}) github.com/xtls/xray-core/app/dispatcher/default.go:420 +0x307 created by github.com/xtls/xray-core/app/dispatcher.(*DefaultDispatcher).Dispatch in goroutine 123015 github.com/xtls/xray-core/app/dispatcher/default.go:252 +0x56c --- common/mux/server.go | 5 +++-- common/session/context.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/common/mux/server.go b/common/mux/server.go index 5a4e9974d9ad..29de6566529b 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -170,7 +170,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, b.Release() mb = nil } - errors.LogInfoInner(ctx, err,"XUDP hit ", meta.GlobalID) + errors.LogInfoInner(ctx, err, "XUDP hit ", meta.GlobalID) } if mb != nil { ctx = session.ContextWithTimeoutOnly(ctx, true) @@ -286,7 +286,8 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead case SessionStatusEnd: err = w.handleStatusEnd(&meta, reader) case SessionStatusNew: - err = w.handleStatusNew(ctx, &meta, reader) + // clone outbounds because it is going to be mutated concurrently (Target and OriginalTarget) + err = w.handleStatusNew(session.ContextCloneOutbounds(ctx), &meta, reader) case SessionStatusKeep: err = w.handleStatusKeep(&meta, reader) default: diff --git a/common/session/context.go b/common/session/context.go index 3fed0151b829..b7af69cc35a6 100644 --- a/common/session/context.go +++ b/common/session/context.go @@ -40,6 +40,22 @@ func ContextWithOutbounds(ctx context.Context, outbounds []*Outbound) context.Co return context.WithValue(ctx, outboundSessionKey, outbounds) } +func ContextCloneOutbounds(ctx context.Context) context.Context { + outbounds := OutboundsFromContext(ctx) + newOutbounds := make([]*Outbound, len(outbounds)) + for i, ob := range outbounds { + if ob == nil { + continue + } + + // copy outbound by value + v := *ob + newOutbounds[i] = &v + } + + return ContextWithOutbounds(ctx, newOutbounds) +} + func OutboundsFromContext(ctx context.Context) []*Outbound { if outbounds, ok := ctx.Value(outboundSessionKey).([]*Outbound); ok { return outbounds From cb1c75f6ef0c622b5ebf18e0981a75cf7f21b652 Mon Sep 17 00:00:00 2001 From: mmmray <142015632+mmmray@users.noreply.github.com> Date: Sat, 24 Aug 2024 10:08:49 -0500 Subject: [PATCH 2/2] add test --- common/mux/server.go | 6 +- common/mux/server_test.go | 124 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 common/mux/server_test.go diff --git a/common/mux/server.go b/common/mux/server.go index 29de6566529b..480175ba069a 100644 --- a/common/mux/server.go +++ b/common/mux/server.go @@ -118,6 +118,9 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu } func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error { + // deep-clone outbounds because it is going to be mutated concurrently + // (Target and OriginalTarget) + ctx = session.ContextCloneOutbounds(ctx) errors.LogInfo(ctx, "received request for ", meta.Target) { msg := &log.AccessMessage{ @@ -286,8 +289,7 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead case SessionStatusEnd: err = w.handleStatusEnd(&meta, reader) case SessionStatusNew: - // clone outbounds because it is going to be mutated concurrently (Target and OriginalTarget) - err = w.handleStatusNew(session.ContextCloneOutbounds(ctx), &meta, reader) + err = w.handleStatusNew(ctx, &meta, reader) case SessionStatusKeep: err = w.handleStatusKeep(&meta, reader) default: diff --git a/common/mux/server_test.go b/common/mux/server_test.go new file mode 100644 index 000000000000..4158bf46f19d --- /dev/null +++ b/common/mux/server_test.go @@ -0,0 +1,124 @@ +package mux_test + +import ( + "context" + "testing" + + "github.com/xtls/xray-core/common" + "github.com/xtls/xray-core/common/buf" + "github.com/xtls/xray-core/common/mux" + "github.com/xtls/xray-core/common/net" + "github.com/xtls/xray-core/common/session" + "github.com/xtls/xray-core/features/routing" + "github.com/xtls/xray-core/transport" + "github.com/xtls/xray-core/transport/pipe" +) + +func newLinkPair() (*transport.Link, *transport.Link) { + opt := pipe.WithoutSizeLimit() + uplinkReader, uplinkWriter := pipe.New(opt) + downlinkReader, downlinkWriter := pipe.New(opt) + + uplink := &transport.Link{ + Reader: uplinkReader, + Writer: downlinkWriter, + } + + downlink := &transport.Link{ + Reader: downlinkReader, + Writer: uplinkWriter, + } + + return uplink, downlink +} + +type TestDispatcher struct { + OnDispatch func(ctx context.Context, dest net.Destination) (*transport.Link, error) +} + +func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) { + return d.OnDispatch(ctx, dest) +} + +func (d *TestDispatcher) DispatchLink(ctx context.Context, destination net.Destination, outbound *transport.Link) error { + return nil +} + +func (d *TestDispatcher) Start() error { + return nil +} + +func (d *TestDispatcher) Close() error { + return nil +} + +func (*TestDispatcher) Type() interface{} { + return routing.DispatcherType() +} + +func TestRegressionOutboundLeak(t *testing.T) { + originalOutbounds := []*session.Outbound{{}} + serverCtx := session.ContextWithOutbounds(context.Background(), originalOutbounds) + + websiteUplink, websiteDownlink := newLinkPair() + + dispatcher := TestDispatcher{ + OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) { + // emulate what DefaultRouter.Dispatch does, and mutate something on the context + ob := session.OutboundsFromContext(ctx)[0] + ob.Target = dest + return websiteDownlink, nil + }, + } + + muxServerUplink, muxServerDownlink := newLinkPair() + _, err := mux.NewServerWorker(serverCtx, &dispatcher, muxServerUplink) + common.Must(err) + + client, err := mux.NewClientWorker(*muxServerDownlink, mux.ClientStrategy{}) + common.Must(err) + + clientCtx := session.ContextWithOutbounds(context.Background(), []*session.Outbound{{ + Target: net.TCPDestination(net.DomainAddress("www.example.com"), 80), + }}) + + muxClientUplink, muxClientDownlink := newLinkPair() + + ok := client.Dispatch(clientCtx, muxClientUplink) + if !ok { + t.Error("failed to dispatch") + } + + { + b := buf.FromBytes([]byte("hello")) + common.Must(muxClientDownlink.Writer.WriteMultiBuffer(buf.MultiBuffer{b})) + } + + resMb, err := websiteUplink.Reader.ReadMultiBuffer() + common.Must(err) + res := resMb.String() + if res != "hello" { + t.Error("upload: ", res) + } + + { + b := buf.FromBytes([]byte("world")) + common.Must(websiteUplink.Writer.WriteMultiBuffer(buf.MultiBuffer{b})) + } + + resMb, err = muxClientDownlink.Reader.ReadMultiBuffer() + common.Must(err) + res = resMb.String() + if res != "world" { + t.Error("download: ", res) + } + + outbounds := session.OutboundsFromContext(serverCtx) + if outbounds[0] != originalOutbounds[0] { + t.Error("outbound got reassigned: ", outbounds[0]) + } + + if outbounds[0].Target.Address != nil { + t.Error("outbound target got leaked: ", outbounds[0].Target.String()) + } +}