[coordinator] Continue write request when context is cancelled (#2682)

robskillington authored Oct 2, 2020
1 parent 526da79 commit a700d56
Showing 6 changed files with 175 additions and 13 deletions.
29 changes: 20 additions & 9 deletions src/query/storage/m3/storage.go
@@ -48,6 +48,10 @@ import (
 	"go.uber.org/zap/zapcore"
 )
 
+const (
+	minWriteWaitTimeout = time.Second
+)
+
 var (
 	errUnaggregatedAndAggregatedDisabled = goerrors.New("fetch options has both" +
 		" aggregated and unaggregated namespace lookup disabled")
@@ -615,13 +619,6 @@ func (s *m3storage) Write(
 	ctx context.Context,
 	query *storage.WriteQuery,
 ) error {
-	// Check if the query was interrupted.
-	select {
-	case <-ctx.Done():
-		return ctx.Err()
-	default:
-	}
-
 	if query == nil {
 		return errors.ErrNilWriteQuery
 	}
@@ -655,14 +652,28 @@ func (s *m3storage) Write(
 		// capture var
 		datapoint := datapoint
 		wg.Add(1)
-		s.opts.WriteWorkerPool().Go(func() {
+
+		var (
+			now                      = time.Now()
+			deadline, deadlineExists = ctx.Deadline()
+			timeout                  = minWriteWaitTimeout
+		)
+		if deadlineExists {
+			if remain := deadline.Sub(now); remain >= timeout {
+				timeout = remain
+			}
+		}
+		spawned := s.opts.WriteWorkerPool().GoWithTimeout(func() {
 			if err := s.writeSingle(ctx, query, datapoint, id, tagIter); err != nil {
 				multiErr.add(err)
 			}
 
 			tagIter.Close()
 			wg.Done()
-		})
+		}, timeout)
+		if !spawned {
+			multiErr.add(fmt.Errorf("timeout exceeded waiting: %v", timeout))
+		}
 	}
 
 	wg.Wait()
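The timeout derivation in the hunk above reads as: never wait less than one second for a worker, and wait the full remaining deadline when the caller allows more. Below is a minimal standalone sketch of that logic, not the committed code; the helper name deriveWriteTimeout and the use of time.Until (equivalent to deadline.Sub(time.Now())) are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"time"
)

const minWriteWaitTimeout = time.Second

// deriveWriteTimeout restates the hunk above: use the context's remaining
// time when it exceeds the floor, otherwise fall back to the floor. An
// expired or missing deadline therefore still yields minWriteWaitTimeout,
// which is what allows the write to proceed after cancellation.
func deriveWriteTimeout(ctx context.Context) time.Duration {
	timeout := minWriteWaitTimeout
	if deadline, ok := ctx.Deadline(); ok {
		if remain := time.Until(deadline); remain >= timeout {
			timeout = remain
		}
	}
	return timeout
}

func main() {
	fmt.Println(deriveWriteTimeout(context.Background())) // 1s: no deadline

	expired, cancel := context.WithDeadline(context.Background(),
		time.Now().Add(-time.Minute))
	defer cancel()
	fmt.Println(deriveWriteTimeout(expired)) // 1s: floor still applies
}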
68 changes: 68 additions & 0 deletions src/query/storage/m3/storage_test.go
@@ -327,6 +327,74 @@ func TestLocalReadExceedsRetention(t *testing.T) {
 	assertFetchResult(t, results, testTag)
 }
 
+// TestLocalWriteWithExpiredContext ensures that writes are still attempted
+// even with an expired context, so that data is not lost if the original
+// writer has already disconnected.
+func TestLocalWriteWithExpiredContext(t *testing.T) {
+	ctrl := xtest.NewController(t)
+	defer ctrl.Finish()
+	store := setupLocalWrite(t, ctrl)
+	writeQuery := newWriteQuery(t)
+
+	past := time.Now().Add(-time.Minute)
+
+	ctx, cancel := context.WithDeadline(context.Background(), past)
+	defer cancel()
+
+	// Ensure expired.
+	var expired bool
+	select {
+	case <-ctx.Done():
+		expired = true
+	default:
+	}
+	require.True(t, expired, "context expected to be expired")
+
+	err := store.Write(ctx, writeQuery)
+	assert.NoError(t, err)
+	assert.NoError(t, store.Close())
+}
+
+// TestLocalWritesWithExpiredContext ensures that multi-datapoint writes are
+// still attempted even with an expired context, so that data is not lost if
+// the original writer has already disconnected.
+func TestLocalWritesWithExpiredContext(t *testing.T) {
+	ctrl := xtest.NewController(t)
+	defer ctrl.Finish()
+	store := setupLocalWrite(t, ctrl)
+	writeQueryOpts := newWriteQuery(t).Options()
+	writeQueryOpts.Datapoints = ts.Datapoints{
+		ts.Datapoint{
+			Timestamp: time.Now(),
+			Value:     42,
+		},
+		ts.Datapoint{
+			Timestamp: time.Now(),
+			Value:     84,
+		},
+	}
+	writeQuery, err := storage.NewWriteQuery(writeQueryOpts)
+	require.NoError(t, err)
+
+	past := time.Now().Add(-time.Minute)
+
+	ctx, cancel := context.WithDeadline(context.Background(), past)
+	defer cancel()
+
+	// Ensure expired.
+	var expired bool
+	select {
+	case <-ctx.Done():
+		expired = true
+	default:
+	}
+	require.True(t, expired, "context expected to be expired")
+
+	err = store.Write(ctx, writeQuery)
+	assert.NoError(t, err)
+	assert.NoError(t, store.Close())
+}
+
 func buildFetchOpts() *storage.FetchOptions {
 	opts := storage.NewFetchOptions()
 	opts.SeriesLimit = 100
34 changes: 32 additions & 2 deletions src/x/sync/pooled_worker_pool.go
@@ -25,6 +25,7 @@ import (
 	"math"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/MichaelTJones/pcg"
 	"github.com/uber-go/tally"
@@ -86,6 +87,14 @@ func (p *pooledWorkerPool) Init() {
 }
 
 func (p *pooledWorkerPool) Go(work Work) {
+	p.goWithTimeout(work, 0)
+}
+
+func (p *pooledWorkerPool) GoWithTimeout(work Work, timeout time.Duration) bool {
+	return p.goWithTimeout(work, timeout)
+}
+
+func (p *pooledWorkerPool) goWithTimeout(work Work, timeout time.Duration) bool {
 	var (
 		// Use time.Now() to avoid excessive synchronization
 		currTime = p.nowFn().UnixNano()
@@ -99,8 +108,28 @@ func (p *pooledWorkerPool) Go(work Work) {
 	}
 
 	if !p.growOnDemand {
-		workCh <- work
-		return
+		if timeout <= 0 {
+			workCh <- work
+			return true
+		}
+
+		// First attempt the write without allocating a ticker.
+		select {
+		case workCh <- work:
+			return true
+		default:
+		}
+
+		// Now allocate a ticker and attempt the write with a timeout.
+		ticker := time.NewTicker(timeout)
+		defer ticker.Stop()
+
+		select {
+		case workCh <- work:
+			return true
+		case <-ticker.C:
+			return false
+		}
 	}
 
 	select {
@@ -119,6 +148,7 @@ func (p *pooledWorkerPool) Go(work Work) {
 		// before killing themselves.
 		p.spawnWorker(uint64(currTime), work, workCh, false)
 	}
+	return true
 }
 
 func (p *pooledWorkerPool) spawnWorker(
33 changes: 33 additions & 0 deletions src/x/sync/pooled_worker_pool_test.go
@@ -24,6 +24,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 )
@@ -48,6 +49,38 @@ func TestPooledWorkerPoolGo(t *testing.T) {
 	require.Equal(t, uint32(testWorkerPoolSize*2), count)
 }
 
+func TestPooledWorkerPoolGoWithTimeout(t *testing.T) {
+	var (
+		workers = 2
+	)
+
+	p, err := NewPooledWorkerPool(workers, NewPooledWorkerPoolOptions())
+	require.NoError(t, err)
+	p.Init()
+
+	var (
+		resultsTrue  = 0
+		resultsFalse = 0
+		wg           sync.WaitGroup
+	)
+	wg.Add(1)
+	for i := 0; i < workers*2; i++ {
+		result := p.GoWithTimeout(func() {
+			wg.Wait()
+		}, 100*time.Millisecond)
+		if result {
+			resultsTrue++
+		} else {
+			resultsFalse++
+		}
+	}
+
+	wg.Done()
+
+	require.Equal(t, workers, resultsTrue)
+	require.Equal(t, workers, resultsFalse)
+}
+
 func TestPooledWorkerPoolGrowOnDemand(t *testing.T) {
 	var count uint32
 
7 changes: 6 additions & 1 deletion src/x/sync/types.go
@@ -59,6 +59,11 @@ type PooledWorkerPool interface {
 	// size when the workload exceeds its capacity and shrink back down to its
 	// original size if/when the burst subsides.
 	Go(work Work)
+
+	// GoWithTimeout waits up to the given timeout for a worker to become
+	// available, returning true if a worker becomes available, or false
+	// otherwise.
+	GoWithTimeout(work Work, timeout time.Duration) bool
 }
 
 // WorkerPool provides a pool for goroutines.
@@ -75,7 +80,7 @@ type WorkerPool interface {
 
 	// GoWithTimeout waits up to the given timeout for a worker to become
 	// available, returning true if a worker becomes available, or false
-	// otherwise
+	// otherwise.
 	GoWithTimeout(work Work, timeout time.Duration) bool
 }
 
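To illustrate the new interface method, here is a hedged usage sketch. It assumes the import path github.com/m3db/m3/src/x/sync and the constructor shown in the tests above; it is not part of the commit.

package main

import (
	"fmt"
	"time"

	xsync "github.com/m3db/m3/src/x/sync"
)

func main() {
	// Construction mirrors the test above: a fixed-size pooled worker pool.
	pool, err := xsync.NewPooledWorkerPool(4, xsync.NewPooledWorkerPoolOptions())
	if err != nil {
		panic(err)
	}
	pool.Init()

	// Go blocks until a worker is free; GoWithTimeout gives up after the
	// timeout and reports whether the work was handed to a worker.
	ok := pool.GoWithTimeout(func() {
		fmt.Println("ran on a pooled worker")
	}, 250*time.Millisecond)
	if !ok {
		fmt.Println("no worker available within 250ms; work was dropped")
	}
}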
17 changes: 16 additions & 1 deletion src/x/sync/worker_pool.go
@@ -62,14 +62,29 @@ func (p *workerPool) GoIfAvailable(work Work) bool {
 }
 
 func (p *workerPool) GoWithTimeout(work Work, timeout time.Duration) bool {
+	// First attempt the write without allocating a ticker.
 	select {
 	case token := <-p.workCh:
 		go func() {
 			work()
 			p.workCh <- token
 		}()
 		return true
-	case <-time.After(timeout):
+	default:
+	}
+
+	// Now allocate a ticker and attempt the write with a timeout.
+	ticker := time.NewTicker(timeout)
+	defer ticker.Stop()
+
+	select {
+	case token := <-p.workCh:
+		go func() {
+			work()
+			p.workCh <- token
+		}()
+		return true
+	case <-ticker.C:
 		return false
 	}
 }
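Design note on the hunk above: the non-blocking select runs first so the common case, where a worker token is immediately available, allocates no timer at all; the ticker is only created on the slow path and stopped on return, whereas the replaced time.After would keep its timer alive until it fired. The same two-phase pattern can be restated generically; the sketch below is illustrative only, uses a type parameter (Go 1.18+, later than this 2020 commit), and substitutes time.NewTimer, the one-shot equivalent of the commit's ticker.

package main

import (
	"fmt"
	"time"
)

// trySendWithTimeout attempts a non-blocking send first, then falls back to
// a timer-guarded send, so no timer is allocated when ch can accept the
// value immediately.
func trySendWithTimeout[T any](ch chan<- T, v T, timeout time.Duration) bool {
	select {
	case ch <- v:
		return true
	default:
	}

	t := time.NewTimer(timeout)
	defer t.Stop()

	select {
	case ch <- v:
		return true
	case <-t.C:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySendWithTimeout(ch, 1, time.Millisecond)) // true: buffer free
	fmt.Println(trySendWithTimeout(ch, 2, time.Millisecond)) // false: buffer full
}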
