Skip to content

Commit

Permalink
Add query subcommand to profilecli for downloading pprof from phlare (g…
Browse files Browse the repository at this point in the history
…rafana/phlare#475)

* Add query subcommand to download pprof from phlare

Usage:

```
$ profilecli query merge --from now-5m --output pprof=my.pprof

$ profilecli query merge --from now-5m --profile-type memory:alloc_space:bytes:space:bytes
&googlev1.Profile{
  SampleType: []*googlev1.ValueType{
    &googlev1.ValueType{
      Type: 1,
      Unit: 2,
    },
  },

[...]
  DropFrames:    0,
  KeepFrames:    0,
  TimeNanos:     1673453999823000000,
  DurationNanos: 300000000000,
  PeriodType:    &googlev1.ValueType{
    Type:          434,
    Unit:          2,
  },
  Period:            524288,
  Comment:           []int64(nil),
  DefaultSampleType: 1,
}
```

* Use parsed profile output for console by default

Previous output is available with --output=raw

* Implement gzip compression for pprof file

This will also change the behaviour, when create a pprof file, it will
fail when the file already exists.
  • Loading branch information
simonswine authored Jan 17, 2023
1 parent 45ef1bc commit 7b1bbe2
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 1 deletion.
14 changes: 14 additions & 0 deletions cmd/profilecli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,15 @@ func main() {
parquetInspectCmd := parquetCmd.Command("inspect", "Inspect a parquet file's structure.")
parquetInspectFiles := parquetInspectCmd.Arg("file", "parquet file path").Required().ExistingFiles()

queryCmd := app.Command("query", "Query profile store.")
queryParams := addQueryParams(queryCmd)
queryOutput := queryCmd.Flag("output", "How to output the result, examples: console, raw, pprof=./my.pprof").Default("console").String()
queryMergeCmd := queryCmd.Command("merge", "Request merged profile.")

// parse command line arguments
parsedCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

// enable verbose logging if requested
if !cfg.verbose {
logger = level.NewFilter(logger, level.AllowWarn())
}
Expand All @@ -63,7 +70,14 @@ func main() {
os.Exit(checkError(err))
}
}
case queryMergeCmd.FullCommand():
if err := queryMerge(ctx, queryParams, *queryOutput); err != nil {
os.Exit(checkError(err))
}
default:
level.Error(logger).Log("msg", "unknown command", "cmd", parsedCmd)
}

}

func checkError(err error) int {
Expand Down
188 changes: 188 additions & 0 deletions cmd/profilecli/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package main

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

"github.com/bufbuild/connect-go"
"github.com/go-kit/log/level"
gprofile "github.com/google/pprof/profile"
"github.com/grafana/dskit/runutil"
"github.com/k0kubun/pp/v3"
"github.com/klauspost/compress/gzip"
"github.com/mattn/go-isatty"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"gopkg.in/alecthomas/kingpin.v2"

querierv1 "github.com/grafana/phlare/api/gen/proto/go/querier/v1"
"github.com/grafana/phlare/api/gen/proto/go/querier/v1/querierv1connect"
)

const (
outputConsole = "console"
outputRaw = "raw"
outputPprof = "pprof="
)

func parseTime(s string) (time.Time, error) {
if s == "" {
return time.Time{}, fmt.Errorf("empty time")
}
t, err := time.Parse(time.RFC3339, s)
if err == nil {
return t, nil
}

// try if it is a relative time
d, rerr := parseRelativeTime(s)
if rerr == nil {
return time.Now().Add(-d), nil
}

// if not return first error
return time.Time{}, err

}

func parseRelativeTime(s string) (time.Duration, error) {
s = strings.TrimSpace(s)
if s == "now" {
return 0, nil
}
s = strings.TrimPrefix(s, "now-")

d, err := model.ParseDuration(s)
if err != nil {
return 0, err
}
return time.Duration(d), nil
}

type queryParams struct {
URL string
From string
To string
ProfileType string
Query string
}

func (p *queryParams) parseFromTo() (from time.Time, to time.Time, err error) {
from, err = parseTime(p.From)
if err != nil {
return time.Time{}, time.Time{}, errors.Wrap(err, "failed to parse from")
}
to, err = parseTime(p.To)
if err != nil {
return time.Time{}, time.Time{}, errors.Wrap(err, "failed to parse to")
}

if to.Before(from) {
return time.Time{}, time.Time{}, errors.Wrap(err, "from cannot be after")
}

return from, to, nil
}

func (p *queryParams) client() querierv1connect.QuerierServiceClient {
return querierv1connect.NewQuerierServiceClient(
http.DefaultClient,
p.URL,
)
}

type flagger interface {
Flag(name, help string) *kingpin.FlagClause
}

func addQueryParams(queryCmd flagger) *queryParams {
params := &queryParams{}
queryCmd.Flag("url", "URL of the profile store.").Default("http://localhost:4100").StringVar(&params.URL)
queryCmd.Flag("from", "Beginning of the query.").Default("now-1h").StringVar(&params.From)
queryCmd.Flag("to", "End of the query.").Default("now").StringVar(&params.To)
queryCmd.Flag("profile-type", "Profile type to query.").Default("process_cpu:cpu:nanoseconds:cpu:nanoseconds").StringVar(&params.ProfileType)
queryCmd.Flag("query", "Label selector to query.").Default("{}").StringVar(&params.Query)
return params
}

func queryMerge(ctx context.Context, params *queryParams, outputFlag string) (err error) {
from, to, err := params.parseFromTo()
if err != nil {
return err
}

level.Info(logger).Log("msg", "query aggregated profile from profile store", "url", params.URL, "from", from, "to", to, "query", params.Query, "type", params.ProfileType)

qc := params.client()

resp, err := qc.SelectMergeProfile(ctx, connect.NewRequest(&querierv1.SelectMergeProfileRequest{
ProfileTypeID: params.ProfileType,
Start: from.UnixMilli(),
End: to.UnixMilli(),
LabelSelector: params.Query,
}))

if err != nil {
return errors.Wrap(err, "failed to query")
}

mypp := pp.New()
mypp.SetColoringEnabled(isatty.IsTerminal(os.Stdout.Fd()))
mypp.SetExportedOnly(true)

if outputFlag == outputConsole {
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

p, err := gprofile.Parse(bytes.NewReader(buf))
if err != nil {
return errors.Wrap(err, "failed to parse profile")
}

fmt.Fprintln(output(ctx), p.String())
return nil

}

if outputFlag == outputRaw {
mypp.Print(resp.Msg)
return nil
}

if strings.HasPrefix(outputFlag, outputPprof) {
filePath := strings.TrimPrefix(outputFlag, outputPprof)
if filePath == "" {
return errors.New("no file path specified after pprof=")
}
buf, err := resp.Msg.MarshalVT()
if err != nil {
return errors.Wrap(err, "failed to marshal protobuf")
}

// open new file, fail when the file already exists
f, err := os.OpenFile(filePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0644)
if err != nil {
return errors.Wrap(err, "failed to create pprof file")
}
defer runutil.CloseWithErrCapture(&err, f, "failed to close pprof file")

gzipWriter := gzip.NewWriter(f)
defer runutil.CloseWithErrCapture(&err, gzipWriter, "failed to close pprof gzip writer")

if _, err := io.Copy(gzipWriter, bytes.NewReader(buf)); err != nil {
return errors.Wrap(err, "failed to write pprof")
}

return nil
}

return errors.Errorf("unknown output %s", outputFlag)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.15.0
github.com/json-iterator/go v1.1.12
github.com/k0kubun/pp/v3 v3.2.0
github.com/klauspost/compress v1.15.13
github.com/mattn/go-isatty v0.0.16
github.com/minio/minio-go/v7 v7.0.45
github.com/mitchellh/go-wordwrap v1.0.1
github.com/oklog/ulid v1.3.1
Expand Down Expand Up @@ -177,7 +179,6 @@ require (
github.com/linode/linodego v1.9.3 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.50 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k0kubun/pp/v3 v3.2.0 h1:h33hNTZ9nVFNP3u2Fsgz8JXiF5JINoZfFq4SvKJwNcs=
github.com/k0kubun/pp/v3 v3.2.0/go.mod h1:ODtJQbQcIRfAD3N+theGCV1m/CBxweERz2dapdz1EwA=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
Expand Down

0 comments on commit 7b1bbe2

Please sign in to comment.