Skip to content

Commit

Permalink
Merge branch 'master' into epic-vm
Browse files Browse the repository at this point in the history
  • Loading branch information
travis-architect authored Mar 22, 2021
2 parents acaec76 + cd59585 commit 19c6961
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 34 deletions.
3 changes: 2 additions & 1 deletion amqp_canceller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type cancelCommand struct {
Type string `json:"type"`
JobID uint64 `json:"job_id"`
Source string `json:"source"`
Reason string `json:"reason"`
}

// AMQPCanceller is responsible for listening to a command queue on AMQP and
Expand Down Expand Up @@ -113,7 +114,7 @@ func (d *AMQPCanceller) processCommand(delivery amqp.Delivery) error {
return nil
}

d.cancellationBroadcaster.Broadcast(command.JobID)
d.cancellationBroadcaster.Broadcast(CancellationCommand{JobID: command.JobID, Reason: command.Reason})

return nil
}
Expand Down
24 changes: 15 additions & 9 deletions canceller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,60 @@ package worker

import "sync"

type CancellationCommand struct {
JobID uint64
Reason string
}

// A CancellationBroadcaster allows you to subscribe to and unsubscribe from
// cancellation messages for a given job ID.
type CancellationBroadcaster struct {
registryMutex sync.Mutex
registry map[uint64][](chan struct{})
registry map[uint64][](chan CancellationCommand)
}

// NewCancellationBroadcaster sets up a new cancellation broadcaster with an
// empty registry.
func NewCancellationBroadcaster() *CancellationBroadcaster {
return &CancellationBroadcaster{
registry: make(map[uint64][](chan struct{})),
registry: make(map[uint64][](chan CancellationCommand)),
}
}

// Broadcast broacasts a cancellation message to all currently subscribed
// cancellers.
func (cb *CancellationBroadcaster) Broadcast(id uint64) {
func (cb *CancellationBroadcaster) Broadcast(command CancellationCommand) {
cb.registryMutex.Lock()
defer cb.registryMutex.Unlock()

chans := cb.registry[id]
delete(cb.registry, id)
chans := cb.registry[command.JobID]
delete(cb.registry, command.JobID)

for _, ch := range chans {
ch <- command
close(ch)
}
}

// Subscribe will set up a subscription for cancellation messages for the
// given job ID. When a cancellation message comes in, the returned channel
// will be closed.
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan struct{} {
func (cb *CancellationBroadcaster) Subscribe(id uint64) <-chan CancellationCommand {
cb.registryMutex.Lock()
defer cb.registryMutex.Unlock()

if _, ok := cb.registry[id]; !ok {
cb.registry[id] = make([](chan struct{}), 0, 1)
cb.registry[id] = make([](chan CancellationCommand), 0, 1)
}

ch := make(chan struct{})
ch := make(chan CancellationCommand, 1)
cb.registry[id] = append(cb.registry[id], ch)

return ch
}

// Unsubscribe removes an existing subscription for the channel.
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan struct{}) {
func (cb *CancellationBroadcaster) Unsubscribe(id uint64, ch <-chan CancellationCommand) {
cb.registryMutex.Lock()
defer cb.registryMutex.Unlock()

Expand Down
30 changes: 18 additions & 12 deletions canceller_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package worker

import "testing"
import (
"testing"
)

func TestCancellationBroadcaster(t *testing.T) {
cb := NewCancellationBroadcaster()
Expand All @@ -12,33 +14,37 @@ func TestCancellationBroadcaster(t *testing.T) {

cb.Unsubscribe(1, ch1_2)

cb.Broadcast(1)
cb.Broadcast(1)
cb.Broadcast(CancellationCommand{JobID: 1, Reason: "42"})
cb.Broadcast(CancellationCommand{JobID: 1, Reason: "42"})

assertClosed(t, "ch1_1", ch1_1)
assertReceived(t, "ch1_1", ch1_1, CancellationCommand{JobID: 1, Reason: "42"})
assertWaiting(t, "ch1_2", ch1_2)
assertClosed(t, "ch1_3", ch1_3)
assertReceived(t, "ch1_3", ch1_3, CancellationCommand{JobID: 1, Reason: "42"})
assertWaiting(t, "ch2", ch2)
}

func assertClosed(t *testing.T, name string, ch <-chan struct{}) {
func assertReceived(t *testing.T, name string, ch <-chan CancellationCommand, expected CancellationCommand) {
select {
case _, ok := (<-ch):
case val, ok := (<-ch):
if ok {
t.Errorf("expected %s to be closed, but it received a value", name)
if expected != val {
t.Errorf("expected to receive %v, got %v", expected, val)
}
} else {
t.Errorf("expected %s to not be closed, but it was closed", name)
}
default:
t.Errorf("expected %s to be closed, but it wasn't", name)
t.Errorf("expected %s to receive a value, but it didn't", name)
}
}

func assertWaiting(t *testing.T, name string, ch <-chan struct{}) {
func assertWaiting(t *testing.T, name string, ch <-chan CancellationCommand) {
select {
case _, ok := (<-ch):
if ok {
t.Errorf("expected %s to be not be closed and not have a value, but it received a value", name)
t.Errorf("expected %s to not be closed and not have a value, but it received a value", name)
} else {
t.Errorf("expected %s to be not be closed and not have a value, but it was closed", name)
t.Errorf("expected %s to not be closed and not have a value, but it was closed", name)
}
default:
}
Expand Down
4 changes: 2 additions & 2 deletions http_job_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (q *HTTPJobQueue) fetchJob(ctx gocontext.Context, jobID uint64) (Job, <-cha
return q.deleteJob(ctx, jobID)
},
cancelSelf: func(ctx gocontext.Context) {
q.cb.Broadcast(jobID)
q.cb.Broadcast(CancellationCommand{JobID: jobID})
},
}
startAttrs := &httpJobPayloadStartAttrs{
Expand Down Expand Up @@ -495,7 +495,7 @@ func (q *HTTPJobQueue) generateJobRefreshClaimFunc(jobID uint64) (func(gocontext
"err": err,
"job_id": jobID,
}).Error("cancelling")
q.cb.Broadcast(jobID)
q.cb.Broadcast(CancellationCommand{JobID: jobID})
return
}

Expand Down
2 changes: 1 addition & 1 deletion processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestProcessor(t *testing.T) {
if tc.isCancelled {
go func(sl time.Duration, i uint64) {
time.Sleep(sl)
cancellationBroadcaster.Broadcast(i)
cancellationBroadcaster.Broadcast(CancellationCommand{JobID: i})
}(tc.runSleep-1, jobID)
}

Expand Down
7 changes: 4 additions & 3 deletions step_check_cancellation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
gocontext "context"
"errors"
"fmt"

"github.com/mitchellh/multistep"
"github.com/travis-ci/worker/context"
Expand All @@ -14,20 +15,20 @@ var JobCancelledError = errors.New("job cancelled")
type stepCheckCancellation struct{}

func (s *stepCheckCancellation) Run(state multistep.StateBag) multistep.StepAction {
cancelChan := state.Get("cancelChan").(<-chan struct{})
cancelChan := state.Get("cancelChan").(<-chan CancellationCommand)

ctx := state.Get("ctx").(gocontext.Context)

_, span := trace.StartSpan(ctx, "CheckCancellation.Run")
defer span.End()

select {
case <-cancelChan:
case command := <-cancelChan:
ctx := state.Get("ctx").(gocontext.Context)
buildJob := state.Get("buildJob").(Job)
if _, ok := state.GetOk("logWriter"); ok {
logWriter := state.Get("logWriter").(LogWriter)
s.writeLogAndFinishWithState(ctx, logWriter, buildJob, FinishStateCancelled, "\n\nDone: Job Cancelled\n\n")
s.writeLogAndFinishWithState(ctx, logWriter, buildJob, FinishStateCancelled, fmt.Sprintf("\n\nDone: Job Cancelled\n\n%s", command.Reason))
} else {
err := buildJob.Finish(ctx, FinishStateCancelled)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions step_run_script.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *stepRunScript) Run(state multistep.StateBag) multistep.StepAction {
buildJob := state.Get("buildJob").(Job)
instance := state.Get("instance").(backend.Instance)
logWriter := state.Get("logWriter").(LogWriter)
cancelChan := state.Get("cancelChan").(<-chan struct{})
cancelChan := state.Get("cancelChan").(<-chan CancellationCommand)

defer context.TimeSince(ctx, "step_run_script_run", time.Now())

Expand Down Expand Up @@ -134,15 +134,15 @@ func (s *stepRunScript) Run(state multistep.StateBag) multistep.StepAction {

logger.Info("context was cancelled, stopping job")
return multistep.ActionHalt
case <-cancelChan:
case cancelCommand := <-cancelChan:
state.Put("err", JobCancelledError)

span.SetStatus(trace.Status{
Code: trace.StatusCodeUnavailable,
Message: JobCancelledError.Error(),
})

s.writeLogAndFinishWithState(preTimeoutCtx, ctx, logWriter, buildJob, FinishStateCancelled, "\n\nDone: Job Cancelled\n\n")
s.writeLogAndFinishWithState(preTimeoutCtx, ctx, logWriter, buildJob, FinishStateCancelled, fmt.Sprintf("\n\nDone: Job Cancelled\n\n%s", cancelCommand.Reason))

return multistep.ActionHalt
case <-logWriter.Timeout():
Expand Down
6 changes: 3 additions & 3 deletions step_subscribe_cancellation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ func (s *stepSubscribeCancellation) Run(state multistep.StateBag) multistep.Step
defer span.End()

if s.cancellationBroadcaster == nil {
ch := make(chan struct{})
state.Put("cancelChan", (<-chan struct{})(ch))
ch := make(chan CancellationCommand)
state.Put("cancelChan", (<-chan CancellationCommand)(ch))
return multistep.ActionContinue
}

Expand All @@ -41,6 +41,6 @@ func (s *stepSubscribeCancellation) Cleanup(state multistep.StateBag) {
defer span.End()

buildJob := state.Get("buildJob").(Job)
ch := state.Get("cancelChan").(<-chan struct{})
ch := state.Get("cancelChan").(<-chan CancellationCommand)
s.cancellationBroadcaster.Unsubscribe(buildJob.Payload().Job.ID, ch)
}

0 comments on commit 19c6961

Please sign in to comment.