fix: HTTPTransport.Flush panic and data race
Rewrite HTTPTransport internals to remove a data race and occasional
panics.

Prior to this, HTTPTransport used a `sync.WaitGroup` to track how many
in-flight requests existed, and `Flush` waited until the observed number
of in-flight requests reached zero.

Unsynchronized access to the WaitGroup led to panics (the WaitGroup was
reused before wg.Wait returned) and data races (the order of wg.Add and
wg.Wait calls was undefined).
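
For illustration, a minimal sketch of that racy pattern; racyTransport is
a hypothetical reduction, not the SDK's actual code:

package main

import (
	"sync"
	"time"
)

// racyTransport is a hypothetical reduction of the old pattern, not the
// SDK's actual code.
type racyTransport struct {
	wg sync.WaitGroup
}

func (t *racyTransport) SendEvent() {
	// Add can run concurrently with Wait in Flush. When the counter is at
	// zero, the order of Add and Wait is unsynchronized (a data race), and
	// adding while a previous Wait is still returning can trigger
	// "sync: WaitGroup is reused before previous Wait has returned".
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		time.Sleep(time.Millisecond) // stand-in for the HTTP request
	}()
}

func (t *racyTransport) Flush() {
	t.wg.Wait() // may observe zero and return while senders keep adding
}

func main() {
	t := &racyTransport{}
	done := make(chan struct{})
	go func() {
		for i := 0; i < 1000; i++ {
			t.SendEvent() // concurrent senders
		}
		close(done)
	}()
	for i := 0; i < 100; i++ {
		t.Flush() // flushes racing with sends
	}
	<-done
}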

The new implementation changes the `Flush` behavior to wait until the
current in-flight requests are processed, in line with other SDKs and the
Unified API.
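
From a caller's point of view, the new semantics look roughly like the
sketch below, using the SDK's public API; the DSN and the captured message
are placeholders:

package main

import (
	"fmt"
	"time"

	"github.com/getsentry/sentry-go"
)

func main() {
	// The DSN is a placeholder.
	err := sentry.Init(sentry.ClientOptions{
		Dsn: "https://publickey@o0.ingest.sentry.io/0",
	})
	if err != nil {
		fmt.Println(err)
		return
	}

	sentry.CaptureMessage("something happened")

	// Flush waits for the events in flight at the time of the call; events
	// captured afterwards go into a new batch. It returns false if the
	// timeout is reached first.
	if !sentry.Flush(2 * time.Second) {
		fmt.Println("flush timed out; some events may not have been delivered")
	}
}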
rhcarvalho committed Jan 22, 2020
1 parent c34ccc5 commit 4277850
Showing 1 changed file with 83 additions and 33 deletions.
116 changes: 83 additions & 33 deletions transport.go
@@ -96,16 +96,25 @@ func getRequestBodyFromEvent(event *Event) []byte {
// HTTPTransport
// ================================

// A batch groups items that are processed sequentially.
type batch struct {
	items   chan *http.Request
	started chan struct{} // closed to signal items started to be worked on
	done    chan struct{} // closed to signal completion of all items
}

// HTTPTransport is a default implementation of `Transport` interface used by `Client`.
type HTTPTransport struct {
	dsn       *Dsn
	client    *http.Client
	transport *http.Transport

	buffer chan *http.Request
	// buffer is a channel of batches. Calling Flush terminates work on the
	// current in-flight items and starts a new batch for subsequent events.
	buffer chan batch

	disabledUntil time.Time

	wg    sync.WaitGroup
	start sync.Once

// Size of the transport buffer. Defaults to 30.
@@ -130,9 +139,14 @@ func (t *HTTPTransport) Configure(options ClientOptions) {
		Logger.Printf("%v\n", err)
		return
	}

	t.dsn = dsn
	t.buffer = make(chan *http.Request, t.BufferSize)

	t.buffer = make(chan batch, 1)
	t.buffer <- batch{
		items:   make(chan *http.Request, t.BufferSize),
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}

	if options.HTTPTransport != nil {
		t.transport = options.HTTPTransport
@@ -178,10 +192,10 @@ func (t *HTTPTransport) SendEvent(event *Event) {
		request.Header.Set(headerKey, headerValue)
	}

	t.wg.Add(1)
	b := <-t.buffer

	select {
	case t.buffer <- request:
	case b.items <- request:
		Logger.Printf(
			"Sending %s event [%s] to %s project: %d\n",
			event.Level,
@@ -190,51 +204,87 @@ func (t *HTTPTransport) SendEvent(event *Event) {
			t.dsn.projectID,
		)
	default:
		t.wg.Done()
		Logger.Println("Event dropped due to transport buffer being full.")
		// worker would block, drop the packet
	}

	t.buffer <- b
}

// Flush notifies when all the buffered events have been sent by returning `true`
// or `false` if timeout was reached.
func (t *HTTPTransport) Flush(timeout time.Duration) bool {
	c := make(chan struct{})
	toolate := time.After(timeout)

	var b batch
	for {
		// Wait until processing the current batch has started or the timeout.
		select {
		case b = <-t.buffer:
			select {
			case <-b.started:
				goto started
			default:
				t.buffer <- b
			}
		case <-toolate:
			goto fail
		}
	}

	go func() {
		t.wg.Wait()
		close(c)
	}()
started:
	// Signal that there won't be any more items in this batch, so that the
	// worker inner loop can end.
	close(b.items)
	// Start a new batch for subsequent events.
	t.buffer <- batch{
		items:   make(chan *http.Request, t.BufferSize),
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}

	// Wait until the current batch is done or the timeout.
	select {
	case <-c:
	case <-b.done:
		Logger.Println("Buffer flushed successfully.")
		return true
	case <-time.After(timeout):
		Logger.Println("Buffer flushing reached the timeout.")
		return false
	case <-toolate:
		goto fail
	}

fail:
	Logger.Println("Buffer flushing reached the timeout.")
	return false
}

func (t *HTTPTransport) worker() {
	for request := range t.buffer {
		if time.Now().Before(t.disabledUntil) {
			t.wg.Done()
			continue
		}

		response, err := t.client.Do(request)

		if err != nil {
			Logger.Printf("There was an issue with sending an event: %v", err)
		}

		if response != nil && response.StatusCode == http.StatusTooManyRequests {
			t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
			Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
	for b := range t.buffer {
		// Signal that processing of the current batch has started.
		close(b.started)

		// Return the batch to the buffer so that other goroutines can use it.
		// Equivalent to releasing a lock.
		t.buffer <- b

		// Process all batch items.
		for request := range b.items {
			if time.Now().Before(t.disabledUntil) {
				continue
			}

			response, err := t.client.Do(request)

			if err != nil {
				Logger.Printf("There was an issue with sending an event: %v", err)
			}

			if response != nil && response.StatusCode == http.StatusTooManyRequests {
				t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
				Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
			}
		}

		t.wg.Done()
		// Signal that processing of the batch is done.
		close(b.done)
	}
}
