fix: HTTPTransport.Flush panic and data race (#140)

* test: HTTPTransport

The ConcurrentSendAndFlush test reveals a data race in the old
HTTPTransport implementation.

* fix: HTTPTransport.Flush panic and data race

Rewrite HTTPTransport internals to remove a data race and occasional
panics.

Prior to this, HTTPTransport used a `sync.WaitGroup` to track how many
in-flight requests existed, and `Flush` waited until the observed number
of in-flight requests reached zero.

Unsynchronized access to the WaitGroup led to panics (reuse before
wg.Wait returns) and data races (undefined order of wg.Add and wg.Wait
calls).

The new implementation changes the `Flush` behavior to wait until the
current in-flight requests are processed, in line with other SDKs and the
Unified API.
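
The core of the new design is a capacity-1 channel holding the current
batch and acting as a mutex. A self-contained sketch of the pattern with
hypothetical, simplified names (int items instead of *http.Request):

package main

import "fmt"

type batch struct {
	items   chan int      // queue of work for the current batch
	started chan struct{} // closed once the worker picks up the batch
	done    chan struct{} // closed once every item is processed
}

func newBatch() batch {
	return batch{
		items:   make(chan int, 10),
		started: make(chan struct{}),
		done:    make(chan struct{}),
	}
}

func main() {
	// Receiving from the capacity-1 channel acquires the "lock" on the
	// current batch; sending back releases it.
	buffer := make(chan batch, 1)
	buffer <- newBatch()

	go func() { // worker
		for b := range buffer {
			close(b.started) // flushers can now rely on b.done being closed
			buffer <- b      // release the lock so senders can enqueue
			for item := range b.items {
				fmt.Println("processed", item)
			}
			close(b.done)
		}
	}()

	// Send: acquire the current batch, enqueue, release.
	b := <-buffer
	b.items <- 42
	buffer <- b

	// Flush: wait until the worker has seen the batch (otherwise b.done
	// would never be closed), end it, swap in a fresh batch, and wait.
	for {
		b = <-buffer
		select {
		case <-b.started:
		default:
			buffer <- b // worker has not seen this batch yet; retry
			continue
		}
		break
	}
	close(b.items)       // no more items in this batch
	buffer <- newBatch() // subsequent sends go to the new batch
	<-b.done
	fmt.Println("flushed")
}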

Fixes #103.
rhcarvalho authored Jan 22, 2020
1 parent 03949d5 commit 0a07f35
Showing 2 changed files with 240 additions and 33 deletions.
135 changes: 102 additions & 33 deletions transport.go
@@ -96,16 +96,25 @@ func getRequestBodyFromEvent(event *Event) []byte {
// HTTPTransport
// ================================

// A batch groups items that are processed sequentially.
type batch struct {
items chan *http.Request
started chan struct{} // closed to signal items started to be worked on
done chan struct{} // closed to signal completion of all items
}

// HTTPTransport is a default implementation of `Transport` interface used by `Client`.
type HTTPTransport struct {
dsn *Dsn
client *http.Client
transport *http.Transport

buffer chan *http.Request
// buffer is a channel of batches. Calling Flush terminates work on the
// current in-flight items and starts a new batch for subsequent events.
buffer chan batch

disabledUntil time.Time

wg sync.WaitGroup
start sync.Once

// Size of the transport buffer. Defaults to 30.
@@ -130,9 +139,17 @@ func (t *HTTPTransport) Configure(options ClientOptions) {
Logger.Printf("%v\n", err)
return
}

t.dsn = dsn
t.buffer = make(chan *http.Request, t.BufferSize)

// A buffered channel with capacity 1 works like a mutex, ensuring only one
// goroutine can access the current batch at a given time. Access is
// synchronized by reading from and writing to the channel.
t.buffer = make(chan batch, 1)
t.buffer <- batch{
items: make(chan *http.Request, t.BufferSize),
started: make(chan struct{}),
done: make(chan struct{}),
}

if options.HTTPTransport != nil {
t.transport = options.HTTPTransport
@@ -178,10 +195,21 @@ func (t *HTTPTransport) SendEvent(event *Event) {
request.Header.Set(headerKey, headerValue)
}

t.wg.Add(1)
// <-t.buffer is equivalent to acquiring a lock to access the current batch.
// A few lines below, t.buffer <- b releases the lock.
//
// The lock must be held during the select block below to guarantee that
// b.items is not closed while trying to send to it. Remember that sending
// on a closed channel panics.
//
// Note that the select block takes a bounded amount of CPU time because of
// the default case that is executed if sending on b.items would block. That
// is, the event is dropped if it cannot be sent immediately to the b.items
// channel (used as a queue).
b := <-t.buffer

select {
case t.buffer <- request:
case b.items <- request:
Logger.Printf(
"Sending %s event [%s] to %s project: %d\n",
event.Level,
@@ -190,51 +218,92 @@
t.dsn.projectID,
)
default:
t.wg.Done()
Logger.Println("Event dropped due to transport buffer being full.")
// worker would block, drop the packet
}

t.buffer <- b
}

// Flush waits until all buffered events have been sent, returning true; if
// the timeout is reached first, it returns false.
func (t *HTTPTransport) Flush(timeout time.Duration) bool {
c := make(chan struct{})
toolate := time.After(timeout)

// Wait until processing the current batch has started or the timeout.
//
// We must wait until the worker has seen the current batch, because it is
// the only way b.done will be closed. If we do not wait, there is a
// possible execution flow in which b.done is never closed, and the only way
// out of Flush would be waiting for the timeout, which is undesired.
var b batch
for {
select {
case b = <-t.buffer:
select {
case <-b.started:
goto started
default:
t.buffer <- b
}
case <-toolate:
goto fail
}
}

go func() {
t.wg.Wait()
close(c)
}()
started:
// Signal that there won't be any more items in this batch, so that the
// worker inner loop can end.
close(b.items)
// Start a new batch for subsequent events.
t.buffer <- batch{
items: make(chan *http.Request, t.BufferSize),
started: make(chan struct{}),
done: make(chan struct{}),
}

// Wait until the current batch is done or the timeout.
select {
case <-c:
case <-b.done:
Logger.Println("Buffer flushed successfully.")
return true
case <-time.After(timeout):
Logger.Println("Buffer flushing reached the timeout.")
return false
case <-toolate:
goto fail
}

fail:
Logger.Println("Buffer flushing reached the timeout.")
return false
}

func (t *HTTPTransport) worker() {
for request := range t.buffer {
if time.Now().Before(t.disabledUntil) {
t.wg.Done()
continue
}

response, err := t.client.Do(request)

if err != nil {
Logger.Printf("There was an issue with sending an event: %v", err)
}

if response != nil && response.StatusCode == http.StatusTooManyRequests {
t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
for b := range t.buffer {
// Signal that processing of the current batch has started.
close(b.started)

// Return the batch to the buffer so that other goroutines can use it.
// Equivalent to releasing a lock.
t.buffer <- b

// Process all batch items.
for request := range b.items {
if time.Now().Before(t.disabledUntil) {
continue
}

response, err := t.client.Do(request)

if err != nil {
Logger.Printf("There was an issue with sending an event: %v", err)
}

if response != nil && response.StatusCode == http.StatusTooManyRequests {
t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
}
}

t.wg.Done()
// Signal that processing of the batch is done.
close(b.done)
}
}
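
For reference, a typical caller under the new semantics sends events and
then flushes before exiting. A sketch reusing only the API visible in this
diff and in the tests below (the DSN is a placeholder):

transport := NewHTTPTransport()
transport.Configure(ClientOptions{
	Dsn: "https://key@example.ingest.sentry.io/42", // placeholder DSN
})

transport.SendEvent(NewEvent())

// Flush waits only for the batch in flight when it is called; events sent
// afterwards land in a new batch and are not waited for.
if !transport.Flush(2 * time.Second) {
	// Timed out: some events may not have been delivered.
}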

138 changes: 138 additions & 0 deletions transport_test.go
@@ -1,7 +1,12 @@
package sentry

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"sync"
"sync/atomic"
"testing"
"time"
)
@@ -152,3 +157,136 @@ func TestRetryAfterDateHeader(t *testing.T) {
}
assertEqual(t, retryAfter(now, &r), time.Second*13)
}

// A testHTTPServer counts events sent to it. It requires a call to Unblock
// before incrementing its internal counter and sending a response to the HTTP
// client. This allows for coordinating the execution flow when needed.
type testHTTPServer struct {
*httptest.Server
// eventCounter counts the number of events processed by the server.
eventCounter *uint64
// ch is used to block/unblock the server on demand.
ch chan bool
}

func newTestHTTPServer(t *testing.T) *testHTTPServer {
ch := make(chan bool)
eventCounter := new(uint64)
handler := func(w http.ResponseWriter, r *http.Request) {
var event struct {
EventID string `json:"event_id"`
}
dec := json.NewDecoder(r.Body)
err := dec.Decode(&event)
if err != nil {
t.Fatal(err)
}
// Block until signal to continue.
<-ch
count := atomic.AddUint64(eventCounter, 1)
t.Logf("[SERVER] {%.4s} event received (#%d)", event.EventID, count)
}
return &testHTTPServer{
Server: httptest.NewTLSServer(http.HandlerFunc(handler)),
eventCounter: eventCounter,
ch: ch,
}
}

func (ts *testHTTPServer) EventCount() uint64 {
return atomic.LoadUint64(ts.eventCounter)
}

func (ts *testHTTPServer) Unblock() {
ts.ch <- true
}

func TestHTTPTransport(t *testing.T) {
server := newTestHTTPServer(t)
defer server.Close()

transport := NewHTTPTransport()
transport.Configure(ClientOptions{
Dsn: fmt.Sprintf("https://test@%s/1", server.Listener.Addr()),
HTTPClient: server.Client(),
})

// Helpers

transportSendTestEvent := func(t *testing.T) (id string) {
t.Helper()

e := NewEvent()
id = uuid()
e.EventID = EventID(id)

transport.SendEvent(e)
t.Logf("[CLIENT] {%.4s} event sent", e.EventID)
return id
}

transportMustFlush := func(t *testing.T, id string) {
t.Helper()

ok := transport.Flush(100 * time.Millisecond)
if !ok {
t.Fatalf("[CLIENT] {%.4s} Flush() timed out", id)
}
}

serverEventCountMustBe := func(t *testing.T, n uint64) {
t.Helper()

count := server.EventCount()
if count != n {
t.Fatalf("[SERVER] event count = %d, want %d", count, n)
}
}

// Actual tests

testSendSingleEvent := func(t *testing.T) {
// Sending a single event should increase the server event count by
// exactly one.

initialCount := server.EventCount()
id := transportSendTestEvent(t)

// Server is blocked waiting for us, right now count must not have
// changed yet.
serverEventCountMustBe(t, initialCount)

// After unblocking the server, Flush must guarantee that the server
// event count increased by one.
server.Unblock()
transportMustFlush(t, id)
serverEventCountMustBe(t, initialCount+1)
}
t.Run("SendSingleEvent", testSendSingleEvent)

t.Run("FlushMultipleTimes", func(t *testing.T) {
// Flushing multiple times should not increase the server event count.

initialCount := server.EventCount()
for i := 0; i < 10; i++ {
transportMustFlush(t, fmt.Sprintf("loop%d", i))
}
serverEventCountMustBe(t, initialCount)
})

t.Run("ConcurrentSendAndFlush", func(t *testing.T) {
// It should be safe to send events and flush concurrently.

var wg sync.WaitGroup
wg.Add(2)
go func() {
testSendSingleEvent(t)
wg.Done()
}()
go func() {
transportMustFlush(t, "from goroutine")
wg.Done()
}()
wg.Wait()
})
}
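
The ConcurrentSendAndFlush subtest is the one that exposes the original
bug: run against the pre-fix implementation with the race detector enabled,
it reports the data race, e.g.:

go test -race -run TestHTTPTransport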
