diff --git a/cmd/mirror.go b/cmd/mirror.go index a1776df30b..e439a33087 100644 --- a/cmd/mirror.go +++ b/cmd/mirror.go @@ -15,6 +15,7 @@ package cmd import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -106,7 +107,7 @@ func newMirrorSignCmd() *cobra.Command { if strings.HasPrefix(args[0], "http") { client := utils.NewHTTPClient(time.Duration(timeout)*time.Second, nil) - data, err := client.Get(args[0]) + data, err := client.Get(context.TODO(), args[0]) if err != nil { return err } @@ -115,7 +116,7 @@ func newMirrorSignCmd() *cobra.Command { return err } - if _, err = client.Post(args[0], bytes.NewBuffer(data)); err != nil { + if _, err = client.Post(context.TODO(), args[0], bytes.NewBuffer(data)); err != nil { return err } diff --git a/cmd/root.go b/cmd/root.go index 2e55e97fdf..0a522821ad 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -241,8 +241,8 @@ func Execute() { tiupReport.ExitCode = int32(code) tiupReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds()) - tele := telemetry.NewTelemetry() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + tele := telemetry.NewTelemetry() err := tele.Report(ctx, teleReport) if environment.DebugMode { if err != nil { diff --git a/components/cluster/command/root.go b/components/cluster/command/root.go index 9f5b45aeb6..f4c6024bb9 100644 --- a/components/cluster/command/root.go +++ b/components/cluster/command/root.go @@ -324,8 +324,8 @@ func Execute() { } clusterReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds()) clusterReport.Command = strings.Join(teleCommand, " ") - tele := telemetry.NewTelemetry() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + tele := telemetry.NewTelemetry() err := tele.Report(ctx, teleReport) if environment.DebugMode { if err != nil { diff --git a/components/playground/main.go b/components/playground/main.go index baabd28fa7..2451615439 100644 --- a/components/playground/main.go +++ b/components/playground/main.go @@ -632,8 +632,8 @@ func main() { } } playgroundReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds()) - tele := telemetry.NewTelemetry() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + tele := telemetry.NewTelemetry() err := tele.Report(ctx, teleReport) if environment.DebugMode { if err != nil { diff --git a/pkg/cluster/api/dmapi.go b/pkg/cluster/api/dmapi.go index 67e18df406..209d8285b7 100644 --- a/pkg/cluster/api/dmapi.go +++ b/pkg/cluster/api/dmapi.go @@ -15,6 +15,7 @@ package api import ( "bytes" + "context" "crypto/tls" "fmt" "strings" @@ -78,7 +79,7 @@ func (dm *DMMasterClient) getEndpoints(cmd string) (endpoints []string) { func (dm *DMMasterClient) getMember(endpoints []string) (*dmpb.ListMemberResponse, error) { resp := &dmpb.ListMemberResponse{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := dm.httpClient.Get(endpoint) + body, err := dm.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -101,7 +102,7 @@ func (dm *DMMasterClient) getMember(endpoints []string) (*dmpb.ListMemberRespons func (dm *DMMasterClient) deleteMember(endpoints []string) (*dmpb.OfflineMemberResponse, error) { resp := &dmpb.OfflineMemberResponse{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, statusCode, err := dm.httpClient.Delete(endpoint, nil) + body, statusCode, err := dm.httpClient.Delete(context.TODO(), endpoint, nil) if statusCode == 404 || bytes.Contains(body, []byte("not exists")) { zap.L().Debug("member to offline does not exist, ignore.") diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index d0e2ca5308..3d90510c5a 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -15,6 +15,7 @@ package api import ( "bytes" + "context" "crypto/tls" "encoding/json" "errors" @@ -62,7 +63,7 @@ func (pc *PDClient) tryIdentifyVersion() { endpoints := pc.getEndpoints(pdVersionURI) response := map[string]string{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -150,7 +151,7 @@ func (pc *PDClient) CheckHealth() error { endpoints := pc.getEndpoints(pdPingURI) _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -174,7 +175,7 @@ func (pc *PDClient) GetStores() (*StoresInfo, error) { storesInfo := StoresInfo{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -267,7 +268,7 @@ func (pc *PDClient) GetLeader() (*pdpb.Member, error) { leader := pdpb.Member{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -288,7 +289,7 @@ func (pc *PDClient) GetMembers() (*pdpb.GetMembersResponse, error) { members := pdpb.GetMembersResponse{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -312,7 +313,7 @@ func (pc *PDClient) GetConfig() (map[string]interface{}, error) { pdConfig := map[string]interface{}{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } @@ -358,7 +359,7 @@ func (pc *PDClient) EvictPDLeader(retryOpt *utils.RetryOption) error { endpoints := pc.getEndpoints(cmd) _, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Post(endpoint, nil) + body, err := pc.httpClient.Post(context.TODO(), endpoint, nil) if err != nil { return body, err } @@ -438,7 +439,7 @@ func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, c endpoints := pc.getEndpoints(pdSchedulersURI) _, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) { - return pc.httpClient.Post(endpoint, bytes.NewBuffer(scheduler)) + return pc.httpClient.Post(context.TODO(), endpoint, bytes.NewBuffer(scheduler)) }) if err != nil { return err @@ -498,7 +499,7 @@ func (pc *PDClient) RemoveStoreEvict(host string) error { endpoints := pc.getEndpoints(cmd) _, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, statusCode, err := pc.httpClient.Delete(endpoint, nil) + body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil) if err != nil { if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("scheduler not found")) { log.Debugf("Store leader evicting scheduler does not exist, ignore.") @@ -533,7 +534,7 @@ func (pc *PDClient) DelPD(name string, retryOpt *utils.RetryOption) error { endpoints := pc.getEndpoints(cmd) _, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, statusCode, err := pc.httpClient.Delete(endpoint, nil) + body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil) if err != nil { if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("not found, pd")) { log.Debugf("PD node does not exist, ignore: %s", body) @@ -620,7 +621,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error { endpoints := pc.getEndpoints(cmd) _, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, statusCode, err := pc.httpClient.Delete(endpoint, nil) + body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil) if err != nil { if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("not found")) { log.Debugf("store %d %s does not exist, ignore: %s", storeID, host, body) @@ -673,7 +674,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error { func (pc *PDClient) updateConfig(url string, body io.Reader) error { endpoints := pc.getEndpoints(url) _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - return pc.httpClient.Post(endpoint, body) + return pc.httpClient.Post(context.TODO(), endpoint, body) }) return err } @@ -687,7 +688,7 @@ func (pc *PDClient) UpdateReplicateConfig(body io.Reader) error { func (pc *PDClient) GetReplicateConfig() ([]byte, error) { endpoints := pc.getEndpoints(pdConfigReplicate) return tryURLs(endpoints, func(endpoint string) ([]byte, error) { - return pc.httpClient.Get(endpoint) + return pc.httpClient.Get(context.TODO(), endpoint) }) } @@ -744,7 +745,7 @@ func (pc *PDClient) CheckRegion(state string) (*RegionsInfo, error) { regionsInfo := RegionsInfo{} _, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) { - body, err := pc.httpClient.Get(endpoint) + body, err := pc.httpClient.Get(context.TODO(), endpoint) if err != nil { return body, err } diff --git a/pkg/cluster/spec/util.go b/pkg/cluster/spec/util.go index a859706455..0e1a523ccb 100644 --- a/pkg/cluster/spec/util.go +++ b/pkg/cluster/spec/util.go @@ -15,6 +15,7 @@ package spec import ( "bytes" + "context" "crypto/tls" "fmt" "path/filepath" @@ -136,7 +137,7 @@ func statusByHost(host string, port int, path string, tlsCfg *tls.Config) string url := fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path) // body doesn't have any status section needed - body, err := client.Get(url) + body, err := client.Get(context.TODO(), url) if err != nil || body == nil { return "Down" } @@ -153,7 +154,7 @@ func UptimeByHost(host string, port int, tlsCfg *tls.Config) time.Duration { client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg) - body, err := client.Get(url) + body, err := client.Get(context.TODO(), url) if err != nil || body == nil { return 0 } diff --git a/pkg/cluster/template/install/local_install.sh.go b/pkg/cluster/template/install/local_install.sh.go index eddaa98e43..a5a2a45497 100644 --- a/pkg/cluster/template/install/local_install.sh.go +++ b/pkg/cluster/template/install/local_install.sh.go @@ -83,6 +83,10 @@ fi chmod 755 "$bin_dir/tiup" +# telemetry is not needed for offline installations +"$bin_dir/tiup" telemetry disable + +# set mirror to the local path "$bin_dir/tiup" mirror set ${script_dir} bold=$(tput bold 2>/dev/null) diff --git a/pkg/repository/store/txn.go b/pkg/repository/store/txn.go index e0de561fb8..73954be1c0 100644 --- a/pkg/repository/store/txn.go +++ b/pkg/repository/store/txn.go @@ -15,6 +15,7 @@ package store import ( "bytes" + "context" "fmt" "io" "os" @@ -161,7 +162,7 @@ func (t *localTxn) ReadManifest(filename string, role v1manifest.ValidManifest) case os.IsNotExist(err) && t.store.upstream != "": url := fmt.Sprintf("%s/%s", t.store.upstream, filename) client := utils.NewHTTPClient(time.Minute, nil) - body, err := client.Get(url) + body, err := client.Get(context.TODO(), url) if err != nil { return nil, errors.Annotatef(err, "fetch %s", url) } diff --git a/pkg/telemetry/telemetry.go b/pkg/telemetry/telemetry.go index 706a427d08..f6a1c7f102 100644 --- a/pkg/telemetry/telemetry.go +++ b/pkg/telemetry/telemetry.go @@ -48,7 +48,7 @@ func (t *Telemetry) Report(ctx context.Context, msg *Report) error { return errors.AddStack(err) } - if _, err = t.cli.Post(t.url, bytes.NewReader(dst)); err != nil { + if _, err = t.cli.Post(ctx, t.url, bytes.NewReader(dst)); err != nil { return errors.AddStack(err) } diff --git a/pkg/telemetry/telemetry_test.go b/pkg/telemetry/telemetry_test.go index 14e32ad943..9aa0c9a201 100644 --- a/pkg/telemetry/telemetry_test.go +++ b/pkg/telemetry/telemetry_test.go @@ -62,10 +62,10 @@ func (s *TelemetrySuite) TestReport(c *check.C) { msg := new(Report) - err := tele.Report(context.Background(), msg) + err := tele.Report(context.TODO(), msg) c.Assert(err, check.NotNil) msg.EventUUID = "dfdfdf" - err = tele.Report(context.Background(), msg) + err = tele.Report(context.TODO(), msg) c.Assert(err, check.IsNil) } diff --git a/pkg/utils/http_client.go b/pkg/utils/http_client.go index 034ed462b5..0f0065b217 100644 --- a/pkg/utils/http_client.go +++ b/pkg/utils/http_client.go @@ -14,6 +14,7 @@ package utils import ( + "context" "crypto/tls" "fmt" "io" @@ -36,7 +37,7 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient { } tr := &http.Transport{ TLSClientConfig: tlsConfig, - Dial: (&net.Dialer{Timeout: 5 * time.Second}).Dial, + Dial: (&net.Dialer{Timeout: 3 * time.Second}).Dial, } // prefer to use the inner http proxy httpProxy := os.Getenv("TIUP_INNER_HTTP_PROXY") @@ -57,8 +58,16 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient { } // Get fetch an URL with GET method and returns the response -func (c *HTTPClient) Get(url string) ([]byte, error) { - res, err := c.client.Get(url) +func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) { + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + if ctx != nil { + req = req.WithContext(ctx) + } + res, err := c.client.Do(req) if err != nil { return nil, err } @@ -68,8 +77,17 @@ func (c *HTTPClient) Get(url string) ([]byte, error) { } // Post send a POST request to the url and returns the response -func (c *HTTPClient) Post(url string, body io.Reader) ([]byte, error) { - res, err := c.client.Post(url, "application/json", body) +func (c *HTTPClient) Post(ctx context.Context, url string, body io.Reader) ([]byte, error) { + req, err := http.NewRequest("POST", url, body) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + + if ctx != nil { + req = req.WithContext(ctx) + } + res, err := c.client.Do(req) if err != nil { return nil, err } @@ -79,13 +97,16 @@ func (c *HTTPClient) Post(url string, body io.Reader) ([]byte, error) { } // Delete send a DELETE request to the url and returns the response and status code. -func (c *HTTPClient) Delete(url string, body io.Reader) ([]byte, int, error) { +func (c *HTTPClient) Delete(ctx context.Context, url string, body io.Reader) ([]byte, int, error) { var statusCode int req, err := http.NewRequest("DELETE", url, body) if err != nil { return nil, statusCode, err } + if ctx != nil { + req = req.WithContext(ctx) + } res, err := c.client.Do(req) if err != nil { return nil, statusCode, err