Skip to content

Commit

Permalink
XHTTP XMUX: Add hMaxRequestTimes and hKeepAlivePeriod (XTLS#4163)
Browse files Browse the repository at this point in the history
  • Loading branch information
RPRX authored Dec 15, 2024
1 parent 7463561 commit 73e0d4a
Show file tree
Hide file tree
Showing 10 changed files with 493 additions and 408 deletions.
47 changes: 24 additions & 23 deletions infra/conf/transport_internet.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,23 @@ type SplitHTTPConfig struct {
NoSSEHeader bool `json:"noSSEHeader"`
ScMaxEachPostBytes Int32Range `json:"scMaxEachPostBytes"`
ScMinPostsIntervalMs Int32Range `json:"scMinPostsIntervalMs"`
ScMaxBufferedPosts int64 `json:"scMaxConcurrentPosts"`
KeepAlivePeriod int64 `json:"keepAlivePeriod"`
Xmux Xmux `json:"xmux"`
ScMaxBufferedPosts int64 `json:"scMaxBufferedPosts"`
Xmux XmuxConfig `json:"xmux"`
DownloadSettings *StreamConfig `json:"downloadSettings"`
Extra json.RawMessage `json:"extra"`
}

type Xmux struct {
MaxConcurrency Int32Range `json:"maxConcurrency"`
MaxConnections Int32Range `json:"maxConnections"`
CMaxReuseTimes Int32Range `json:"cMaxReuseTimes"`
CMaxLifetimeMs Int32Range `json:"cMaxLifetimeMs"`
type XmuxConfig struct {
MaxConcurrency Int32Range `json:"maxConcurrency"`
MaxConnections Int32Range `json:"maxConnections"`
CMaxReuseTimes Int32Range `json:"cMaxReuseTimes"`
CMaxLifetimeMs Int32Range `json:"cMaxLifetimeMs"`
HMaxRequestTimes Int32Range `json:"hMaxRequestTimes"`
HKeepAlivePeriod int64 `json:"hKeepAlivePeriod"`
}

func splithttpNewRandRangeConfig(input Int32Range) *splithttp.RandRangeConfig {
return &splithttp.RandRangeConfig{
func newRangeConfig(input Int32Range) *splithttp.RangeConfig {
return &splithttp.RangeConfig{
From: input.From,
To: input.To,
}
Expand Down Expand Up @@ -281,33 +282,33 @@ func (c *SplitHTTPConfig) Build() (proto.Message, error) {
if c.Xmux.MaxConnections.To > 0 && c.Xmux.MaxConcurrency.To > 0 {
return nil, errors.New("maxConnections cannot be specified together with maxConcurrency")
}
if c.Xmux.MaxConcurrency.To == 0 &&
c.Xmux.MaxConnections.To == 0 &&
c.Xmux.CMaxReuseTimes.To == 0 &&
c.Xmux.CMaxLifetimeMs.To == 0 {
if c.Xmux == (XmuxConfig{}) {
c.Xmux.MaxConcurrency.From = 16
c.Xmux.MaxConcurrency.To = 32
c.Xmux.CMaxReuseTimes.From = 64
c.Xmux.CMaxReuseTimes.To = 128
c.Xmux.HMaxRequestTimes.From = 800
c.Xmux.HMaxRequestTimes.To = 900
}

config := &splithttp.Config{
Host: c.Host,
Path: c.Path,
Mode: c.Mode,
Headers: c.Headers,
XPaddingBytes: splithttpNewRandRangeConfig(c.XPaddingBytes),
XPaddingBytes: newRangeConfig(c.XPaddingBytes),
NoGRPCHeader: c.NoGRPCHeader,
NoSSEHeader: c.NoSSEHeader,
ScMaxEachPostBytes: splithttpNewRandRangeConfig(c.ScMaxEachPostBytes),
ScMinPostsIntervalMs: splithttpNewRandRangeConfig(c.ScMinPostsIntervalMs),
ScMaxEachPostBytes: newRangeConfig(c.ScMaxEachPostBytes),
ScMinPostsIntervalMs: newRangeConfig(c.ScMinPostsIntervalMs),
ScMaxBufferedPosts: c.ScMaxBufferedPosts,
KeepAlivePeriod: c.KeepAlivePeriod,
Xmux: &splithttp.Multiplexing{
MaxConcurrency: splithttpNewRandRangeConfig(c.Xmux.MaxConcurrency),
MaxConnections: splithttpNewRandRangeConfig(c.Xmux.MaxConnections),
CMaxReuseTimes: splithttpNewRandRangeConfig(c.Xmux.CMaxReuseTimes),
CMaxLifetimeMs: splithttpNewRandRangeConfig(c.Xmux.CMaxLifetimeMs),
Xmux: &splithttp.XmuxConfig{
MaxConcurrency: newRangeConfig(c.Xmux.MaxConcurrency),
MaxConnections: newRangeConfig(c.Xmux.MaxConnections),
CMaxReuseTimes: newRangeConfig(c.Xmux.CMaxReuseTimes),
CMaxLifetimeMs: newRangeConfig(c.Xmux.CMaxLifetimeMs),
HMaxRequestTimes: newRangeConfig(c.Xmux.HMaxRequestTimes),
HKeepAlivePeriod: c.Xmux.HKeepAlivePeriod,
},
}

Expand Down
4 changes: 4 additions & 0 deletions transport/internet/splithttp/browser_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
// has no fields because everything is global state :O)
type BrowserDialerClient struct{}

func (c *BrowserDialerClient) IsClosed() bool {
panic("not implemented yet")
}

func (c *BrowserDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
panic("not implemented yet")
}
Expand Down
22 changes: 21 additions & 1 deletion transport/internet/splithttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

// interface to abstract between use of browser dialer, vs net/http
type DialerClient interface {
IsClosed() bool

// (ctx, baseURL, payload) -> err
// baseURL already contains sessionId and seq
SendUploadRequest(context.Context, string, io.ReadWriteCloser, int64) error
Expand All @@ -39,12 +41,17 @@ type DialerClient interface {
type DefaultDialerClient struct {
transportConfig *Config
client *http.Client
closed bool
httpVersion string
// pool of net.Conn, created using dialUploadConn
uploadRawPool *sync.Pool
dialUploadConn func(ctxInner context.Context) (net.Conn, error)
}

func (c *DefaultDialerClient) IsClosed() bool {
return c.closed
}

func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.WriteCloser, io.ReadCloser) {
reader, writer := io.Pipe()
req, _ := http.NewRequestWithContext(ctx, "POST", pureURL, reader)
Expand All @@ -59,6 +66,8 @@ func (c *DefaultDialerClient) Open(ctx context.Context, pureURL string) (io.Writ
if err != nil {
errors.LogInfoInner(ctx, err, "failed to open ", pureURL)
} else {
// c.closed = true
response.Body.Close()
errors.LogInfo(ctx, "unexpected status ", response.StatusCode)
}
wrc.Close()
Expand All @@ -76,7 +85,14 @@ func (c *DefaultDialerClient) OpenUpload(ctx context.Context, baseURL string) io
if !c.transportConfig.NoGRPCHeader {
req.Header.Set("Content-Type", "application/grpc")
}
go c.client.Do(req)
go func() {
if resp, err := c.client.Do(req); err == nil {
if resp.StatusCode != 200 {
// c.closed = true
}
resp.Body.Close()
}
}()
return writer
}

Expand Down Expand Up @@ -130,6 +146,7 @@ func (c *DefaultDialerClient) OpenDownload(ctx context.Context, baseURL string)
}

if response.StatusCode != 200 {
// c.closed = true
response.Body.Close()
errors.LogInfo(ctx, "invalid status code on download:", response.Status)
gotDownResponse.Close()
Expand Down Expand Up @@ -180,6 +197,7 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
defer resp.Body.Close()

if resp.StatusCode != 200 {
// c.closed = true
return errors.New("bad status code:", resp.Status)
}
} else {
Expand Down Expand Up @@ -214,6 +232,8 @@ func (c *DefaultDialerClient) SendUploadRequest(ctx context.Context, url string,
return fmt.Errorf("error while reading response: %s", err.Error())
}
if resp.StatusCode != 200 {
// c.closed = true
// resp.Body.Close() // I'm not sure
return fmt.Errorf("got non-200 error response code: %d", resp.StatusCode)
}
}
Expand Down
47 changes: 29 additions & 18 deletions transport/internet/splithttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (c *Config) GetNormalizedQuery() string {
query += "&"
}

paddingLen := c.GetNormalizedXPaddingBytes().roll()
paddingLen := c.GetNormalizedXPaddingBytes().rand()
if paddingLen > 0 {
query += "x_padding=" + strings.Repeat("0", int(paddingLen))
}
Expand All @@ -58,7 +58,7 @@ func (c *Config) WriteResponseHeader(writer http.ResponseWriter) {
// CORS headers for the browser dialer
writer.Header().Set("Access-Control-Allow-Origin", "*")
writer.Header().Set("Access-Control-Allow-Methods", "GET, POST")
paddingLen := c.GetNormalizedXPaddingBytes().roll()
paddingLen := c.GetNormalizedXPaddingBytes().rand()
if paddingLen > 0 {
writer.Header().Set("X-Padding", strings.Repeat("0", int(paddingLen)))
}
Expand All @@ -72,9 +72,9 @@ func (c *Config) GetNormalizedScMaxBufferedPosts() int {
return int(c.ScMaxBufferedPosts)
}

func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig {
func (c *Config) GetNormalizedScMaxEachPostBytes() RangeConfig {
if c.ScMaxEachPostBytes == nil || c.ScMaxEachPostBytes.To == 0 {
return RandRangeConfig{
return RangeConfig{
From: 1000000,
To: 1000000,
}
Expand All @@ -83,9 +83,9 @@ func (c *Config) GetNormalizedScMaxEachPostBytes() RandRangeConfig {
return *c.ScMaxEachPostBytes
}

func (c *Config) GetNormalizedScMinPostsIntervalMs() RandRangeConfig {
func (c *Config) GetNormalizedScMinPostsIntervalMs() RangeConfig {
if c.ScMinPostsIntervalMs == nil || c.ScMinPostsIntervalMs.To == 0 {
return RandRangeConfig{
return RangeConfig{
From: 30,
To: 30,
}
Expand All @@ -94,9 +94,9 @@ func (c *Config) GetNormalizedScMinPostsIntervalMs() RandRangeConfig {
return *c.ScMinPostsIntervalMs
}

func (c *Config) GetNormalizedXPaddingBytes() RandRangeConfig {
func (c *Config) GetNormalizedXPaddingBytes() RangeConfig {
if c.XPaddingBytes == nil || c.XPaddingBytes.To == 0 {
return RandRangeConfig{
return RangeConfig{
From: 100,
To: 1000,
}
Expand All @@ -105,9 +105,20 @@ func (c *Config) GetNormalizedXPaddingBytes() RandRangeConfig {
return *c.XPaddingBytes
}

func (m *Multiplexing) GetNormalizedCMaxReuseTimes() RandRangeConfig {
func (m *XmuxConfig) GetNormalizedCMaxRequestTimes() RangeConfig {
if m.HMaxRequestTimes == nil {
return RangeConfig{
From: 0,
To: 0,
}
}

return *m.HMaxRequestTimes
}

func (m *XmuxConfig) GetNormalizedCMaxReuseTimes() RangeConfig {
if m.CMaxReuseTimes == nil {
return RandRangeConfig{
return RangeConfig{
From: 0,
To: 0,
}
Expand All @@ -116,19 +127,19 @@ func (m *Multiplexing) GetNormalizedCMaxReuseTimes() RandRangeConfig {
return *m.CMaxReuseTimes
}

func (m *Multiplexing) GetNormalizedCMaxLifetimeMs() RandRangeConfig {
if m.CMaxLifetimeMs == nil || m.CMaxLifetimeMs.To == 0 {
return RandRangeConfig{
func (m *XmuxConfig) GetNormalizedCMaxLifetimeMs() RangeConfig {
if m.CMaxLifetimeMs == nil {
return RangeConfig{
From: 0,
To: 0,
}
}
return *m.CMaxLifetimeMs
}

func (m *Multiplexing) GetNormalizedMaxConnections() RandRangeConfig {
func (m *XmuxConfig) GetNormalizedMaxConnections() RangeConfig {
if m.MaxConnections == nil {
return RandRangeConfig{
return RangeConfig{
From: 0,
To: 0,
}
Expand All @@ -137,9 +148,9 @@ func (m *Multiplexing) GetNormalizedMaxConnections() RandRangeConfig {
return *m.MaxConnections
}

func (m *Multiplexing) GetNormalizedMaxConcurrency() RandRangeConfig {
func (m *XmuxConfig) GetNormalizedMaxConcurrency() RangeConfig {
if m.MaxConcurrency == nil {
return RandRangeConfig{
return RangeConfig{
From: 0,
To: 0,
}
Expand All @@ -154,7 +165,7 @@ func init() {
}))
}

func (c RandRangeConfig) roll() int32 {
func (c RangeConfig) rand() int32 {
if c.From == c.To {
return c.From
}
Expand Down
Loading

0 comments on commit 73e0d4a

Please sign in to comment.