Skip to content

Commit

Permalink
shared http transport (erda-project#3014)
Browse files Browse the repository at this point in the history
* shared http transport

* disable connection pool
  • Loading branch information
recallsong authored and erda-bot committed Nov 11, 2021
1 parent 96b3478 commit d93b908
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 39 deletions.
2 changes: 1 addition & 1 deletion modules/extensions/loghub/index/query/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func newHTTPClient(clusterName string) *http.Client {
Proxy: http.ProxyFromEnvironment,
DialContext: t.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
MaxIdleConns: -1,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
Expand Down
95 changes: 57 additions & 38 deletions pkg/http/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,6 @@ import (
"github.com/erda-project/erda/pkg/clusterdialer"
)

const (
// DialTimeout 建立 tcp 连接的超时时间
DialTimeout = 15 * time.Second
// ClientDefaultTimeout 从建立 tcp 到读完 response body 超时时间
ClientDefaultTimeout = 60 * time.Second
)

type BasicAuth struct {
name string
password string
Expand Down Expand Up @@ -202,16 +195,40 @@ func WithEnableAutoRetry(enableAutoRetry bool) OpOption {
}
}

func mkDialContext(option *Option) func(ctx context.Context, network, addr string) (net.Conn, error) {
raw := (&net.Dialer{
Timeout: option.dialTimeout,
KeepAlive: option.dialerKeepalive,
var defaultTransport = newdefaultTransport(newDialContext(0, 0))

func newDialContext(dialTimeout, tcpKeepAlive time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) {
if dialTimeout == 0 {
dialTimeout = 15 * time.Second
}
if tcpKeepAlive == 0 {
tcpKeepAlive = 60 * time.Second
}
return (&net.Dialer{
Timeout: dialTimeout,
KeepAlive: tcpKeepAlive,
}).DialContext
}

func newdefaultTransport(dialContext func(ctx context.Context, network, addr string) (net.Conn, error)) *http.Transport {
return &http.Transport{
DialContext: dialContext,
Proxy: http.ProxyFromEnvironment,
ForceAttemptHTTP2: false,
MaxIdleConns: 10,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
}

func mkDialContext(clusterDialKey string, dnscache *DNSCache, dialTimeout, tcpKeepAlive time.Duration) func(ctx context.Context, network, addr string) (net.Conn, error) {
raw := newDialContext(dialTimeout, tcpKeepAlive)
dialcontext := raw
if option.clusterDialKey != "" {
return clusterdialer.DialContext(option.clusterDialKey)
if clusterDialKey != "" {
return clusterdialer.DialContext(clusterDialKey)
}
if option.dnscache != nil {
if dnscache != nil {
dialcontext = func(ctx context.Context, network, addr string) (net.Conn, error) {
var host string
var remain string
Expand All @@ -231,7 +248,7 @@ func mkDialContext(option *Option) func(ctx context.Context, network, addr strin
if net.ParseIP(host) != nil || (!strings.HasPrefix(network, "tcp") && !strings.HasPrefix(network, "udp")) {
return raw(ctx, network, addr)
}
ips, err := option.dnscache.lookup(host)
ips, err := dnscache.lookup(host)
if err != nil {
return raw(ctx, network, addr)
}
Expand All @@ -250,37 +267,39 @@ func mkDialContext(option *Option) func(ctx context.Context, network, addr strin

func New(ops ...OpOption) *HTTPClient {
option := &Option{}
option.dialTimeout = DialTimeout
option.clientTimeout = ClientDefaultTimeout
for _, op := range ops {
op(option)
}

tr := &http.Transport{
DialContext: mkDialContext(option),
MaxIdleConns: 2,
}
if option.proxy != "" {
tr.Proxy = func(request *http.Request) (u *url.URL, err error) {
return url.Parse(option.proxy)
}
}
if option.dialerKeepalive != 0 {
tr.IdleConnTimeout = option.dialerKeepalive
}
tr.ExpectContinueTimeout = 1 * time.Second
proto := "http"
if option.isHTTPS {
proto = "https"
if option.ca != nil {
tr.TLSClientConfig = &tls.Config{
RootCAs: option.ca,
Certificates: []tls.Certificate{option.keyPair},
}

var tr = defaultTransport
if option.clusterDialKey != "" || option.dnscache != nil || option.dialTimeout != 0 || option.clientTimeout != 0 ||
option.proxy != "" || option.dialerKeepalive != 0 || option.ca != nil {
tr = newdefaultTransport(mkDialContext(option.clusterDialKey, option.dnscache, option.dialTimeout, option.clientTimeout))
tr.MaxIdleConns = -1 // disable connection pool
if option.proxy != "" {
tr.Proxy = func(request *http.Request) (u *url.URL, err error) {
return url.Parse(option.proxy)
}
} else {
tr.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{option.keyPair},
}
if option.dialerKeepalive != 0 {
tr.IdleConnTimeout = option.dialerKeepalive
}
if option.isHTTPS {
if option.ca != nil {
tr.TLSClientConfig = &tls.Config{
RootCAs: option.ca,
Certificates: []tls.Certificate{option.keyPair},
}
} else {
tr.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
Certificates: []tls.Certificate{option.keyPair},
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/http/httpclient/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (r AfterDo) JSON(o interface{}) (*Response, error) {
return nil, err
}
defer resp.Body.Close()

// check content-type before decode body
contentType := resp.Header.Get("Content-Type")
body, err := ioutil.ReadAll(resp.Body)
Expand Down
1 change: 1 addition & 0 deletions pkg/http/httpclient/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (t *DefaultTracer) TraceResponse(r *http.Response) {
io.WriteString(t.w, fmt.Sprintf("TraceResponse: read response body fail: %v", err))
return
}
r.Body.Close()
io.WriteString(t.w, fmt.Sprintf("ResponseBody: %s\n", string(body)))
r.Body = ioutil.NopCloser(bytes.NewReader(body))
}

0 comments on commit d93b908

Please sign in to comment.