Skip to content

Commit

Permalink
feat: make spans fit an envelope size
Browse files Browse the repository at this point in the history
  • Loading branch information
mathnogueira committed Sep 17, 2024
1 parent 32519a3 commit ae16060
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 1 deletion.
42 changes: 42 additions & 0 deletions agent/workers/envelope/spans_envelope.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package envelope

import (
agentProto "github.com/kubeshop/tracetest/agent/proto"
"google.golang.org/protobuf/proto"
)

// EnvelopeSpans get a list of spans and batch them into a packet that does not
// surpasses the maxPacketSize restriction. When maxPacketSize is reached, no
// more spans are added to the packet,
func EnvelopeSpans(spans []*agentProto.Span, maxPacketSize int) []*agentProto.Span {
envelope := make([]*agentProto.Span, 0, len(spans))
currentSize := 0

// There's a weird scenario that must be covered here: imagine a span so big it is bigger than maxPacketSize.
// It is impossible to send a span like this, so in this case, we classify those spans as "large spans" and we allow
// `largeSpansPerPacket` per packet.
//
// It is important to ensure a limit of large spans per packet because if your whole trace is composed by
// large spans, this would mean a packet would hold the entiry trace and we don't want that to happen.
const largeSpansPerPacket = 1
numberLargeSpansAddedToPacket := 0

for _, span := range spans {
spanSize := proto.Size(span)
isLargeSpan := spanSize > maxPacketSize
if currentSize+spanSize < maxPacketSize || isLargeSpan {
if isLargeSpan {
if numberLargeSpansAddedToPacket >= largeSpansPerPacket {
// there is already the limit of large spans in the packet, skip this one
continue
}

numberLargeSpansAddedToPacket++
}
envelope = append(envelope, span)
currentSize += spanSize
}
}

return envelope
}
144 changes: 144 additions & 0 deletions agent/workers/envelope/spans_envelope_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package envelope_test

import (
"testing"
"time"

"github.com/kubeshop/tracetest/agent/proto"
"github.com/kubeshop/tracetest/agent/workers/envelope"
"github.com/kubeshop/tracetest/server/pkg/id"
"github.com/stretchr/testify/require"
"gotest.tools/v3/assert"
)

func TestSpanEnvelopeWithSmallSpans(t *testing.T) {
// each of these spans take 73 bytes
spans := []*proto.Span{
createSpan(),
createSpan(),
}

envelope := envelope.EnvelopeSpans(spans, 150)
// 2 * 73 < 150, so all spans should be included in the envelope
require.Len(t, envelope, 2)
}

func TestSpanEnvelopeWithSmallSpansButOneShouldBeIgnored(t *testing.T) {
// each of these spans take 73 bytes
span1, span2, span3 := createSpan(), createSpan(), createSpan()
spans := []*proto.Span{
span1,
span2,
span3,
}

envelope := envelope.EnvelopeSpans(spans, 150)
// 3 * 73 > 150, but 2 spans fit the envelope, so take 2 spans instead
require.Len(t, envelope, 2)
assert.Equal(t, envelope[0].Id, span1.Id)
assert.Equal(t, envelope[1].Id, span2.Id)
}

func TestSpanEnvelopeIncludeSmallerSpans(t *testing.T) {
// these spans take 73 bytes, 73 bytes, and 33 bytes respectively
span1, span2, span3 := createSpan(), createSpan(), createSmallSpan()
spans := []*proto.Span{
span1,
span2,
span3,
}

envelope := envelope.EnvelopeSpans(spans, 110)
// 73+73+33 > 110, 73+73 is also bigger than 110. But we can add 73 (span1) + 33 (span3) to fit the envelope
require.Len(t, envelope, 2)
assert.Equal(t, envelope[0].Id, span1.Id)
assert.Equal(t, envelope[1].Id, span3.Id)
}

func TestSpanEnvelopeWithOneLargeSpan(t *testing.T) {
// a large span is 682 bytes long, in theory, it should not fit the envelope, however,
// we should allow 1 per envelope just to make sure ALL spans are sent.
spans := []*proto.Span{
createLargeSpan(),
}

envelope := envelope.EnvelopeSpans(spans, 110)
require.Len(t, envelope, 1)
}

func TestSpanEnvelopeWithTwoLargeSpan(t *testing.T) {
// a large span is 682 bytes long, in theory, it should not fit the envelope, however,
// we should allow 1 per envelope just to make sure ALL spans are sent.
largeSpan1, largeSpan2 := createLargeSpan(), createLargeSpan()
spans := []*proto.Span{
largeSpan1,
largeSpan2,
}

envelope := envelope.EnvelopeSpans(spans, 110)
require.Len(t, envelope, 1)
assert.Equal(t, largeSpan1.Id, envelope[0].Id)
}

func TestSpanEnvelopeWithSmallSpansAndALargeOne(t *testing.T) {
// Given a list of small spans that should be able to fit the envelope, but also a large span that doesn't fit an envelope,
// it should include the largeSpan with the 2 first spans and leave the third small span out
smallSpan1, smallSpan2, largeSpan1, smallSpan3 := createSmallSpan(), createSmallSpan(), createLargeSpan(), createSmallSpan()
spans := []*proto.Span{
smallSpan1,
smallSpan2,
largeSpan1,
smallSpan3,
}

envelope := envelope.EnvelopeSpans(spans, 100)
require.Len(t, envelope, 3)
assert.Equal(t, smallSpan1.Id, envelope[0].Id)
assert.Equal(t, smallSpan2.Id, envelope[1].Id)
assert.Equal(t, largeSpan1.Id, envelope[2].Id)
}

func createSpan() *proto.Span {
return &proto.Span{
Id: id.NewRandGenerator().SpanID().String(),
Name: "span name",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(2 * time.Second).Unix(),
Kind: "internal",
Attributes: []*proto.KeyValuePair{
{
Key: "service.name",
Value: "core",
},
},
}
}

func createSmallSpan() *proto.Span {
return &proto.Span{
Id: id.NewRandGenerator().SpanID().String(),
Name: "s",
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(2 * time.Second).Unix(),
Kind: "",
Attributes: []*proto.KeyValuePair{},
}
}

func createLargeSpan() *proto.Span {
loremIpsum := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Vestibulum eu fermentum elit. Ut convallis elit nisl, et porttitor ante dignissim quis. Curabitur porttitor molestie iaculis. Suspendisse potenti. Curabitur sollicitudin finibus mollis. Nunc at tincidunt dolor. Nam eleifend ante in elit vulputate lacinia. Donec sem orci, luctus ut eros id, tincidunt elementum nulla. Nulla et nibh pharetra, pretium odio nec, posuere est. Curabitur a felis ut risus fermentum ornare vitae sed dolor. Mauris non velit at nulla ultricies mattis. "
return &proto.Span{
Id: id.NewRandGenerator().SpanID().String(),
Name: loremIpsum,
StartTime: time.Now().Unix(),
EndTime: time.Now().Add(2 * time.Second).Unix(),
Kind: "internal",
Attributes: []*proto.KeyValuePair{
{Key: "service.name", Value: "core"},
{Key: "service.team", Value: "ranchers"},
{Key: "go.version", Value: "1.22.3"},
{Key: "go.os", Value: "Linux"},
{Key: "go.arch", Value: "amd64"},
},
}
}
5 changes: 4 additions & 1 deletion agent/workers/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/kubeshop/tracetest/agent/telemetry"
"github.com/kubeshop/tracetest/agent/tracedb"
"github.com/kubeshop/tracetest/agent/tracedb/connection"
"github.com/kubeshop/tracetest/agent/workers/envelope"
"github.com/kubeshop/tracetest/agent/workers/poller"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
Expand All @@ -24,6 +25,8 @@ import (
"go.uber.org/zap"
)

const maxEnvelopeSize = 4 * 1024 * 1024 // 4MB

type PollerWorker struct {
client *client.Client
inmemoryDatastore tracedb.TraceDB
Expand Down Expand Up @@ -222,7 +225,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest)
w.logger.Debug("Span was already sent", zap.String("runKey", runKey))
}
}
pollingResponse.Spans = newSpans
pollingResponse.Spans = envelope.EnvelopeSpans(newSpans, maxEnvelopeSize)

w.logger.Debug("Filtered spans", zap.Any("pollingResponse", spew.Sdump(pollingResponse)))
}
Expand Down

0 comments on commit ae16060

Please sign in to comment.