Skip to content

Commit

Permalink
workerpool: remove mergeContext (#2201) (#2486)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 12, 2021
1 parent 151fa39 commit 6bd4d95
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 361 deletions.
114 changes: 0 additions & 114 deletions pkg/workerpool/context.go

This file was deleted.

237 changes: 0 additions & 237 deletions pkg/workerpool/context_test.go

This file was deleted.

2 changes: 2 additions & 0 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ type EventHandle interface {
// AddEvent adds an `event` object to the internal queue, so that the `f` used to register the handle can be called.
// Note: events are always processed in the order they are added.
// Unregistering the EventHandle MAY CAUSE EVENT LOSSES. But for an event lost, any event after it is guaranteed to be lost too.
// Cancelling `ctx` here will cancel the on-going or next execution of the event.
AddEvent(ctx context.Context, event interface{}) error

// SetTimer is used to provide a function that is periodic called, as long as the EventHandle has not been unregistered.
// The current implementation uses as the base clock source a ticker whose interval is the const workerPoolDefaultClockSourceInterval.
// DO NOT set an interval less than workerPoolDefaultClockSourceInterval.
// Cancelling `ctx` here will cancel the on-going or next execution of `f`.
SetTimer(ctx context.Context, interval time.Duration, f func(ctx context.Context) error) EventHandle

// Unregister removes the EventHandle from the WorkerPool.
Expand Down
12 changes: 2 additions & 10 deletions pkg/workerpool/pool_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,7 @@ func (h *defaultEventHandle) AddEvent(ctx context.Context, event interface{}) er
task := &task{
handle: h,
f: func(ctx1 context.Context) error {
// Here we merge the context passed down from WorkerPool.Run,
// with the context supplied by AddEvent,
// because we want operations to be cancellable by both contexts.
mContext, cancel := MergeContexts(ctx, ctx1)
// this cancels the merged context only.
defer cancel()
return h.f(mContext, event)
return h.f(ctx, event)
},
}

Expand All @@ -154,9 +148,7 @@ func (h *defaultEventHandle) SetTimer(ctx context.Context, interval time.Duratio

h.timerInterval = interval
h.timerHandler = func(ctx1 context.Context) error {
mContext, cancel := MergeContexts(ctx, ctx1)
defer cancel()
return f(mContext)
return f(ctx)
}
// mark the timer handler function as valid
atomic.StoreInt32(&h.hasTimer, 1)
Expand Down
Loading

0 comments on commit 6bd4d95

Please sign in to comment.