Skip to content

Commit

Permalink
Enable splice for freedom outbound (downlink only)
Browse files Browse the repository at this point in the history
- Add outbound name
- Add outbound conn in ctx
- Refactor splice: it can be turn on from all inbounds and outbounds
- Refactor splice: Add splice copy to vless inbound
- Fix http error test
- Add freedom splice toggle via env var
- Populate outbound obj in context
- Use CanSpliceCopy to mark a connection
- Turn off splice by default
  • Loading branch information
yuhan6665 committed Sep 7, 2023
1 parent ae2fa30 commit efd32b0
Show file tree
Hide file tree
Showing 32 changed files with 282 additions and 168 deletions.
20 changes: 12 additions & 8 deletions app/dispatcher/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,13 @@ func (d *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
if !destination.IsValid() {
panic("Dispatcher: Invalid destination.")
}
ob := &session.Outbound{
OriginalTarget: destination,
Target: destination,
ob := session.OutboundFromContext(ctx)
if ob == nil {
ob = &session.Outbound{}
ctx = session.ContextWithOutbound(ctx, ob)
}
ctx = session.ContextWithOutbound(ctx, ob)
ob.OriginalTarget = destination
ob.Target = destination
content := session.ContentFromContext(ctx)
if content == nil {
content = new(session.Content)
Expand Down Expand Up @@ -271,11 +273,13 @@ func (d *DefaultDispatcher) DispatchLink(ctx context.Context, destination net.De
if !destination.IsValid() {
return newError("Dispatcher: Invalid destination.")
}
ob := &session.Outbound{
OriginalTarget: destination,
Target: destination,
ob := session.OutboundFromContext(ctx)
if ob == nil {
ob = &session.Outbound{}
ctx = session.ContextWithOutbound(ctx, ob)
}
ctx = session.ContextWithOutbound(ctx, ob)
ob.OriginalTarget = destination
ob.Target = destination
content := session.ContentFromContext(ctx)
if content == nil {
content = new(session.Content)
Expand Down
6 changes: 3 additions & 3 deletions app/proxyman/inbound/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (w *tcpWorker) callback(conn stat.Connection) {
sid := session.NewID()
ctx = session.ContextWithID(ctx, sid)

var outbound = &session.Outbound{}
if w.recvOrigDest {
var dest net.Destination
switch getTProxyType(w.stream) {
Expand All @@ -74,11 +75,10 @@ func (w *tcpWorker) callback(conn stat.Connection) {
dest = net.DestinationFromAddr(conn.LocalAddr())
}
if dest.IsValid() {
ctx = session.ContextWithOutbound(ctx, &session.Outbound{
Target: dest,
})
outbound.Target = dest
}
}
ctx = session.ContextWithOutbound(ctx, outbound)

if w.uplinkCounter != nil || w.downlinkCounter != nil {
conn = &stat.CounterConnection{
Expand Down
7 changes: 6 additions & 1 deletion app/proxyman/outbound/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,12 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (stat.Connecti
}

conn, err := internet.Dial(ctx, dest, h.streamSettings)
return h.getStatCouterConnection(conn), err
conn = h.getStatCouterConnection(conn)
outbound := session.OutboundFromContext(ctx)
if outbound != nil {
outbound.Conn = conn
}
return conn, err
}

func (h *Handler) getStatCouterConnection(conn stat.Connection) stat.Connection {
Expand Down
12 changes: 12 additions & 0 deletions common/buf/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/features/stats"
)

type dataHandler func(MultiBuffer)
Expand Down Expand Up @@ -40,6 +41,17 @@ func CountSize(sc *SizeCounter) CopyOption {
}
}

// AddToStatCounter a CopyOption add to stat counter
func AddToStatCounter(sc stats.Counter) CopyOption {
return func(handler *copyHandler) {
handler.onData = append(handler.onData, func(b MultiBuffer) {
if sc != nil {
sc.Add(int64(b.Len()))
}
})
}
}

type readError struct {
error
}
Expand Down
14 changes: 14 additions & 0 deletions common/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@ type Inbound struct {
Conn net.Conn
// Timer of the inbound buf copier. May be nil.
Timer *signal.ActivityTimer
// CanSpliceCopy is a property for this connection, set by both inbound and outbound
// 1 = can, 2 = after processing protocol info should be able to, 3 = cannot
CanSpliceCopy int
}

func(i *Inbound) SetCanSpliceCopy(canSpliceCopy int) int {
if canSpliceCopy > i.CanSpliceCopy {
i.CanSpliceCopy = canSpliceCopy
}
return i.CanSpliceCopy
}

// Outbound is the metadata of an outbound connection.
Expand All @@ -60,6 +70,10 @@ type Outbound struct {
RouteTarget net.Destination
// Gateway address
Gateway net.Address
// Name of the outbound proxy that handles the connection.
Name string
// Conn is actually internet.Connection. May be nil. It is currently nil for outbound with proxySettings
Conn net.Conn
}

// SniffingRequest controls the behavior of content sniffing.
Expand Down
6 changes: 6 additions & 0 deletions proxy/blackhole/blackhole.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet"
)
Expand All @@ -30,6 +31,11 @@ func New(ctx context.Context, config *Config) (*Handler, error) {

// Process implements OutboundHandler.Dispatch().
func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbound := session.OutboundFromContext(ctx)
if outbound != nil {
outbound.Name = "blackhole"
}

nBytes := h.response.WriteTo(link.Writer)
if nBytes > 0 {
// Sleep a little here to make sure the response is sent to client.
Expand Down
1 change: 1 addition & 0 deletions proxy/dns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, d internet.
if outbound == nil || !outbound.Target.IsValid() {
return newError("invalid outbound")
}
outbound.Name = "dns"

srcNetwork := outbound.Target.Network

Expand Down
9 changes: 4 additions & 5 deletions proxy/dokodemo/dokodemo.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn st
}

inbound := session.InboundFromContext(ctx)
if inbound != nil {
inbound.Name = "dokodemo-door"
inbound.User = &protocol.MemoryUser{
Level: d.config.UserLevel,
}
inbound.Name = "dokodemo-door"
inbound.SetCanSpliceCopy(1)
inbound.User = &protocol.MemoryUser{
Level: d.config.UserLevel,
}

ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
Expand Down
9 changes: 9 additions & 0 deletions proxy/errors.generated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package proxy

import "github.com/xtls/xray-core/common/errors"

type errPathObjHolder struct{}

func newError(values ...interface{}) *errors.Error {
return errors.New(values...).WithPathObj(errPathObjHolder{})
}
27 changes: 21 additions & 6 deletions proxy/freedom/freedom.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/dice"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/platform"
"github.com/xtls/xray-core/common/retry"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
Expand All @@ -21,11 +22,14 @@ import (
"github.com/xtls/xray-core/features/dns"
"github.com/xtls/xray-core/features/policy"
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/proxy"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/stat"
)

var useSplice bool

func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
h := new(Handler)
Expand All @@ -36,6 +40,12 @@ func init() {
}
return h, nil
}))
const defaultFlagValue = "NOT_DEFINED_AT_ALL"
value := platform.NewEnvFlag("xray.buf.splice").GetValue(func() string { return defaultFlagValue })
switch value {
case "auto", "enable":
useSplice = true
}
}

// Handler handles Freedom connections.
Expand Down Expand Up @@ -107,6 +117,11 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.")
}
outbound.Name = "freedom"
inbound := session.InboundFromContext(ctx)
if inbound != nil {
inbound.SetCanSpliceCopy(1)
}
destination := outbound.Target
UDPOverride := net.UDPDestination(nil, 0)
if h.config.DestinationOverride != nil {
Expand Down Expand Up @@ -195,17 +210,17 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte

responseDone := func() error {
defer timer.SetTimeout(plcy.Timeouts.UplinkOnly)

var reader buf.Reader
if destination.Network == net.Network_TCP {
reader = buf.NewReader(conn)
} else {
reader = NewPacketReader(conn, UDPOverride)
var writeConn net.Conn
if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Conn != nil && useSplice {
writeConn = inbound.Conn
}
return proxy.CopyRawConnIfExist(ctx, conn, writeConn, link.Writer, timer)
}
reader := NewPacketReader(conn, UDPOverride)
if err := buf.Copy(reader, output, buf.UpdateActivity(timer)); err != nil {
return newError("failed to process response").Base(err)
}

return nil
}

Expand Down
5 changes: 5 additions & 0 deletions proxy/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.")
}
outbound.Name = "http"
inbound := session.InboundFromContext(ctx)
if inbound != nil {
inbound.SetCanSpliceCopy(2)
}
target := outbound.Target
targetAddr := target.NetAddr()

Expand Down
9 changes: 4 additions & 5 deletions proxy/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,10 @@ type readerOnly struct {

func (s *Server) Process(ctx context.Context, network net.Network, conn stat.Connection, dispatcher routing.Dispatcher) error {
inbound := session.InboundFromContext(ctx)
if inbound != nil {
inbound.Name = "http"
inbound.User = &protocol.MemoryUser{
Level: s.config.UserLevel,
}
inbound.Name = "http"
inbound.SetCanSpliceCopy(2)
inbound.User = &protocol.MemoryUser{
Level: s.config.UserLevel,
}

reader := bufio.NewReaderSize(readerOnly{conn}, buf.Size)
Expand Down
1 change: 1 addition & 0 deletions proxy/loopback/loopback.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet
if outbound == nil || !outbound.Target.IsValid() {
return newError("target not specified.")
}
outbound.Name = "loopback"
destination := outbound.Target

newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx))
Expand Down
86 changes: 86 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,24 @@ package proxy

import (
"context"
gotls "crypto/tls"
"io"
"runtime"

"github.com/pires/go-proxyproto"
"github.com/xtls/xray-core/common/buf"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/common/net"
"github.com/xtls/xray-core/common/protocol"
"github.com/xtls/xray-core/common/session"
"github.com/xtls/xray-core/common/signal"
"github.com/xtls/xray-core/features/routing"
"github.com/xtls/xray-core/features/stats"
"github.com/xtls/xray-core/transport"
"github.com/xtls/xray-core/transport/internet"
"github.com/xtls/xray-core/transport/internet/reality"
"github.com/xtls/xray-core/transport/internet/stat"
"github.com/xtls/xray-core/transport/internet/tls"
)

// An Inbound processes inbound connections.
Expand Down Expand Up @@ -47,3 +58,78 @@ type GetInbound interface {
type GetOutbound interface {
GetOutbound() Outbound
}

// UnwrapRawConn support unwrap stats, tls, utls, reality and proxyproto conn and get raw tcp conn from it
func UnwrapRawConn(conn net.Conn) (net.Conn, stats.Counter, stats.Counter) {
var readCounter, writerCounter stats.Counter
if conn != nil {
statConn, ok := conn.(*stat.CounterConnection)
if ok {
conn = statConn.Connection
readCounter = statConn.ReadCounter
writerCounter = statConn.WriteCounter
}
if xc, ok := conn.(*gotls.Conn); ok {
conn = xc.NetConn()
} else if utlsConn, ok := conn.(*tls.UConn); ok {
conn = utlsConn.NetConn()
} else if realityConn, ok := conn.(*reality.Conn); ok {
conn = realityConn.NetConn()
} else if realityUConn, ok := conn.(*reality.UConn); ok {
conn = realityUConn.NetConn()
}
if pc, ok := conn.(*proxyproto.Conn); ok {
conn = pc.Raw()
// 8192 > 4096, there is no need to process pc's bufReader
}
}
return conn, readCounter, writerCounter
}

// CopyRawConnIfExist use the most efficient copy method.
// - If caller don't want to turn on splice, do not pass in both reader conn and writer conn
// - writer are from *transport.Link
func CopyRawConnIfExist(ctx context.Context, readerConn net.Conn, writerConn net.Conn, writer buf.Writer, timer signal.ActivityUpdater) error {
readerConn, readCounter, _ := UnwrapRawConn(readerConn)
writerConn, _, writeCounter := UnwrapRawConn(writerConn)
reader := buf.NewReader(readerConn)
if inbound := session.InboundFromContext(ctx); inbound != nil {
if tc, ok := writerConn.(*net.TCPConn); ok && readerConn != nil && writerConn != nil && (runtime.GOOS == "linux" || runtime.GOOS == "android") {
for inbound.CanSpliceCopy != 3 {
if inbound.CanSpliceCopy == 1 {
newError("CopyRawConn splice").WriteToLog(session.ExportIDToError(ctx))
runtime.Gosched() // necessary
w, err := tc.ReadFrom(readerConn)
if readCounter != nil {
readCounter.Add(w)
}
if writeCounter != nil {
writeCounter.Add(w)
}
if err != nil && errors.Cause(err) != io.EOF {
return err
}
return nil
}
buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() {
if readCounter != nil {
readCounter.Add(int64(buffer.Len()))
}
timer.Update()
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr
}
}
if err != nil {
return err
}
}
}
}
newError("CopyRawConn readv").WriteToLog(session.ExportIDToError(ctx))
if err := buf.Copy(reader, writer, buf.UpdateActivity(timer), buf.AddToStatCounter(readCounter)); err != nil {
return newError("failed to process response").Base(err)
}
return nil
}
Loading

4 comments on commit efd32b0

@chika0801
Copy link
Contributor

@chika0801 chika0801 commented on efd32b0 Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

客户端最新代码版本或用1.8.4

服务端用这个代码版本,客户端表现是浏览器开网页提示 ERR_SSL_PROTOCOL_ERROR,

日志就先删除了

@yuhan6665
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chika0801 感谢测试 我有空看下

@chika0801
Copy link
Contributor

@chika0801 chika0801 commented on efd32b0 Sep 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

两端使用此代码的版本,客户端浏览器访问 tls 1.2 的网站比如 sukebei.nyaa.si 能正常打开

@yuhan6665
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

两端使用此代码的版本,客户端浏览器访问 tls 1.2 的网站比如 sukebei.nyaa.si 能正常打开

应该修好了 585d5ba 我不知道我之前是怎么测试通过的 ;)
再次感谢!

Please sign in to comment.