-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
worker.go
81 lines (68 loc) · 2.42 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package traces // import "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/traces"
import (
"context"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
type worker struct {
running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test
numTraces int // how many traces the worker has to generate (only when duration==0)
propagateContext bool // whether the worker needs to propagate the trace context via HTTP headers
totalDuration time.Duration // how long to run the test for (overrides `numTraces`)
limitPerSecond rate.Limit // how many spans per second to generate
wg *sync.WaitGroup // notify when done
logger *zap.Logger
}
const (
fakeIP string = "1.2.3.4"
fakeSpanDuration = 123 * time.Microsecond
)
func (w worker) simulateTraces() {
tracer := otel.Tracer("telemetrygen")
limiter := rate.NewLimiter(w.limitPerSecond, 1)
var i int
for w.running.Load() {
ctx, sp := tracer.Start(context.Background(), "lets-go", trace.WithAttributes(
attribute.String("span.kind", "client"), // is there a semantic convention for this?
semconv.NetPeerIPKey.String(fakeIP),
semconv.PeerServiceKey.String("telemetrygen-server"),
))
childCtx := ctx
if w.propagateContext {
header := propagation.HeaderCarrier{}
// simulates going remote
otel.GetTextMapPropagator().Inject(childCtx, header)
// simulates getting a request from a client
childCtx = otel.GetTextMapPropagator().Extract(childCtx, header)
}
_, child := tracer.Start(childCtx, "okey-dokey", trace.WithAttributes(
attribute.String("span.kind", "server"),
semconv.NetPeerIPKey.String(fakeIP),
semconv.PeerServiceKey.String("telemetrygen-client"),
))
if err := limiter.Wait(context.Background()); err != nil {
w.logger.Fatal("limiter waited failed, retry", zap.Error(err))
}
opt := trace.WithTimestamp(time.Now().Add(fakeSpanDuration))
child.End(opt)
sp.End(opt)
i++
if w.numTraces != 0 {
if i >= w.numTraces {
break
}
}
}
w.logger.Info("traces generated", zap.Int("traces", i))
w.wg.Done()
}