Skip to content

Commit

Permalink
SplitHTTP: Do not produce too large upload
Browse files Browse the repository at this point in the history
Since XTLS#3603 and
XTLS#3611, iperf and speedtest are
triggering "too large upload" by the server. This is because v2ray's
MultiBuffer pipe can actually return data larger than the configured
size limit.

I'm surprised nobody noticed it so far. In principle, any heavy upload
could disrupt the entire connection. In its infinite wisdom,
speedtest.net hides such errors and only shows low upload instead. I
only noticed this issue myself when inspecting server logs.
  • Loading branch information
mmmray committed Aug 16, 2024
1 parent 8da6954 commit ec04685
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 7 deletions.
43 changes: 38 additions & 5 deletions transport/internet/splithttp/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me

httpClient := getHTTPClient(ctx, dest, streamSettings)

uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(scMaxEachPostBytes.roll()))
maxUploadSize := scMaxEachPostBytes.roll()
uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize))

go func() {
requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))
Expand Down Expand Up @@ -318,16 +319,48 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
},
}

// necessary in order to send larger chunks in upload
bufferedUploadPipeWriter := buf.NewBufferedWriter(uploadPipeWriter)
bufferedUploadPipeWriter.SetBuffered(false)
writer := uploadWriter{
uploadPipeWriter,
maxUploadSize,
}

conn := splitConn{
writer: bufferedUploadPipeWriter,
writer: writer,
reader: lazyDownload,
remoteAddr: remoteAddr,
localAddr: localAddr,
}

return stat.Connection(&conn), nil
}

// A wrapper around pipe that ensures the size limit is exactly honored.
//
// The MultiBuffer pipe accepts any single WriteMultiBuffer call even if that
// single MultiBuffer exceeds the size limit, and then starts blocking on the
// next WriteMultiBuffer call. This means that ReadMultiBuffer can return more
// bytes than the size limit. We work around this by splitting a potentially
// too large write up into multiple.
type uploadWriter struct {
*pipe.Writer
maxLen int32
}

func (w uploadWriter) Write(b []byte) (int, error) {
capacity := int(w.maxLen - w.Len())
if capacity > 0 && capacity < len(b) {
b = b[:capacity]
}

buffer := buf.New()
n, err := buffer.Write(b)
if err != nil {
return 0, err
}

err = w.WriteMultiBuffer([]*buf.Buffer{buffer})
if err != nil {
return 0, err
}
return n, nil
}
57 changes: 56 additions & 1 deletion transport/internet/splithttp/splithttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func Test_queryString(t *testing.T) {
ctx := context.Background()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{Path: "sh"},
ProtocolSettings: &Config{Path: "sh?ed=2048"},
}
conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)

Expand All @@ -407,3 +407,58 @@ func Test_queryString(t *testing.T) {
common.Must(conn.Close())
common.Must(listen.Close())
}

func Test_maxUpload(t *testing.T) {
listenPort := tcp.PickPort()
streamSettings := &internet.MemoryStreamConfig{
ProtocolName: "splithttp",
ProtocolSettings: &Config{
Path: "/sh",
ScMaxEachPostBytes: &RandRangeConfig{
From: 100,
To: 100,
},
},
}

var uploadSize int
listen, err := ListenSH(context.Background(), net.LocalHostIP, listenPort, streamSettings, func(conn stat.Connection) {
go func(c stat.Connection) {
defer c.Close()
var b [1024]byte
c.SetReadDeadline(time.Now().Add(2 * time.Second))
n, err := c.Read(b[:])
if err != nil {
return
}

uploadSize = n

common.Must2(c.Write([]byte("Response")))
}(conn)
})
common.Must(err)
ctx := context.Background()

conn, err := Dial(ctx, net.TCPDestination(net.DomainAddress("localhost"), listenPort), streamSettings)

// send a slightly too large upload
var upload [101]byte
_, err = conn.Write(upload[:])
common.Must(err)

var b [1024]byte
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
n, _ := io.ReadFull(conn, b[:])
fmt.Println("string is", n)
if string(b[:n]) != "Response" {
t.Error("response: ", string(b[:n]))
}
common.Must(conn.Close())

if uploadSize > 100 {
t.Error("incorrect upload size: ", uploadSize)
}

common.Must(listen.Close())
}
10 changes: 9 additions & 1 deletion transport/pipe/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type pipeOption struct {
}

func (o *pipeOption) isFull(curSize int32) bool {
return o.limit >= 0 && curSize > o.limit
return o.limit >= 0 && curSize >= o.limit
}

type pipe struct {
Expand All @@ -46,6 +46,14 @@ var (
errSlowDown = errors.New("slow down")
)

func (p *pipe) Len() int32 {
data := p.data
if data == nil {
return 0
}
return data.Len()
}

func (p *pipe) getState(forRead bool) error {
switch p.state {
case open:
Expand Down
5 changes: 5 additions & 0 deletions transport/pipe/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
return w.pipe.WriteMultiBuffer(mb)
}


// Close implements io.Closer. After the pipe is closed, writing to the pipe will return io.ErrClosedPipe, while reading will return io.EOF.
func (w *Writer) Close() error {
return w.pipe.Close()
}

func (w *Writer) Len() int32 {
return w.pipe.Len()
}

// Interrupt implements common.Interruptible.
func (w *Writer) Interrupt() {
w.pipe.Interrupt()
Expand Down

0 comments on commit ec04685

Please sign in to comment.