From d93b90881468a512e875ab4872654d3b415c7864 Mon Sep 17 00:00:00 2001 From: RecallSong <13607438+recallsong@users.noreply.github.com> Date: Thu, 11 Nov 2021 14:07:23 +0800 Subject: [PATCH] shared http transport (#3014) * shared http transport * disable connection pool --- .../extensions/loghub/index/query/clients.go | 2 +- pkg/http/httpclient/client.go | 95 +++++++++++-------- pkg/http/httpclient/request.go | 1 + pkg/http/httpclient/trace.go | 1 + 4 files changed, 60 insertions(+), 39 deletions(-) diff --git a/modules/extensions/loghub/index/query/clients.go b/modules/extensions/loghub/index/query/clients.go index a7acd961164..6ee8c5c0cba 100644 --- a/modules/extensions/loghub/index/query/clients.go +++ b/modules/extensions/loghub/index/query/clients.go @@ -262,7 +262,7 @@ func newHTTPClient(clusterName string) *http.Client { Proxy: http.ProxyFromEnvironment, DialContext: t.DialContext, ForceAttemptHTTP2: true, - MaxIdleConns: 100, + MaxIdleConns: -1, IdleConnTimeout: 90 * time.Second, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, diff --git a/pkg/http/httpclient/client.go b/pkg/http/httpclient/client.go index b1c409d744e..3b544fd1038 100644 --- a/pkg/http/httpclient/client.go +++ b/pkg/http/httpclient/client.go @@ -34,13 +34,6 @@ import ( "github.com/erda-project/erda/pkg/clusterdialer" ) -const ( - // DialTimeout 建立 tcp 连接的超时时间 - DialTimeout = 15 * time.Second - // ClientDefaultTimeout 从建立 tcp 到读完 response body 超时时间 - ClientDefaultTimeout = 60 * time.Second -) - type BasicAuth struct { name string password string @@ -202,16 +195,40 @@ func WithEnableAutoRetry(enableAutoRetry bool) OpOption { } } -func mkDialContext(option *Option) func(ctx context.Context, network, addr string) (net.Conn, error) { - raw := (&net.Dialer{ - Timeout: option.dialTimeout, - KeepAlive: option.dialerKeepalive, +var defaultTransport = newdefaultTransport(newDialContext(0, 0)) + +func newDialContext(dialTimeout, tcpKeepAlive time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) { + if dialTimeout == 0 { + dialTimeout = 15 * time.Second + } + if tcpKeepAlive == 0 { + tcpKeepAlive = 60 * time.Second + } + return (&net.Dialer{ + Timeout: dialTimeout, + KeepAlive: tcpKeepAlive, }).DialContext +} + +func newdefaultTransport(dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) *http.Transport { + return &http.Transport{ + DialContext: dialContext, + Proxy: http.ProxyFromEnvironment, + ForceAttemptHTTP2: false, + MaxIdleConns: 10, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 1 * time.Second, + } +} + +func mkDialContext(clusterDialKey string, dnscache *DNSCache, dialTimeout, tcpKeepAlive time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) { + raw := newDialContext(dialTimeout, tcpKeepAlive) dialcontext := raw - if option.clusterDialKey != "" { - return clusterdialer.DialContext(option.clusterDialKey) + if clusterDialKey != "" { + return clusterdialer.DialContext(clusterDialKey) } - if option.dnscache != nil { + if dnscache != nil { dialcontext = func(ctx context.Context, network, addr string) (net.Conn, error) { var host string var remain string @@ -231,7 +248,7 @@ func mkDialContext(option *Option) func(ctx context.Context, network, addr strin if net.ParseIP(host) != nil || (!strings.HasPrefix(network, "tcp") && !strings.HasPrefix(network, "udp")) { return raw(ctx, network, addr) } - ips, err := option.dnscache.lookup(host) + ips, err := dnscache.lookup(host) if err != nil { return raw(ctx, network, addr) } @@ -250,37 +267,39 @@ func mkDialContext(option *Option) func(ctx context.Context, network, addr strin func New(ops ...OpOption) *HTTPClient { option := &Option{} - option.dialTimeout = DialTimeout - option.clientTimeout = ClientDefaultTimeout for _, op := range ops { op(option) } - tr := &http.Transport{ - DialContext: mkDialContext(option), - MaxIdleConns: 2, - } - if option.proxy != "" { - tr.Proxy = func(request *http.Request) (u *url.URL, err error) { - return url.Parse(option.proxy) - } - } - if option.dialerKeepalive != 0 { - tr.IdleConnTimeout = option.dialerKeepalive - } - tr.ExpectContinueTimeout = 1 * time.Second proto := "http" if option.isHTTPS { proto = "https" - if option.ca != nil { - tr.TLSClientConfig = &tls.Config{ - RootCAs: option.ca, - Certificates: []tls.Certificate{option.keyPair}, + } + + var tr = defaultTransport + if option.clusterDialKey != "" || option.dnscache != nil || option.dialTimeout != 0 || option.clientTimeout != 0 || + option.proxy != "" || option.dialerKeepalive != 0 || option.ca != nil { + tr = newdefaultTransport(mkDialContext(option.clusterDialKey, option.dnscache, option.dialTimeout, option.clientTimeout)) + tr.MaxIdleConns = -1 // disable connection pool + if option.proxy != "" { + tr.Proxy = func(request *http.Request) (u *url.URL, err error) { + return url.Parse(option.proxy) } - } else { - tr.TLSClientConfig = &tls.Config{ - InsecureSkipVerify: true, - Certificates: []tls.Certificate{option.keyPair}, + } + if option.dialerKeepalive != 0 { + tr.IdleConnTimeout = option.dialerKeepalive + } + if option.isHTTPS { + if option.ca != nil { + tr.TLSClientConfig = &tls.Config{ + RootCAs: option.ca, + Certificates: []tls.Certificate{option.keyPair}, + } + } else { + tr.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + Certificates: []tls.Certificate{option.keyPair}, + } } } } diff --git a/pkg/http/httpclient/request.go b/pkg/http/httpclient/request.go index b659d1accdc..8ae032dfb6b 100644 --- a/pkg/http/httpclient/request.go +++ b/pkg/http/httpclient/request.go @@ -201,6 +201,7 @@ func (r AfterDo) JSON(o interface{}) (*Response, error) { return nil, err } defer resp.Body.Close() + // check content-type before decode body contentType := resp.Header.Get("Content-Type") body, err := ioutil.ReadAll(resp.Body) diff --git a/pkg/http/httpclient/trace.go b/pkg/http/httpclient/trace.go index 05dda17e673..f4395def293 100644 --- a/pkg/http/httpclient/trace.go +++ b/pkg/http/httpclient/trace.go @@ -47,6 +47,7 @@ func (t *DefaultTracer) TraceResponse(r *http.Response) { io.WriteString(t.w, fmt.Sprintf("TraceResponse: read response body fail: %v", err)) return } + r.Body.Close() io.WriteString(t.w, fmt.Sprintf("ResponseBody: %s\n", string(body))) r.Body = ioutil.NopCloser(bytes.NewReader(body)) }