Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
feat: support for tree truncation when converting to flamebearer (#634)
Browse files Browse the repository at this point in the history
* wip

* fix

* switched to a much more performant version of truncation

* improvements
  • Loading branch information
petethepig authored Apr 26, 2023
1 parent f09f749 commit f90edba
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 13 deletions.
14 changes: 13 additions & 1 deletion api/gen/proto/go/querier/v1/querier.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions api/gen/proto/go/querier/v1/querier_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions api/querier/v1/querier.proto
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ message SelectMergeStacktracesRequest {
string label_selector = 2;
int64 start = 3; // milliseconds since epoch
int64 end = 4; // milliseconds since epoch
optional int64 max_nodes = 5; // Limit the nodes returned to only show the node with the max_node's biggest total
}

message SelectMergeStacktracesResponse {
Expand Down
56 changes: 55 additions & 1 deletion pkg/querier/flamegraph.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package querier

import (
"container/heap"

"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
"github.com/pyroscope-io/pyroscope/pkg/structs/flamebearer"
"github.com/samber/lo"
Expand All @@ -15,7 +17,7 @@ type stackNode struct {
node *node
}

func NewFlameGraph(t *tree) *querierv1.FlameGraph {
func NewFlameGraph(t *tree, maxNodes int64) *querierv1.FlameGraph {
var total, max int64
for _, node := range t.root {
total += node.total
Expand All @@ -29,6 +31,8 @@ func NewFlameGraph(t *tree) *querierv1.FlameGraph {
}
}()

minVal := t.minValue(maxNodes)

stack := stackNodePool.Get().(*Stack[stackNode])
defer stackNodePool.Put(stack)
stack.Reset()
Expand Down Expand Up @@ -71,11 +75,27 @@ func NewFlameGraph(t *tree) *querierv1.FlameGraph {
level.Push(int64(current.xOffset))
current.xOffset += int(current.node.self)

otherTotal := int64(0)
for _, child := range current.node.children {
if child.total >= minVal && child.name != "other" {
stack.Push(stackNode{xOffset: current.xOffset, level: current.level + 1, node: child})
current.xOffset += int(child.total)
} else {
otherTotal += child.total
}
}
if otherTotal != 0 {
child := &node{
name: "other",
parent: current.node,
self: otherTotal,
total: otherTotal,
}
stack.Push(stackNode{xOffset: current.xOffset, level: current.level + 1, node: child})
current.xOffset += int(child.total)
}
}

result := make([][]int64, len(res))
for i := range result {
result[i] = res[i].Slice()
Expand Down Expand Up @@ -138,3 +158,37 @@ func ExportToFlamebearer(fg *querierv1.FlameGraph, profileType *typesv1.ProfileT
},
}
}

// minValue returns the minimum "total" value a node in a tree has to have to show up in
// the resulting flamegraph
func (t *tree) minValue(maxNodes int64) int64 {
if maxNodes == -1 { // -1 means show all nodes
return 0
}
nodes := t.root

mh := &minHeap{}
heap.Init(mh)

for len(nodes) > 0 {
node := nodes[0]
nodes = nodes[1:]
number := node.total

if mh.Len() < int(maxNodes) {
heap.Push(mh, number)
} else {
if number > (*mh)[0] {
heap.Pop(mh)
heap.Push(mh, number)
nodes = append(node.children, nodes...)
}
}
}

if mh.Len() < int(maxNodes) {
return 0
}

return (*mh)[0]
}
3 changes: 2 additions & 1 deletion pkg/querier/flamegraph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Test_ExportToFlamebearer(t *testing.T) {
value: 1,
},
}),
-1,
), &typesv1.ProfileType{
ID: "memory:inuse_space:bytes:space:bytes",
Name: "memory",
Expand All @@ -75,6 +76,6 @@ func BenchmarkFlamegraph(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
f = NewFlameGraph(tr)
f = NewFlameGraph(tr, -1)
}
}
25 changes: 16 additions & 9 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"

"github.com/bufbuild/connect-go"
Expand Down Expand Up @@ -136,17 +137,23 @@ func parseSelectProfilesRequest(req *http.Request) (*querierv1.SelectMergeStackt
}

v := req.URL.Query()

// parse time using pyroscope's attime parser
start := model.TimeFromUnixNano(attime.Parse(v.Get("from")).UnixNano())
end := model.TimeFromUnixNano(attime.Parse(v.Get("until")).UnixNano())

return &querierv1.SelectMergeStacktracesRequest{
Start: int64(start),
End: int64(end),
p := &querierv1.SelectMergeStacktracesRequest{
LabelSelector: selector,
ProfileTypeID: ptype.ID,
}, ptype, nil
Start: int64(model.TimeFromUnixNano(attime.Parse(v.Get("from")).UnixNano())),
End: int64(model.TimeFromUnixNano(attime.Parse(v.Get("until")).UnixNano())),
}

var mn int64
if v, err := strconv.Atoi(v.Get("max-nodes")); err == nil && v != 0 {
mn = int64(v)
}
if v, err := strconv.Atoi(v.Get("maxNodes")); err == nil && v != 0 {
mn = int64(v)
}
p.MaxNodes = &mn

return p, ptype, nil
}

func parseQuery(req *http.Request) (string, *typesv1.ProfileType, error) {
Expand Down
31 changes: 31 additions & 0 deletions pkg/querier/minheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package querier

// minHeap is a custom min-heap data structure that stores integers.
type minHeap []int64

// Len returns the number of elements in the min-heap.
func (h minHeap) Len() int { return len(h) }

// Less returns true if the element at index i is less than the element at index j.
// This method is used by the container/heap package to maintain the min-heap property.
func (h minHeap) Less(i, j int) bool { return h[i] < h[j] }

// Swap exchanges the elements at index i and index j.
// This method is used by the container/heap package to reorganize the min-heap during its operations.
func (h minHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push adds an element (x) to the min-heap.
// This method is used by the container/heap package to grow the min-heap.
func (h *minHeap) Push(x interface{}) {
*h = append(*h, x.(int64))
}

// Pop removes and returns the smallest element (minimum) from the min-heap.
// This method is used by the container/heap package to shrink the min-heap.
func (h *minHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
8 changes: 7 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type Querier struct {
ingesterQuerier *IngesterQuerier
}

const maxNodesDefault = int64(2048)

func New(cfg Config, ingestersRing ring.ReadRing, factory ring_client.PoolFactory, logger log.Logger, clientsOptions ...connect.ClientOption) (*Querier, error) {
q := &Querier{
cfg: cfg,
Expand Down Expand Up @@ -264,8 +266,12 @@ func (q *Querier) SelectMergeStacktraces(ctx context.Context, req *connect.Reque
if err != nil {
return nil, err
}
if req.Msg.MaxNodes == nil {
mn := maxNodesDefault
req.Msg.MaxNodes = &mn
}
return connect.NewResponse(&querierv1.SelectMergeStacktracesResponse{
Flamegraph: NewFlameGraph(newTree(st)),
Flamegraph: NewFlameGraph(newTree(st), *req.Msg.MaxNodes),
}), nil
}

Expand Down

0 comments on commit f90edba

Please sign in to comment.