diff --git a/query/worker.go b/query/worker.go index 5c2d2517..1552a7eb 100644 --- a/query/worker.go +++ b/query/worker.go @@ -2,6 +2,7 @@ package query import ( "errors" + "sync/atomic" "time" "github.com/btcsuite/btcd/wire" @@ -58,6 +59,10 @@ type jobResult struct { type worker struct { peer Peer + // quit indicates that the worker has already quit and is not accepting + // any more jobs. + quit int32 + // nextJob is a channel of queries to be distributed, where the worker // will poll new work from. nextJob chan *queryJob @@ -70,7 +75,7 @@ var _ Worker = (*worker)(nil) func NewWorker(peer Peer) Worker { return &worker{ peer: peer, - nextJob: make(chan *queryJob), + nextJob: make(chan *queryJob, maxJobs), } } @@ -88,7 +93,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // Subscribe to messages from the peer. msgChan, cancel := peer.SubscribeRecvMsg() - defer cancel() + defer func() { + atomic.AddInt32(&w.quit, 1) + cancel() + }() for { log.Tracef("Worker %v waiting for more work", peer.Addr()) @@ -266,5 +274,10 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // // NOTE: Part of the Worker interface. func (w *worker) NewJob() chan<- *queryJob { + // The worker has already quit so don't return the nextJob channel. + if atomic.LoadInt32(&w.quit) != 0 { + return nil + } + return w.nextJob } diff --git a/query/worker_test.go b/query/worker_test.go index 287ba891..fabe28d7 100644 --- a/query/worker_test.go +++ b/query/worker_test.go @@ -86,7 +86,7 @@ func makeJob() *queryJob { } type testCtx struct { - nextJob chan<- *queryJob + worker Worker jobResults chan *jobResult peer *mockPeer workerDone chan struct{} @@ -101,7 +101,7 @@ func startWorker() (*testCtx, error) { subscriptions: make(chan chan wire.Message), quit: make(chan struct{}), } - results := make(chan *jobResult) + results := make(chan *jobResult, maxJobs) quit := make(chan struct{}) wk := NewWorker(peer) @@ -123,7 +123,7 @@ func startWorker() (*testCtx, error) { peer.responses = sub return &testCtx{ - nextJob: wk.NewJob(), + worker: wk, jobResults: results, peer: peer, workerDone: done, @@ -144,7 +144,7 @@ func TestWorkerIgnoreMsgs(t *testing.T) { task := makeJob() select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: case <-time.After(1 * time.Second): t.Fatalf("did not pick up job") } @@ -215,7 +215,7 @@ func TestWorkerTimeout(t *testing.T) { // Give the worker the new job. select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: case <-time.After(1 * time.Second): t.Fatalf("did not pick up job") } @@ -253,7 +253,7 @@ func TestWorkerTimeout(t *testing.T) { // It will immediately attempt to fetch another task. select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: case <-time.After(1 * time.Second): t.Fatalf("did not pick up job") } @@ -272,7 +272,7 @@ func TestWorkerDisconnect(t *testing.T) { // Give the worker a new job. task := makeJob() select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: case <-time.After(1 * time.Second): t.Fatalf("did not pick up job") } @@ -312,7 +312,7 @@ func TestWorkerDisconnect(t *testing.T) { // No more jobs should be accepted by the worker after it has exited. select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: t.Fatalf("exited worker did pick up job") default: } @@ -342,7 +342,7 @@ func TestWorkerProgress(t *testing.T) { task.timeout = taskTimeout select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: case <-time.After(1 * time.Second): t.Fatalf("did not pick up job") } @@ -421,7 +421,7 @@ func TestWorkerJobCanceled(t *testing.T) { canceled := false for i := 0; i < 2; i++ { select { - case ctx.nextJob <- task: + case ctx.worker.NewJob() <- task: case <-time.After(1 * time.Second): t.Fatalf("did not pick up job") } diff --git a/query/workmanager.go b/query/workmanager.go index 9d6317fb..c616a48b 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -15,6 +15,9 @@ const ( // maxQueryTimeout is the maximum timeout given to a single query. maxQueryTimeout = 32 * time.Second + + // maxJobs is the maximum amount of jobs a single worker can have. + maxJobs = 10 ) var ( @@ -74,11 +77,10 @@ type PeerRanking interface { // activeWorker wraps a Worker that is currently running, together with the job // we have given to it. -// TODO(halseth): support more than one active job at a time. type activeWorker struct { - w Worker + w Worker activeJobs map[uint64]*queryJob - onExit chan struct{} + onExit chan struct{} } // Config holds the configuration options for a new WorkManager. @@ -126,8 +128,8 @@ var _ WorkManager = (*peerWorkManager)(nil) func NewWorkManager(cfg *Config) WorkManager { return &peerWorkManager{ cfg: cfg, - newBatches: make(chan *batch), - jobResults: make(chan *jobResult), + newBatches: make(chan *batch, maxJobs), + jobResults: make(chan *jobResult, maxJobs), quit: make(chan struct{}), } } @@ -220,7 +222,7 @@ Loop: for p, r := range workers { // Only one active job at a time is currently // supported. - if len(r.activeJobs) >= 1 { + if len(r.activeJobs) >= maxJobs { continue } diff --git a/query/workmanager_test.go b/query/workmanager_test.go index c88b2f18..0ff0c72d 100644 --- a/query/workmanager_test.go +++ b/query/workmanager_test.go @@ -84,8 +84,8 @@ func startWorkManager(t *testing.T, numWorkers int) (WorkManager, NewWorker: func(peer Peer) Worker { m := &mockWorker{ peer: peer, - nextJob: make(chan *queryJob), - results: make(chan *jobResult), + nextJob: make(chan *queryJob, maxJobs), + results: make(chan *jobResult, maxJobs), } workerChan <- m return m @@ -205,7 +205,7 @@ func TestWorkManagerWorkDispatcherFailures(t *testing.T) { for i := 0; i < numQueries; i++ { q := &Request{} queries[i] = q - scheduledJobs[i] = make(chan sched) + scheduledJobs[i] = make(chan sched, maxJobs) } // For each worker, spin up a goroutine that will forward the job it @@ -387,7 +387,7 @@ func TestWorkManagerCancelBatch(t *testing.T) { // TestWorkManagerWorkRankingScheduling checks that the work manager schedules // jobs among workers according to the peer ranking. func TestWorkManagerWorkRankingScheduling(t *testing.T) { - const numQueries = 4 + const numQueries = 40 const numWorkers = 8 workMgr, workers := startWorkManager(t, numWorkers) @@ -414,7 +414,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { var jobs []*queryJob for i := 0; i < numQueries; i++ { select { - case job := <-workers[i].nextJob: + case job := <-workers[i/10].nextJob: if job.index != uint64(i) { t.Fatalf("unexpected job") } @@ -449,7 +449,7 @@ func TestWorkManagerWorkRankingScheduling(t *testing.T) { // Go backwards, and succeed the queries. for i := numQueries - 1; i >= 0; i-- { select { - case workers[i].results <- &jobResult{ + case workers[i/10].results <- &jobResult{ job: jobs[i], err: nil, }: