Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix telemetry timeout #1500

Merged
merged 4 commits into from
Aug 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package cmd

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -106,7 +107,7 @@ func newMirrorSignCmd() *cobra.Command {

if strings.HasPrefix(args[0], "http") {
client := utils.NewHTTPClient(time.Duration(timeout)*time.Second, nil)
data, err := client.Get(args[0])
data, err := client.Get(context.TODO(), args[0])
if err != nil {
return err
}
Expand All @@ -115,7 +116,7 @@ func newMirrorSignCmd() *cobra.Command {
return err
}

if _, err = client.Post(args[0], bytes.NewBuffer(data)); err != nil {
if _, err = client.Post(context.TODO(), args[0], bytes.NewBuffer(data)); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func Execute() {

tiupReport.ExitCode = int32(code)
tiupReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds())
tele := telemetry.NewTelemetry()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
tele := telemetry.NewTelemetry()
err := tele.Report(ctx, teleReport)
if environment.DebugMode {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion components/cluster/command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ func Execute() {
}
clusterReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds())
clusterReport.Command = strings.Join(teleCommand, " ")
tele := telemetry.NewTelemetry()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
tele := telemetry.NewTelemetry()
err := tele.Report(ctx, teleReport)
if environment.DebugMode {
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,8 @@ func main() {
}
}
playgroundReport.TakeMilliseconds = uint64(time.Since(start).Milliseconds())
tele := telemetry.NewTelemetry()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
tele := telemetry.NewTelemetry()
err := tele.Report(ctx, teleReport)
if environment.DebugMode {
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/api/dmapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"strings"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (dm *DMMasterClient) getEndpoints(cmd string) (endpoints []string) {
func (dm *DMMasterClient) getMember(endpoints []string) (*dmpb.ListMemberResponse, error) {
resp := &dmpb.ListMemberResponse{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := dm.httpClient.Get(endpoint)
body, err := dm.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -101,7 +102,7 @@ func (dm *DMMasterClient) getMember(endpoints []string) (*dmpb.ListMemberRespons
func (dm *DMMasterClient) deleteMember(endpoints []string) (*dmpb.OfflineMemberResponse, error) {
resp := &dmpb.OfflineMemberResponse{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := dm.httpClient.Delete(endpoint, nil)
body, statusCode, err := dm.httpClient.Delete(context.TODO(), endpoint, nil)

if statusCode == 404 || bytes.Contains(body, []byte("not exists")) {
zap.L().Debug("member to offline does not exist, ignore.")
Expand Down
29 changes: 15 additions & 14 deletions pkg/cluster/api/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
Expand Down Expand Up @@ -62,7 +63,7 @@ func (pc *PDClient) tryIdentifyVersion() {
endpoints := pc.getEndpoints(pdVersionURI)
response := map[string]string{}
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -150,7 +151,7 @@ func (pc *PDClient) CheckHealth() error {
endpoints := pc.getEndpoints(pdPingURI)

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -174,7 +175,7 @@ func (pc *PDClient) GetStores() (*StoresInfo, error) {
storesInfo := StoresInfo{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -267,7 +268,7 @@ func (pc *PDClient) GetLeader() (*pdpb.Member, error) {
leader := pdpb.Member{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -288,7 +289,7 @@ func (pc *PDClient) GetMembers() (*pdpb.GetMembersResponse, error) {
members := pdpb.GetMembersResponse{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand All @@ -312,7 +313,7 @@ func (pc *PDClient) GetConfig() (map[string]interface{}, error) {
pdConfig := map[string]interface{}{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -358,7 +359,7 @@ func (pc *PDClient) EvictPDLeader(retryOpt *utils.RetryOption) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Post(endpoint, nil)
body, err := pc.httpClient.Post(context.TODO(), endpoint, nil)
if err != nil {
return body, err
}
Expand Down Expand Up @@ -438,7 +439,7 @@ func (pc *PDClient) EvictStoreLeader(host string, retryOpt *utils.RetryOption, c
endpoints := pc.getEndpoints(pdSchedulersURI)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Post(endpoint, bytes.NewBuffer(scheduler))
return pc.httpClient.Post(context.TODO(), endpoint, bytes.NewBuffer(scheduler))
})
if err != nil {
return err
Expand Down Expand Up @@ -498,7 +499,7 @@ func (pc *PDClient) RemoveStoreEvict(host string) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := pc.httpClient.Delete(endpoint, nil)
body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil)
if err != nil {
if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("scheduler not found")) {
log.Debugf("Store leader evicting scheduler does not exist, ignore.")
Expand Down Expand Up @@ -533,7 +534,7 @@ func (pc *PDClient) DelPD(name string, retryOpt *utils.RetryOption) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := pc.httpClient.Delete(endpoint, nil)
body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil)
if err != nil {
if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("not found, pd")) {
log.Debugf("PD node does not exist, ignore: %s", body)
Expand Down Expand Up @@ -620,7 +621,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error {
endpoints := pc.getEndpoints(cmd)

_, err = tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, statusCode, err := pc.httpClient.Delete(endpoint, nil)
body, statusCode, err := pc.httpClient.Delete(context.TODO(), endpoint, nil)
if err != nil {
if statusCode == http.StatusNotFound || bytes.Contains(body, []byte("not found")) {
log.Debugf("store %d %s does not exist, ignore: %s", storeID, host, body)
Expand Down Expand Up @@ -673,7 +674,7 @@ func (pc *PDClient) DelStore(host string, retryOpt *utils.RetryOption) error {
func (pc *PDClient) updateConfig(url string, body io.Reader) error {
endpoints := pc.getEndpoints(url)
_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Post(endpoint, body)
return pc.httpClient.Post(context.TODO(), endpoint, body)
})
return err
}
Expand All @@ -687,7 +688,7 @@ func (pc *PDClient) UpdateReplicateConfig(body io.Reader) error {
func (pc *PDClient) GetReplicateConfig() ([]byte, error) {
endpoints := pc.getEndpoints(pdConfigReplicate)
return tryURLs(endpoints, func(endpoint string) ([]byte, error) {
return pc.httpClient.Get(endpoint)
return pc.httpClient.Get(context.TODO(), endpoint)
})
}

Expand Down Expand Up @@ -744,7 +745,7 @@ func (pc *PDClient) CheckRegion(state string) (*RegionsInfo, error) {
regionsInfo := RegionsInfo{}

_, err := tryURLs(endpoints, func(endpoint string) ([]byte, error) {
body, err := pc.httpClient.Get(endpoint)
body, err := pc.httpClient.Get(context.TODO(), endpoint)
if err != nil {
return body, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/cluster/spec/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package spec

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -136,7 +137,7 @@ func statusByHost(host string, port int, path string, tlsCfg *tls.Config) string
url := fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path)

// body doesn't have any status section needed
body, err := client.Get(url)
body, err := client.Get(context.TODO(), url)
if err != nil || body == nil {
return "Down"
}
Expand All @@ -153,7 +154,7 @@ func UptimeByHost(host string, port int, tlsCfg *tls.Config) time.Duration {

client := utils.NewHTTPClient(statusQueryTimeout, tlsCfg)

body, err := client.Get(url)
body, err := client.Get(context.TODO(), url)
if err != nil || body == nil {
return 0
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/cluster/template/install/local_install.sh.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ fi

chmod 755 "$bin_dir/tiup"

# telemetry is not needed for offline installations
"$bin_dir/tiup" telemetry disable

# set mirror to the local path
"$bin_dir/tiup" mirror set ${script_dir}

bold=$(tput bold 2>/dev/null)
Expand Down
3 changes: 2 additions & 1 deletion pkg/repository/store/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package store

import (
"bytes"
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -161,7 +162,7 @@ func (t *localTxn) ReadManifest(filename string, role v1manifest.ValidManifest)
case os.IsNotExist(err) && t.store.upstream != "":
url := fmt.Sprintf("%s/%s", t.store.upstream, filename)
client := utils.NewHTTPClient(time.Minute, nil)
body, err := client.Get(url)
body, err := client.Get(context.TODO(), url)
if err != nil {
return nil, errors.Annotatef(err, "fetch %s", url)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (t *Telemetry) Report(ctx context.Context, msg *Report) error {
return errors.AddStack(err)
}

if _, err = t.cli.Post(t.url, bytes.NewReader(dst)); err != nil {
if _, err = t.cli.Post(ctx, t.url, bytes.NewReader(dst)); err != nil {
return errors.AddStack(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func (s *TelemetrySuite) TestReport(c *check.C) {

msg := new(Report)

err := tele.Report(context.Background(), msg)
err := tele.Report(context.TODO(), msg)
c.Assert(err, check.NotNil)

msg.EventUUID = "dfdfdf"
err = tele.Report(context.Background(), msg)
err = tele.Report(context.TODO(), msg)
c.Assert(err, check.IsNil)
}
33 changes: 27 additions & 6 deletions pkg/utils/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package utils

import (
"context"
"crypto/tls"
"fmt"
"io"
Expand All @@ -36,7 +37,7 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient {
}
tr := &http.Transport{
TLSClientConfig: tlsConfig,
Dial: (&net.Dialer{Timeout: 5 * time.Second}).Dial,
Dial: (&net.Dialer{Timeout: 3 * time.Second}).Dial,
}
// prefer to use the inner http proxy
httpProxy := os.Getenv("TIUP_INNER_HTTP_PROXY")
Expand All @@ -57,8 +58,16 @@ func NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient {
}

// Get fetch an URL with GET method and returns the response
func (c *HTTPClient) Get(url string) ([]byte, error) {
res, err := c.client.Get(url)
func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}

if ctx != nil {
req = req.WithContext(ctx)
}
res, err := c.client.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -68,8 +77,17 @@ func (c *HTTPClient) Get(url string) ([]byte, error) {
}

// Post send a POST request to the url and returns the response
func (c *HTTPClient) Post(url string, body io.Reader) ([]byte, error) {
res, err := c.client.Post(url, "application/json", body)
func (c *HTTPClient) Post(ctx context.Context, url string, body io.Reader) ([]byte, error) {
req, err := http.NewRequest("POST", url, body)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")

if ctx != nil {
req = req.WithContext(ctx)
}
res, err := c.client.Do(req)
if err != nil {
return nil, err
}
Expand All @@ -79,13 +97,16 @@ func (c *HTTPClient) Post(url string, body io.Reader) ([]byte, error) {
}

// Delete send a DELETE request to the url and returns the response and status code.
func (c *HTTPClient) Delete(url string, body io.Reader) ([]byte, int, error) {
func (c *HTTPClient) Delete(ctx context.Context, url string, body io.Reader) ([]byte, int, error) {
var statusCode int
req, err := http.NewRequest("DELETE", url, body)
if err != nil {
return nil, statusCode, err
}

if ctx != nil {
req = req.WithContext(ctx)
}
res, err := c.client.Do(req)
if err != nil {
return nil, statusCode, err
Expand Down