From ae16060955e74cc90ef4bf78e2a42505d65fc7b2 Mon Sep 17 00:00:00 2001
From: Matheus Nogueira
Date: Tue, 17 Sep 2024 16:46:47 -0300
Subject: [PATCH] feat: make spans fit an envelope size

---
 agent/workers/envelope/spans_envelope.go      |  42 +++++
 agent/workers/envelope/spans_envelope_test.go | 156 ++++++++++++++++++
 agent/workers/poller.go                       |   5 +-
 3 files changed, 202 insertions(+), 1 deletion(-)
 create mode 100644 agent/workers/envelope/spans_envelope.go
 create mode 100644 agent/workers/envelope/spans_envelope_test.go

diff --git a/agent/workers/envelope/spans_envelope.go b/agent/workers/envelope/spans_envelope.go
new file mode 100644
index 0000000000..522b66d331
--- /dev/null
+++ b/agent/workers/envelope/spans_envelope.go
@@ -0,0 +1,42 @@
+package envelope
+
+import (
+	agentProto "github.com/kubeshop/tracetest/agent/proto"
+	"google.golang.org/protobuf/proto"
+)
+
+// EnvelopeSpans walks spans in order and returns the ones that fit in a single
+// packet: a span is added while the accumulated proto.Size stays at or below
+// maxPacketSize. Spans that do not fit are skipped, they are not in the result.
+func EnvelopeSpans(spans []*agentProto.Span, maxPacketSize int) []*agentProto.Span {
+	envelope := make([]*agentProto.Span, 0, len(spans))
+	currentSize := 0
+
+	// A single span can be bigger than maxPacketSize. Such a "large span" can never fit in a packet, so instead of
+	// dropping it forever we let up to `largeSpansPerPacket` of them into each packet, regardless of the size limit.
+	// Its size still counts towards currentSize, so no regular span is added after it.
+	//
+	// The number of large spans per packet is capped because, if the whole trace is composed of large spans,
+	// a single packet would otherwise hold the entire trace, which is exactly what the envelope must prevent.
+	const largeSpansPerPacket = 1
+	numberLargeSpansAddedToPacket := 0
+
+	for _, span := range spans {
+		spanSize := proto.Size(span)
+		isLargeSpan := spanSize > maxPacketSize
+		if currentSize+spanSize <= maxPacketSize || isLargeSpan {
+			if isLargeSpan {
+				if numberLargeSpansAddedToPacket >= largeSpansPerPacket {
+					// the packet already holds its quota of large spans, skip this one
+					continue
+				}
+
+				numberLargeSpansAddedToPacket++
+			}
+			envelope = append(envelope, span)
+			currentSize += spanSize
+		}
+	}
+
+	return envelope
+}
diff --git a/agent/workers/envelope/spans_envelope_test.go b/agent/workers/envelope/spans_envelope_test.go
new file mode 100644
index 0000000000..f7ab8bcc8d
--- /dev/null
+++ b/agent/workers/envelope/spans_envelope_test.go
@@ -0,0 +1,156 @@
+package envelope_test
+
+import (
+	"testing"
+	"time"
+
+	"github.com/kubeshop/tracetest/agent/proto"
+	"github.com/kubeshop/tracetest/agent/workers/envelope"
+	"github.com/kubeshop/tracetest/server/pkg/id"
+	"github.com/stretchr/testify/require"
+	protobuf "google.golang.org/protobuf/proto"
+	"gotest.tools/v3/assert"
+)
+
+func TestSpanEnvelopeWithSmallSpans(t *testing.T) {
+	// each of these spans takes 73 bytes
+	spans := []*proto.Span{
+		createSpan(),
+		createSpan(),
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, 150)
+	// 2 * 73 < 150, so all spans should be included in the envelope
+	require.Len(t, envelope, 2)
+}
+
+func TestSpanEnvelopeWithSmallSpansButOneShouldBeIgnored(t *testing.T) {
+	// each of these spans takes 73 bytes
+	span1, span2, span3 := createSpan(), createSpan(), createSpan()
+	spans := []*proto.Span{
+		span1,
+		span2,
+		span3,
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, 150)
+	// 3 * 73 > 150, but 2 spans fit the envelope, so take 2 spans instead
+	require.Len(t, envelope, 2)
+	assert.Equal(t, envelope[0].Id, span1.Id)
+	assert.Equal(t, envelope[1].Id, span2.Id)
+}
+
+func TestSpanEnvelopeIncludeSmallerSpans(t *testing.T) {
+	// these spans take 73 bytes, 73 bytes, and 33 bytes respectively
+	span1, span2, span3 := createSpan(), createSpan(), createSmallSpan()
+	spans := []*proto.Span{
+		span1,
+		span2,
+		span3,
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, 110)
+	// 73+73+33 > 110, 73+73 is also bigger than 110. But we can add 73 (span1) + 33 (span3) to fit the envelope
+	require.Len(t, envelope, 2)
+	assert.Equal(t, envelope[0].Id, span1.Id)
+	assert.Equal(t, envelope[1].Id, span3.Id)
+}
+
+func TestSpanEnvelopeWithSpanThatExactlyFitsTheEnvelope(t *testing.T) {
+	// a span exactly as big as the envelope is not a "large span", so it must still fit an empty envelope
+	span := createSpan()
+	spans := []*proto.Span{
+		span,
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, protobuf.Size(span))
+	require.Len(t, envelope, 1)
+}
+
+func TestSpanEnvelopeWithOneLargeSpan(t *testing.T) {
+	// a large span is 682 bytes long, in theory, it should not fit the envelope, however,
+	// we should allow 1 per envelope just to make sure ALL spans are sent.
+	spans := []*proto.Span{
+		createLargeSpan(),
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, 110)
+	require.Len(t, envelope, 1)
+}
+
+func TestSpanEnvelopeWithTwoLargeSpan(t *testing.T) {
+	// a large span is 682 bytes long, in theory, it should not fit the envelope, however,
+	// we should allow 1 per envelope just to make sure ALL spans are sent.
+	largeSpan1, largeSpan2 := createLargeSpan(), createLargeSpan()
+	spans := []*proto.Span{
+		largeSpan1,
+		largeSpan2,
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, 110)
+	require.Len(t, envelope, 1)
+	assert.Equal(t, largeSpan1.Id, envelope[0].Id)
+}
+
+func TestSpanEnvelopeWithSmallSpansAndALargeOne(t *testing.T) {
+	// Given a list of small spans that should be able to fit the envelope, but also a large span that doesn't fit an envelope,
+	// it should include the largeSpan with the 2 first spans and leave the third small span out
+	smallSpan1, smallSpan2, largeSpan1, smallSpan3 := createSmallSpan(), createSmallSpan(), createLargeSpan(), createSmallSpan()
+	spans := []*proto.Span{
+		smallSpan1,
+		smallSpan2,
+		largeSpan1,
+		smallSpan3,
+	}
+
+	envelope := envelope.EnvelopeSpans(spans, 100)
+	require.Len(t, envelope, 3)
+	assert.Equal(t, smallSpan1.Id, envelope[0].Id)
+	assert.Equal(t, smallSpan2.Id, envelope[1].Id)
+	assert.Equal(t, largeSpan1.Id, envelope[2].Id)
+}
+
+func createSpan() *proto.Span {
+	return &proto.Span{
+		Id:        id.NewRandGenerator().SpanID().String(),
+		Name:      "span name",
+		StartTime: time.Now().Unix(),
+		EndTime:   time.Now().Add(2 * time.Second).Unix(),
+		Kind:      "internal",
+		Attributes: []*proto.KeyValuePair{
+			{
+				Key:   "service.name",
+				Value: "core",
+			},
+		},
+	}
+}
+
+func createSmallSpan() *proto.Span {
+	return &proto.Span{
+		Id:         id.NewRandGenerator().SpanID().String(),
+		Name:       "s",
+		StartTime:  time.Now().Unix(),
+		EndTime:    time.Now().Add(2 * time.Second).Unix(),
+		Kind:       "",
+		Attributes: []*proto.KeyValuePair{},
+	}
+}
+
+func createLargeSpan() *proto.Span {
+	loremIpsum := "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Vestibulum eu fermentum elit. Ut convallis elit nisl, et porttitor ante dignissim quis. Curabitur porttitor molestie iaculis. Suspendisse potenti. Curabitur sollicitudin finibus mollis. Nunc at tincidunt dolor. Nam eleifend ante in elit vulputate lacinia. Donec sem orci, luctus ut eros id, tincidunt elementum nulla. Nulla et nibh pharetra, pretium odio nec, posuere est. Curabitur a felis ut risus fermentum ornare vitae sed dolor. Mauris non velit at nulla ultricies mattis. "
+	return &proto.Span{
+		Id:        id.NewRandGenerator().SpanID().String(),
+		Name:      loremIpsum,
+		StartTime: time.Now().Unix(),
+		EndTime:   time.Now().Add(2 * time.Second).Unix(),
+		Kind:      "internal",
+		Attributes: []*proto.KeyValuePair{
+			{Key: "service.name", Value: "core"},
+			{Key: "service.team", Value: "ranchers"},
+			{Key: "go.version", Value: "1.22.3"},
+			{Key: "go.os", Value: "Linux"},
+			{Key: "go.arch", Value: "amd64"},
+		},
+	}
+}
diff --git a/agent/workers/poller.go b/agent/workers/poller.go
index 7c8b18aa5c..e4a3177a75 100644
--- a/agent/workers/poller.go
+++ b/agent/workers/poller.go
@@ -15,6 +15,7 @@ import (
 	"github.com/kubeshop/tracetest/agent/telemetry"
 	"github.com/kubeshop/tracetest/agent/tracedb"
 	"github.com/kubeshop/tracetest/agent/tracedb/connection"
+	"github.com/kubeshop/tracetest/agent/workers/envelope"
 	"github.com/kubeshop/tracetest/agent/workers/poller"
 	"github.com/kubeshop/tracetest/server/datastore"
 	"github.com/kubeshop/tracetest/server/executor"
@@ -24,6 +25,8 @@ import (
 	"go.uber.org/zap"
 )
 
+const maxEnvelopeSize = 4 * 1024 * 1024 // 4MB
+
 type PollerWorker struct {
 	client            *client.Client
 	inmemoryDatastore tracedb.TraceDB
@@ -222,7 +225,7 @@ func (w *PollerWorker) poll(ctx context.Context, request *proto.PollingRequest)
 				w.logger.Debug("Span was already sent", zap.String("runKey", runKey))
 			}
 		}
-		pollingResponse.Spans = newSpans
+		pollingResponse.Spans = envelope.EnvelopeSpans(newSpans, maxEnvelopeSize)
 
 		w.logger.Debug("Filtered spans", zap.Any("pollingResponse", spew.Sdump(pollingResponse)))
 	}