diff --git a/cmd/profilecli/main.go b/cmd/profilecli/main.go index 539d1a574a..011e6762d1 100644 --- a/cmd/profilecli/main.go +++ b/cmd/profilecli/main.go @@ -48,8 +48,15 @@ func main() { parquetInspectCmd := parquetCmd.Command("inspect", "Inspect a parquet file's structure.") parquetInspectFiles := parquetInspectCmd.Arg("file", "parquet file path").Required().ExistingFiles() + queryCmd := app.Command("query", "Query profile store.") + queryParams := addQueryParams(queryCmd) + queryOutput := queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").String() + queryMergeCmd := queryCmd.Command("merge", "Request merged profile.") + + // parse command line arguments parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:])) + // enable verbose logging if requested if !cfg.verbose { logger = level.NewFilter(logger, level.AllowWarn()) } @@ -63,7 +70,14 @@ func main() { os.Exit(checkError(err)) } } + case queryMergeCmd.FullCommand(): + if err := queryMerge(ctx, queryParams, *queryOutput); err != nil { + os.Exit(checkError(err)) + } + default: + level.Error(logger).Log("msg", "unknown command", "cmd", parsedCmd) } + } func checkError(err error) int { diff --git a/cmd/profilecli/query.go b/cmd/profilecli/query.go new file mode 100644 index 0000000000..41c93171b5 --- /dev/null +++ b/cmd/profilecli/query.go @@ -0,0 +1,188 @@ +package main + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "os" + "strings" + "time" + + "github.com/bufbuild/connect-go" + "github.com/go-kit/log/level" + gprofile "github.com/google/pprof/profile" + "github.com/grafana/dskit/runutil" + "github.com/k0kubun/pp/v3" + "github.com/klauspost/compress/gzip" + "github.com/mattn/go-isatty" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "gopkg.in/alecthomas/kingpin.v2" + + querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1" + "github.com/grafana/phlare/api/gen/proto/go/querier/v1/querierv1connect" +) + +const ( + outputConsole = "console" + outputRaw = "raw" + outputPprof = "pprof=" +) + +func parseTime(s string) (time.Time, error) { + if s == "" { + return time.Time{}, fmt.Errorf("empty time") + } + t, err := time.Parse(time.RFC3339, s) + if err == nil { + return t, nil + } + + // try if it is a relative time + d, rerr := parseRelativeTime(s) + if rerr == nil { + return time.Now().Add(-d), nil + } + + // if not return first error + return time.Time{}, err + +} + +func parseRelativeTime(s string) (time.Duration, error) { + s = strings.TrimSpace(s) + if s == "now" { + return 0, nil + } + s = strings.TrimPrefix(s, "now-") + + d, err := model.ParseDuration(s) + if err != nil { + return 0, err + } + return time.Duration(d), nil +} + +type queryParams struct { + URL string + From string + To string + ProfileType string + Query string +} + +func (p *queryParams) parseFromTo() (from time.Time, to time.Time, err error) { + from, err = parseTime(p.From) + if err != nil { + return time.Time{}, time.Time{}, errors.Wrap(err, "failed to parse from") + } + to, err = parseTime(p.To) + if err != nil { + return time.Time{}, time.Time{}, errors.Wrap(err, "failed to parse to") + } + + if to.Before(from) { + return time.Time{}, time.Time{}, errors.Wrap(err, "from cannot be after") + } + + return from, to, nil +} + +func (p *queryParams) client() querierv1connect.QuerierServiceClient { + return querierv1connect.NewQuerierServiceClient( + http.DefaultClient, + p.URL, + ) +} + +type flagger interface { + Flag(name, help string) *kingpin.FlagClause +} + +func addQueryParams(queryCmd flagger) *queryParams { + params := &queryParams{} + queryCmd.Flag("url", "URL of the profile store.").Default("http://localhost:4100").StringVar(¶ms.URL) + queryCmd.Flag("from", "Beginning of the query.").Default("now-1h").StringVar(¶ms.From) + queryCmd.Flag("to", "End of the query.").Default("now").StringVar(¶ms.To) + queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(¶ms.ProfileType) + queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(¶ms.Query) + return params +} + +func queryMerge(ctx context.Context, params *queryParams, outputFlag string) (err error) { + from, to, err := params.parseFromTo() + if err != nil { + return err + } + + level.Info(logger).Log("msg", "query aggregated profile from profile store", "url", params.URL, "from", from, "to", to, "query", params.Query, "type", params.ProfileType) + + qc := params.client() + + resp, err := qc.SelectMergeProfile(ctx, connect.NewRequest(&querierv1.SelectMergeProfileRequest{ + ProfileTypeID: params.ProfileType, + Start: from.UnixMilli(), + End: to.UnixMilli(), + LabelSelector: params.Query, + })) + + if err != nil { + return errors.Wrap(err, "failed to query") + } + + mypp := pp.New() + mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd())) + mypp.SetExportedOnly(true) + + if outputFlag == outputConsole { + buf, err := resp.Msg.MarshalVT() + if err != nil { + return errors.Wrap(err, "failed to marshal protobuf") + } + + p, err := gprofile.Parse(bytes.NewReader(buf)) + if err != nil { + return errors.Wrap(err, "failed to parse profile") + } + + fmt.Fprintln(output(ctx), p.String()) + return nil + + } + + if outputFlag == outputRaw { + mypp.Print(resp.Msg) + return nil + } + + if strings.HasPrefix(outputFlag, outputPprof) { + filePath := strings.TrimPrefix(outputFlag, outputPprof) + if filePath == "" { + return errors.New("no file path specified after pprof=") + } + buf, err := resp.Msg.MarshalVT() + if err != nil { + return errors.Wrap(err, "failed to marshal protobuf") + } + + // open new file, fail when the file already exists + f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644) + if err != nil { + return errors.Wrap(err, "failed to create pprof file") + } + defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file") + + gzipWriter := gzip.NewWriter(f) + defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer") + + if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil { + return errors.Wrap(err, "failed to write pprof") + } + + return nil + } + + return errors.Errorf("unknown output %s", outputFlag) +} diff --git a/go.mod b/go.mod index 5c35a2cd44..a53a6a97aa 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,9 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0 github.com/json-iterator/go v1.1.12 + github.com/k0kubun/pp/v3 v3.2.0 github.com/klauspost/compress v1.15.13 + github.com/mattn/go-isatty v0.0.16 github.com/minio/minio-go/v7 v7.0.45 github.com/mitchellh/go-wordwrap v1.0.1 github.com/oklog/ulid v1.3.1 @@ -177,7 +179,6 @@ require ( github.com/linode/linodego v1.9.3 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.50 // indirect diff --git a/go.sum b/go.sum index f92cc87b48..37aa1e9de6 100644 --- a/go.sum +++ b/go.sum @@ -584,6 +584,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs= +github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA= github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=