Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
Fixes no profiles to merge error (#559)
Browse files Browse the repository at this point in the history
* Fixes no profiles to merge error.

* Fixes tests comparing timestamp
  • Loading branch information
cyriltovena authored Mar 8, 2023
1 parent c0a305a commit 006ad25
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 15 deletions.
16 changes: 16 additions & 0 deletions pkg/model/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

"github.com/gogo/status"
"github.com/google/pprof/profile"
"github.com/prometheus/prometheus/model/labels"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -45,3 +46,18 @@ func SelectorFromProfileType(profileType *typesv1.ProfileType) *labels.Matcher {
Value: profileType.Name + ":" + profileType.SampleType + ":" + profileType.SampleUnit + ":" + profileType.PeriodType + ":" + profileType.PeriodUnit,
}
}

// SetProfileMetadata sets the metadata on the profile.
func SetProfileMetadata(p *profile.Profile, ty *typesv1.ProfileType) {
p.SampleType = []*profile.ValueType{{Type: ty.SampleType, Unit: ty.SampleUnit}}
p.DefaultSampleType = ty.SampleType
p.PeriodType = &profile.ValueType{Type: ty.PeriodType, Unit: ty.PeriodUnit}
switch ty.Name {
case "process_cpu": // todo: this should support other types of cpu profiles
p.Period = 1000000000
case "memory":
p.Period = 512 * 1024
default:
p.Period = 1
}
}
17 changes: 5 additions & 12 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,19 +706,12 @@ func (q Queriers) MergeProfilesPprof(ctx context.Context, stream *connect.BidiSt
if err := g.Wait(); err != nil {
return err
}
if len(result) == 0 {
result = append(result, &profile.Profile{})
}
for _, p := range result {
p.SampleType = []*profile.ValueType{{Type: r.Request.Type.SampleType, Unit: r.Request.Type.SampleUnit}}
p.DefaultSampleType = r.Request.Type.SampleType
p.PeriodType = &profile.ValueType{Type: r.Request.Type.PeriodType, Unit: r.Request.Type.PeriodUnit}
p.TimeNanos = model.Time(r.Request.Start).UnixNano()
switch r.Request.Type.Name {
case "process_cpu":
p.Period = 1000000000
case "memory":
p.Period = 512 * 1024
default:
p.Period = 1
}
phlaremodel.SetProfileMetadata(p, request.Type)
p.TimeNanos = model.Time(r.Request.End).UnixNano()
}
p, err := profile.Merge(result)
if err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/phlaredb/phlaredb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,22 @@ func TestMergeProfilesPprof(t *testing.T) {
}))
require.NoError(t, bidi.CloseRequest())
})

t.Run("timerange with no Profiles", func(t *testing.T) {
bidi := client.MergeProfilesPprof(ctx)
require.NoError(t, bidi.Send(&ingestv1.MergeProfilesPprofRequest{
Request: &ingestv1.SelectProfilesRequest{
LabelSelector: `{pod="my-pod"}`,
Type: mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"),
Start: 0,
End: 1,
},
}))
_, err := bidi.Receive()
require.NoError(t, err)
_, err = bidi.Receive()
require.NoError(t, err)
})
}

func TestFilterProfiles(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,12 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q
}

// merge all profiles
profile, err := selectMergePprofProfile(gCtx, responses)
profile, err := selectMergePprofProfile(gCtx, profileType, responses)
if err != nil {
return nil, err
}
profile.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()

profile.TimeNanos = model.Time(req.Msg.End).UnixNano()
return connect.NewResponse(profile), nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ func Test_SelectMergeProfile(t *testing.T) {

expected := pprofth.FooBarProfile.Copy()
expected.DurationNanos = model.Time(req.Msg.End).UnixNano() - model.Time(req.Msg.Start).UnixNano()
expected.TimeNanos = model.Time(req.Msg.End).UnixNano()
for _, s := range expected.Sample {
s.Value[0] = s.Value[0] * 2
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/select_merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func selectMergeStacktraces(ctx context.Context, responses []responseFromIngeste
}

// selectMergePprofProfile selects the profile from each ingester by deduping them and request merges of stacktraces in the pprof format.
func selectMergePprofProfile(ctx context.Context, responses []responseFromIngesters[clientpool.BidiClientMergeProfilesPprof]) (*googlev1.Profile, error) {
func selectMergePprofProfile(ctx context.Context, ty *typesv1.ProfileType, responses []responseFromIngesters[clientpool.BidiClientMergeProfilesPprof]) (*googlev1.Profile, error) {
mergeResults := make([]MergeResult[[]byte], len(responses))
iters := make([]MergeIterator, len(responses))
for i, resp := range responses {
Expand Down Expand Up @@ -389,6 +389,12 @@ func selectMergePprofProfile(ctx context.Context, responses []responseFromIngest
if err := g.Wait(); err != nil {
return nil, err
}

if len(results) == 0 {
empty := &profile.Profile{}
phlaremodel.SetProfileMetadata(empty, ty)
return pprof.FromProfile(empty)
}
p, err := profile.Merge(results)
if err != nil {
return nil, err
Expand Down

0 comments on commit 006ad25

Please sign in to comment.