Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concatenate header and first upstream payload #73

Merged
merged 3 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion shadowsocks/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"io"
"net"
"time"

onet "github.com/Jigsaw-Code/outline-ss-server/net"
"github.com/shadowsocks/go-shadowsocks2/core"
Expand Down Expand Up @@ -46,6 +47,18 @@ type ssClient struct {
cipher shadowaead.Cipher
}

// This code contains an optimization to send the initial client payload along with
// the Shadowsocks handshake. This saves one packet during connection, and also
// reduces the distinctiveness of the connection pattern.
//
// Normally, the initial payload will be sent as soon as the socket is connected,
// except for delays due to inter-process communication. However, some protocols
// expect the server to send data first, in which case there is no client payload.
// We therefore use a short delay, longer than any reasonable IPC but similar to
// typical network latency. (In an Android emulator, the 90th percentile delay
// was ~1 ms.) If no client payload is received by this time, we connect without it.
const helloWait = 20 * time.Millisecond
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How did you decide no 20 ms?
This will only delay connections where the client doesn't immediately send data, but I have no idea what those are, so maybe reduce the wait time?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, reduced to 10ms


func (c *ssClient) DialTCP(laddr *net.TCPAddr, raddr string) (onet.DuplexConn, error) {
socksTargetAddr := socks.ParseAddr(raddr)
if socksTargetAddr == nil {
Expand All @@ -57,11 +70,14 @@ func (c *ssClient) DialTCP(laddr *net.TCPAddr, raddr string) (onet.DuplexConn, e
return nil, err
}
ssw := NewShadowsocksWriter(proxyConn, c.cipher)
_, err = ssw.Write(socksTargetAddr)
_, err = ssw.LazyWrite(socksTargetAddr)
if err != nil {
proxyConn.Close()
return nil, errors.New("Failed to write target address")
}
time.AfterFunc(helloWait, func() {
ssw.Flush()
})
ssr := NewShadowsocksReader(proxyConn, c.cipher)
return onet.WrapConn(proxyConn, ssr, ssw), nil
}
Expand Down
22 changes: 22 additions & 0 deletions shadowsocks/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,28 @@ func TestShadowsocksClient_DialTCP(t *testing.T) {
expectEchoPayload(conn, MakeTestPayload(1024), make([]byte, 1024), t)
}

func TestShadowsocksClient_DialTCPNoPayload(t *testing.T) {
proxyAddr := startShadowsocksTCPEchoProxy(testTargetAddr, t)
proxyHost, proxyPort, err := splitHostPortNumber(proxyAddr.String())
if err != nil {
t.Fatalf("Failed to parse proxy address: %v", err)
}
d, err := NewClient(proxyHost, proxyPort, testPassword, testCipher)
if err != nil {
t.Fatalf("Failed to create ShadowsocksClient: %v", err)
}
conn, err := d.DialTCP(nil, testTargetAddr)
if err != nil {
t.Fatalf("ShadowsocksClient.DialTCP failed: %v", err)
}

// Wait for more than 20 milliseconds to ensure that the target
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check that no data was sent in the first 20ms?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I added a test for this.

// address is sent.
time.Sleep(40 * time.Millisecond)
// Force the echo server to verify the target address.
conn.Close()
}

func TestShadowsocksClient_ListenUDP(t *testing.T) {
proxyAddr := startShadowsocksUDPEchoServer(testTargetAddr, t)
proxyHost, proxyPort, err := splitHostPortNumber(proxyAddr.String())
Expand Down
158 changes: 134 additions & 24 deletions shadowsocks/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/shadowsocks/go-shadowsocks2/shadowaead"
)
Expand All @@ -30,17 +31,35 @@ const payloadSizeMask = 0x3FFF // 16*1024 - 1

// Writer is an io.Writer that also implements io.ReaderFrom to
// allow for piping the data without extra allocations and copies.
// The LazyWrite and Flush methods allow a header to be
// added but delayed until the first write, for concatenation.
// All methods except Flush must be called from a single thread.
type Writer interface {
io.Writer
io.ReaderFrom
// LazyWrite queues p to be written, but doesn't send it until
// Flush() is called, a non-lazy write is made, or the buffer
// is filled.
LazyWrite(p []byte) (int, error)
// Flush sends the pending data, if any. This method is
// thread-safe, but must not be the first method called.
Flush() error
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get the "MUST". Why can't it be called first? Wouldn't it just return and do nothing? I don't see what would break.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flush() relies on init() having run, but it can't run init() because init() isn't thread-safe.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there was no write, you only depend on needFlush and the mutex, both of which are initialized before init(). A sequence Flush, LazyWrite, Write seems safe.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Done.

}

type shadowsocksWriter struct {
writer io.Writer
ssCipher shadowaead.Cipher
// This type is single-threaded except when needFlush is true.
// mu protects needFlush, and also protects everything
// else while needFlush could be true.
mu sync.Mutex
// Indicates that a concurrent flush is currently allowed.
needFlush bool
writer io.Writer
ssCipher shadowaead.Cipher
// Wrapper for input that arrives as a slice.
byteWrapper bytes.Reader
// These are lazily initialized:
// Number of plaintext bytes that are currently buffered.
pending int
// These are populated by init():
buf []byte
aead cipher.AEAD
// Index of the next encrypted chunk to write.
Expand Down Expand Up @@ -91,6 +110,42 @@ func (sw *shadowsocksWriter) Write(p []byte) (int, error) {
return int(n), err
}

func (sw *shadowsocksWriter) LazyWrite(p []byte) (int, error) {
if err := sw.init(); err != nil {
return 0, err
}

// Locking is needed due to potential concurrency with the Flush()
// for a previous call to LazyWrite().
sw.mu.Lock()
defer sw.mu.Unlock()

queued := 0
for {
n := sw.enqueue(p)
queued += n
p = p[n:]
if len(p) == 0 {
sw.needFlush = true
return queued, nil
}
// p didn't fit in the buffer. Flush the buffer and try
// again.
if err := sw.flush(); err != nil {
return queued, err
}
}
}

func (sw *shadowsocksWriter) Flush() error {
sw.mu.Lock()
defer sw.mu.Unlock()
if !sw.needFlush {
return nil
}
return sw.flush()
}

func isZero(b []byte) bool {
for _, v := range b {
if v != 0 {
Expand All @@ -100,12 +155,81 @@ func isZero(b []byte) bool {
return true
}

// Returns the slices of sw.buf in which to place plaintext for encryption.
func (sw *shadowsocksWriter) buffers() (sizeBuf, payloadBuf []byte) {
// sw.buf starts with the salt.
saltSize := sw.ssCipher.SaltSize()

// Each Shadowsocks-TCP message consists of a fixed-length size block,
// followed by a variable-length payload block.
sizeBuf = sw.buf[saltSize : saltSize+2]
payloadStart := saltSize + 2 + sw.aead.Overhead()
payloadBuf = sw.buf[payloadStart : payloadStart+payloadSizeMask]
return
}

func (sw *shadowsocksWriter) ReadFrom(r io.Reader) (int64, error) {
if err := sw.init(); err != nil {
return 0, err
}
var written int64
var err error
_, payloadBuf := sw.buffers()

// Special case: one thread-safe read, if necessary
sw.mu.Lock()
if sw.needFlush {
pending := sw.pending

sw.mu.Unlock()
// It's not safe to read into payloadBuf, which may be
// modified on the flush thread.
// Any size is acceptable here, but this is the largest value
// for which a single call to enqueue() is sufficient.
readBuf := make([]byte, len(payloadBuf)-pending)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need this allocation.
You can use sw.buf[saltSize + 2 + sw.aead,Overhead() + sw.pending + sw.aead.Overhead():]

The enqueue call later will shift things to the right position.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Beautiful. Done.

var plaintextSize int
plaintextSize, err = r.Read(readBuf)
written = int64(plaintextSize)
sw.mu.Lock()

sw.enqueue(readBuf[:plaintextSize])
readBuf = nil // Release memory before blocking I/O.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be deleted if you remove the make call

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if flushErr := sw.flush(); flushErr != nil {
err = flushErr
}
sw.needFlush = false
}
sw.mu.Unlock()

// Main transfer loop
for err == nil {
sw.pending, err = r.Read(payloadBuf)
written += int64(sw.pending)
if flushErr := sw.flush(); flushErr != nil {
err = flushErr
}
}

if err == io.EOF { // ignore EOF as per io.ReaderFrom contract
return written, nil
}
return written, fmt.Errorf("Failed to read payload: %v", err)
}

// Adds as much of `plaintext` into the buffer as will fit, and increases
// sw.pending accordingly. Returns the number of bytes consumed.
func (sw *shadowsocksWriter) enqueue(plaintext []byte) int {
_, payloadBuf := sw.buffers()
n := copy(payloadBuf[sw.pending:], plaintext)
sw.pending += n
return n
}

// Encrypts all pending data and writes it to the output.
func (sw *shadowsocksWriter) flush() error {
if sw.pending == 0 {
return nil
}
// sw.buf starts with the salt.
saltSize := sw.ssCipher.SaltSize()
// Normally we ignore the salt at the beginning of sw.buf.
Expand All @@ -117,27 +241,13 @@ func (sw *shadowsocksWriter) ReadFrom(r io.Reader) (int64, error) {
start = 0
}

// Each Shadowsocks-TCP message consists of a fixed-length size block, followed by
// a variable-length payload block.
sizeBuf := sw.buf[saltSize : saltSize+2+sw.aead.Overhead()]
payloadBuf := sw.buf[saltSize+len(sizeBuf):]
for {
plaintextSize, err := r.Read(payloadBuf[:payloadSizeMask])
if plaintextSize > 0 {
binary.BigEndian.PutUint16(sizeBuf, uint16(plaintextSize))
sw.encryptBlock(sizeBuf[:2])
payloadSize := sw.encryptBlock(payloadBuf[:plaintextSize])
_, err = sw.writer.Write(sw.buf[start : saltSize+len(sizeBuf)+payloadSize])
written += int64(plaintextSize)
start = saltSize // Skip the salt for all writes except the first.
}
if err != nil {
if err == io.EOF { // ignore EOF as per io.ReaderFrom contract
return written, nil
}
return written, fmt.Errorf("Failed to read payload: %v", err)
}
}
sizeBuf, payloadBuf := sw.buffers()
binary.BigEndian.PutUint16(sizeBuf, uint16(sw.pending))
sizeBlockSize := sw.encryptBlock(sizeBuf)
payloadSize := sw.encryptBlock(payloadBuf[:sw.pending])
_, err := sw.writer.Write(sw.buf[start : saltSize+sizeBlockSize+payloadSize])
sw.pending = 0
return err
}

// ChunkReader is similar to io.Reader, except that it controls its own
Expand Down
Loading