From a5ae2e891c4166fa304ac89ddf886cf6cf19559a Mon Sep 17 00:00:00 2001 From: Skyxim Date: Sat, 26 Nov 2022 19:37:32 +0800 Subject: [PATCH] feat: support fast_open for hysteria, and unified parameter naming --- adapter/outbound/hysteria.go | 79 ++++++++++++++++++------------- transport/hysteria/core/client.go | 44 ++++++++++++----- 2 files changed, 79 insertions(+), 44 deletions(-) diff --git a/adapter/outbound/hysteria.go b/adapter/outbound/hysteria.go index 5a2850c6de..5ba96a0af1 100644 --- a/adapter/outbound/hysteria.go +++ b/adapter/outbound/hysteria.go @@ -10,6 +10,7 @@ import ( "fmt" "net" "os" + "reflect" "regexp" "strconv" "time" @@ -87,27 +88,33 @@ func (h *Hysteria) ListenPacketContext(ctx context.Context, metadata *C.Metadata type HysteriaOption struct { BasicOption - Name string `proxy:"name"` - Server string `proxy:"server"` - Port int `proxy:"port"` - Protocol string `proxy:"protocol,omitempty"` - ObfsProtocol string `proxy:"obfs-protocol,omitempty"` // compatible with Stash - Up string `proxy:"up"` - UpSpeed int `proxy:"up-speed,omitempty"` // compatible with Stash - Down string `proxy:"down"` - DownSpeed int `proxy:"down-speed,omitempty"` // compatible with Stash - Auth string `proxy:"auth,omitempty"` - AuthString string `proxy:"auth_str,omitempty"` - Obfs string `proxy:"obfs,omitempty"` - SNI string `proxy:"sni,omitempty"` - SkipCertVerify bool `proxy:"skip-cert-verify,omitempty"` - Fingerprint string `proxy:"fingerprint,omitempty"` - ALPN []string `proxy:"alpn,omitempty"` - CustomCA string `proxy:"ca,omitempty"` - CustomCAString string `proxy:"ca_str,omitempty"` - ReceiveWindowConn int `proxy:"recv_window_conn,omitempty"` - ReceiveWindow int `proxy:"recv_window,omitempty"` - DisableMTUDiscovery bool `proxy:"disable_mtu_discovery,omitempty"` + Name string `proxy:"name"` + Server string `proxy:"server"` + Port int `proxy:"port"` + Protocol string `proxy:"protocol,omitempty"` + ObfsProtocol string `proxy:"obfs-protocol,omitempty"` // compatible with Stash + Up string `proxy:"up"` + UpSpeed int `proxy:"up-speed,omitempty"` // compatible with Stash + Down string `proxy:"down"` + DownSpeed int `proxy:"down-speed,omitempty"` // compatible with Stash + Auth string `proxy:"auth,omitempty"` + OldAuthString string `proxy:"auth_str,omitempty"` + AuthString string `proxy:"auth-str,omitempty"` + Obfs string `proxy:"obfs,omitempty"` + SNI string `proxy:"sni,omitempty"` + SkipCertVerify bool `proxy:"skip-cert-verify,omitempty"` + Fingerprint string `proxy:"fingerprint,omitempty"` + ALPN []string `proxy:"alpn,omitempty"` + CustomCA string `proxy:"ca,omitempty"` + OldCustomCAString string `proxy:"ca_str,omitempty"` + CustomCAString string `proxy:"ca-str,omitempty"` + OldReceiveWindowConn int `proxy:"recv_window_conn,omitempty"` + ReceiveWindowConn int `proxy:"recv-window-conn,omitempty"` + OldReceiveWindow int `proxy:"recv_window,omitempty"` + ReceiveWindow int `proxy:"recv-window,omitempty"` + OldDisableMTUDiscovery bool `proxy:"disable_mtu_discovery,omitempty"` + DisableMTUDiscovery bool `proxy:"disable-mtu-discovery,omitempty"` + FastOpen bool `proxy:"fast-open,omitempty"` } func (c *HysteriaOption) Speed() (uint64, uint64, error) { @@ -151,8 +158,8 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { if err != nil { return nil, fmt.Errorf("hysteria %s load ca error: %w", addr, err) } - } else if option.CustomCAString != "" { - bs = []byte(option.CustomCAString) + } else if compatibilityValue(option.CustomCAString, option.OldCustomCAString) != "" { + bs = []byte(compatibilityValue(option.CustomCAString, option.OldCustomCAString)) } if len(bs) > 0 { @@ -182,14 +189,13 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { } else { tlsConfig.NextProtos = []string{DefaultALPN} } - quicConfig := &quic.Config{ - InitialStreamReceiveWindow: uint64(option.ReceiveWindowConn), - MaxStreamReceiveWindow: uint64(option.ReceiveWindowConn), - InitialConnectionReceiveWindow: uint64(option.ReceiveWindow), - MaxConnectionReceiveWindow: uint64(option.ReceiveWindow), + InitialStreamReceiveWindow: uint64(compatibilityValue(option.ReceiveWindowConn, option.OldReceiveWindow)), + MaxStreamReceiveWindow: uint64(compatibilityValue(option.ReceiveWindowConn, option.OldReceiveWindow)), + InitialConnectionReceiveWindow: uint64(compatibilityValue(option.ReceiveWindow, option.OldReceiveWindow)), + MaxConnectionReceiveWindow: uint64(compatibilityValue(option.ReceiveWindow, option.OldReceiveWindow)), KeepAlivePeriod: 10 * time.Second, - DisablePathMTUDiscovery: option.DisableMTUDiscovery, + DisablePathMTUDiscovery: compatibilityValue(option.DisableMTUDiscovery, option.OldDisableMTUDiscovery), EnableDatagrams: true, } if option.ObfsProtocol != "" { @@ -198,11 +204,11 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { if option.Protocol == "" { option.Protocol = DefaultProtocol } - if option.ReceiveWindowConn == 0 { + if compatibilityValue( option.ReceiveWindowConn, option.OldReceiveWindowConn)== 0 { quicConfig.InitialStreamReceiveWindow = DefaultStreamReceiveWindow / 10 quicConfig.MaxStreamReceiveWindow = DefaultStreamReceiveWindow } - if option.ReceiveWindow == 0 { + if compatibilityValue( option.ReceiveWindow,option.OldReceiveWindow) == 0 { quicConfig.InitialConnectionReceiveWindow = DefaultConnectionReceiveWindow / 10 quicConfig.MaxConnectionReceiveWindow = DefaultConnectionReceiveWindow } @@ -210,7 +216,7 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { log.Infoln("hysteria: Path MTU Discovery is not yet supported on this platform") } - var auth = []byte(option.AuthString) + var auth = []byte(compatibilityValue(option.AuthString, option.OldAuthString)) if option.Auth != "" { auth, err = base64.StdEncoding.DecodeString(option.Auth) if err != nil { @@ -235,7 +241,7 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { client, err := core.NewClient( addr, option.Protocol, auth, tlsConfig, quicConfig, clientTransport, up, down, func(refBPS uint64) congestion.CongestionControl { return hyCongestion.NewBrutalSender(congestion.ByteCount(refBPS)) - }, obfuscator, + }, obfuscator, option.FastOpen, ) if err != nil { return nil, fmt.Errorf("hysteria %s create error: %w", addr, err) @@ -254,6 +260,13 @@ func NewHysteria(option HysteriaOption) (*Hysteria, error) { }, nil } +func compatibilityValue[T any](prefer T, fallback T) T { + if reflect.ValueOf(prefer).IsZero() { + return fallback + } else { + return prefer + } +} func stringToBps(s string) uint64 { if s == "" { return 0 diff --git a/transport/hysteria/core/client.go b/transport/hysteria/core/client.go index 9c93f5a59f..8a19570bea 100644 --- a/transport/hysteria/core/client.go +++ b/transport/hysteria/core/client.go @@ -45,11 +45,12 @@ type Client struct { udpSessionMutex sync.RWMutex udpSessionMap map[uint32]chan *udpMessage udpDefragger defragger + fastOpen bool } func NewClient(serverAddr string, protocol string, auth []byte, tlsConfig *tls.Config, quicConfig *quic.Config, transport *transport.ClientTransport, sendBPS uint64, recvBPS uint64, congestionFactory CongestionFactory, - obfuscator obfs.Obfuscator) (*Client, error) { + obfuscator obfs.Obfuscator,fastOpen bool) (*Client, error) { quicConfig.DisablePathMTUDiscovery = quicConfig.DisablePathMTUDiscovery || pmtud_fix.DisablePathMTUDiscovery c := &Client{ transport: transport, @@ -62,6 +63,7 @@ func NewClient(serverAddr string, protocol string, auth []byte, tlsConfig *tls.C obfuscator: obfuscator, tlsConfig: tlsConfig, quicConfig: quicConfig, + fastOpen: fastOpen, } return c, nil } @@ -205,21 +207,27 @@ func (c *Client) DialTCP(addr string, dialer transport.PacketDialer) (net.Conn, _ = stream.Close() return nil, err } - // Read response - var sr serverResponse - err = struc.Unpack(stream, &sr) - if err != nil { - _ = stream.Close() - return nil, err - } - if !sr.OK { - _ = stream.Close() - return nil, fmt.Errorf("connection rejected: %s", sr.Message) + // If fast open is enabled, we return the stream immediately + // and defer the response handling to the first Read() call + if !c.fastOpen { + // Read response + var sr serverResponse + err = struc.Unpack(stream, &sr) + if err != nil { + _ = stream.Close() + return nil, err + } + if !sr.OK { + _ = stream.Close() + return nil, fmt.Errorf("connection rejected: %s", sr.Message) + } } + return &quicConn{ Orig: stream, PseudoLocalAddr: session.LocalAddr(), PseudoRemoteAddr: session.RemoteAddr(), + Established: !c.fastOpen, }, nil } @@ -288,9 +296,23 @@ type quicConn struct { Orig quic.Stream PseudoLocalAddr net.Addr PseudoRemoteAddr net.Addr + Established bool } func (w *quicConn) Read(b []byte) (n int, err error) { + if !w.Established { + var sr serverResponse + err := struc.Unpack(w.Orig, &sr) + if err != nil { + _ = w.Close() + return 0, err + } + if !sr.OK { + _ = w.Close() + return 0, fmt.Errorf("connection rejected: %s", sr.Message) + } + w.Established = true + } return w.Orig.Read(b) }