Skip to content
This repository has been archived by the owner on Oct 3, 2023. It is now read-only.

Commit

Permalink
trace: allow for concurrent uploads to Stackdriver (#246)
Browse files Browse the repository at this point in the history
* trace: allow for concurrent uploads to Stackdriver

In situations where a large number of spans need to be exported from a
single instance of the exporter, bundle uploads are limited to using a
single goroutine. This limits the overall throughput of the exporter.

Make use of the NumberOfWorkers option to allow the exporter to use
multiple, concurrent goroutines to upload spans to Stackdriver.

Closes #245.

Signed-off-by: Nick Travers <n.e.travers@gmail.com>

* Update documentation

Signed-off-by: Nick Travers <n.e.travers@gmail.com>

* Count number of exported spans

Signed-off-by: Nick Travers <n.e.travers@gmail.com>

* Fix data race in test case

Signed-off-by: Nick Travers <n.e.travers@gmail.com>
  • Loading branch information
nicktrav authored and rghetia committed Jan 8, 2020
1 parent 1cdca91 commit 626e69e
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 2 deletions.
3 changes: 1 addition & 2 deletions stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,7 @@ type Options struct {
ReportingInterval time.Duration

// NumberOfWorkers sets the number of go rountines that send requests
// to Stackdriver Monitoring. This is only used for Proto metrics export
// for now. The minimum number of workers is 1.
// to Stackdriver Monitoring and Trace. The minimum number of workers is 1.
NumberOfWorkers int

// ResourceByDescriptor may be provided to supply monitored resource dynamically
Expand Down
3 changes: 3 additions & 0 deletions trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func newTraceExporterWithClient(o Options, c *tracingclient.Client) *traceExport
} else {
b.BundleCountThreshold = 50
}
if o.NumberOfWorkers > 0 {
b.HandlerLimit = o.NumberOfWorkers
}
// The measured "bytes" are not really bytes, see exportReceiver.
b.BundleByteThreshold = b.BundleCountThreshold * 200
b.BundleByteLimit = b.BundleCountThreshold * 1000
Expand Down
67 changes: 67 additions & 0 deletions trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package stackdriver
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -63,6 +64,72 @@ func TestBundling(t *testing.T) {
}
}

func TestBundling_ConcurrentExports(t *testing.T) {
workers := 2
spansPerWorker := 10
delay := 2 * time.Second
exporter := newTraceExporterWithClient(Options{
ProjectID: "fakeProjectID",
BundleCountThreshold: spansPerWorker,
BundleDelayThreshold: delay,
NumberOfWorkers: workers,
}, nil)

wg := sync.WaitGroup{}
waitCh := make(chan struct{})
wg.Add(workers)

var exportMap sync.Map // maintain a collection of the spans exported
exporter.uploadFn = func(spans []*tracepb.Span) {
for _, s := range spans {
exportMap.Store(s.SpanId, true)
}
wg.Done()

// Don't complete the function until the WaitGroup is done.
// This ensures the semaphore limiting the concurrent uploads is not
// released by one goroutine completing before the other.
wg.Wait()
}
trace.RegisterExporter(exporter)

totalSpans := workers * spansPerWorker
var expectedSpanIDs []string
go func() {
// Release enough spans to form two bundles
for i := 0; i < totalSpans; i++ {
_, span := trace.StartSpan(context.Background(), "span", trace.WithSampler(trace.AlwaysSample()))
expectedSpanIDs = append(expectedSpanIDs, span.SpanContext().SpanID.String())
span.End()
}

// Wait for the desired concurrency before completing
wg.Wait()
close(waitCh)
}()

select {
case <-waitCh:
case <-time.After(delay / 2): // fail before a time-based flush is triggered
t.Fatal("timed out waiting for concurrent uploads")
}

// all the spans are accounted for
var exportedSpans []string
exportMap.Range(func(key, value interface{}) bool {
exportedSpans = append(exportedSpans, key.(string))
return true
})
if len(exportedSpans) != totalSpans {
t.Errorf("got %d spans, want %d", len(exportedSpans), totalSpans)
}
for _, id := range expectedSpanIDs {
if _, ok := exportMap.Load(id); !ok {
t.Errorf("want %s; missing from exported spans", id)
}
}
}

func TestNewContext_Timeout(t *testing.T) {
e := newTraceExporterWithClient(Options{
Timeout: 10 * time.Millisecond,
Expand Down

0 comments on commit 626e69e

Please sign in to comment.