Skip to content
This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Commit

Permalink
{cmd/trace-agent,writer}: fix race conditions (#495)
Browse files Browse the repository at this point in the history
* cmd/trace-agent: fix race in sampler stats
* writer: fix race in shutdown mechanism
  • Loading branch information
gbbr authored Oct 16, 2018
1 parent b73dda1 commit 24d4c76
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 42 deletions.
19 changes: 9 additions & 10 deletions cmd/trace-agent/sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"reflect"
"sync/atomic"
"time"

log "github.com/cihub/seelog"
Expand All @@ -16,9 +17,10 @@ import (
// Sampler chooses wich spans to write to the API
type Sampler struct {
// For stats
keptTraceCount int
totalTraceCount int
lastFlush time.Time
keptTraceCount uint64
totalTraceCount uint64

lastFlush time.Time

// actual implementation of the sampling logic
engine sampler.Engine
Expand Down Expand Up @@ -62,10 +64,10 @@ func (s *Sampler) Run() {

// Add samples a trace and returns true if trace was sampled (should be kept), false otherwise
func (s *Sampler) Add(t processedTrace) bool {
s.totalTraceCount++
atomic.AddUint64(&s.totalTraceCount, 1)

if s.engine.Sample(t.Trace, t.Root, t.Env) {
s.keptTraceCount++
atomic.AddUint64(&s.keptTraceCount, 1)
return true
}

Expand All @@ -79,12 +81,9 @@ func (s *Sampler) Stop() {

// logStats reports statistics and update the info exposed.
func (s *Sampler) logStats() {

for now := range time.Tick(10 * time.Second) {
keptTraceCount := s.keptTraceCount
totalTraceCount := s.totalTraceCount
s.keptTraceCount = 0
s.totalTraceCount = 0
keptTraceCount := atomic.SwapUint64(&s.keptTraceCount, 0)
totalTraceCount := atomic.SwapUint64(&s.totalTraceCount, 0)

duration := now.Sub(s.lastFlush)
s.lastFlush = now
Expand Down
13 changes: 5 additions & 8 deletions writer/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ func (c *testPayloadSender) Start() {

// Run executes the core loop of this sender.
func (c *testPayloadSender) Run() {
c.exitWG.Add(1)
defer c.exitWG.Done()
defer close(c.exit)

for {
select {
Expand Down Expand Up @@ -140,8 +139,7 @@ type testPayloadSenderMonitor struct {

sender PayloadSender

exit chan struct{}
exitWG sync.WaitGroup
exit chan struct{}
}

// newTestPayloadSenderMonitor creates a new testPayloadSenderMonitor monitoring the specified sender.
Expand All @@ -159,8 +157,7 @@ func (m *testPayloadSenderMonitor) Start() {

// Run executes the core loop of this monitor.
func (m *testPayloadSenderMonitor) Run() {
m.exitWG.Add(1)
defer m.exitWG.Done()
defer close(m.exit)

for {
select {
Expand All @@ -187,8 +184,8 @@ func (m *testPayloadSenderMonitor) Run() {

// Stop stops this payload monitor and waits for it to stop.
func (m *testPayloadSenderMonitor) Stop() {
close(m.exit)
m.exitWG.Wait()
m.exit <- struct{}{}
<-m.exit
}

// SuccessPayloads returns a slice containing all successful payloads.
Expand Down
11 changes: 4 additions & 7 deletions writer/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package writer
import (
"container/list"
"fmt"
"sync"
"time"

log "github.com/cihub/seelog"
Expand Down Expand Up @@ -73,8 +72,7 @@ type BasePayloadSender struct {
monitor chan interface{}
endpoint Endpoint

exit chan struct{}
exitWG sync.WaitGroup
exit chan struct{}
}

// NewBasePayloadSender creates a new instance of a BasePayloadSender using the provided endpoint.
Expand All @@ -94,8 +92,8 @@ func (s *BasePayloadSender) Send(payload *Payload) {

// Stop asks this sender to stop and waits until it correctly stops.
func (s *BasePayloadSender) Stop() {
close(s.exit)
s.exitWG.Wait()
s.exit <- struct{}{}
<-s.exit
close(s.in)
close(s.monitor)
}
Expand Down Expand Up @@ -192,8 +190,7 @@ func (s *QueuablePayloadSender) Start() {

// Run executes the QueuablePayloadSender main logic synchronously.
func (s *QueuablePayloadSender) Run() {
s.exitWG.Add(1)
defer s.exitWG.Done()
defer close(s.exit)

for {
select {
Expand Down
7 changes: 3 additions & 4 deletions writer/service_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ func (w *ServiceWriter) Start() {
// Run runs the main loop of the writer goroutine. If buffers
// services read from input chan and flushes them when necessary.
func (w *ServiceWriter) Run() {
w.exitWG.Add(1)
defer w.exitWG.Done()
defer close(w.exit)

// for now, simply flush every x seconds
flushTicker := time.NewTicker(w.conf.FlushPeriod)
Expand Down Expand Up @@ -112,8 +111,8 @@ func (w *ServiceWriter) Run() {

// Stop stops the main Run loop.
func (w *ServiceWriter) Stop() {
close(w.exit)
w.exitWG.Wait()
w.exit <- struct{}{}
<-w.exit
w.BaseWriter.Stop()
}

Expand Down
7 changes: 3 additions & 4 deletions writer/stats_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func (w *StatsWriter) Start() {
// Run runs the event loop of the writer's main goroutine. It reads stat buckets
// from InStats, builds stat payloads and sends them out using the base writer.
func (w *StatsWriter) Run() {
w.exitWG.Add(1)
defer w.exitWG.Done()
defer close(w.exit)

log.Debug("starting stats writer")

Expand All @@ -89,8 +88,8 @@ func (w *StatsWriter) Run() {

// Stop stops the writer
func (w *StatsWriter) Stop() {
close(w.exit)
w.exitWG.Wait()
w.exit <- struct{}{}
<-w.exit

// Closing the base writer, among other things, will close the
// w.payloadSender.Monitor() channel, stoping the monitoring
Expand Down
7 changes: 3 additions & 4 deletions writer/trace_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ func (w *TraceWriter) Start() {
// Run runs the main loop of the writer goroutine. It sends traces to the payload constructor, flushing it periodically
// and collects stats which are also reported periodically.
func (w *TraceWriter) Run() {
w.exitWG.Add(1)
defer w.exitWG.Done()
defer close(w.exit)

// for now, simply flush every x seconds
flushTicker := time.NewTicker(w.conf.FlushPeriod)
Expand Down Expand Up @@ -137,8 +136,8 @@ func (w *TraceWriter) Run() {

// Stop stops the main Run loop.
func (w *TraceWriter) Stop() {
close(w.exit)
w.exitWG.Wait()
w.exit <- struct{}{}
<-w.exit
w.BaseWriter.Stop()
}

Expand Down
6 changes: 1 addition & 5 deletions writer/writer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package writer

import (
"sync"

"github.com/DataDog/datadog-trace-agent/statsd"
log "github.com/cihub/seelog"

Expand All @@ -15,8 +13,7 @@ type BaseWriter struct {

statsClient statsd.StatsClient

exit chan struct{}
exitWG *sync.WaitGroup
exit chan struct{}
}

// NewBaseWriter creates a new instance of a BaseWriter.
Expand All @@ -35,7 +32,6 @@ func NewBaseWriter(conf *config.AgentConfig, path string, senderFactory func(End
payloadSender: senderFactory(endpoint),
statsClient: statsd.Client,
exit: make(chan struct{}),
exitWG: &sync.WaitGroup{},
}
}

Expand Down

0 comments on commit 24d4c76

Please sign in to comment.