Skip to content

Commit

Permalink
Add a method to stop a running Engine via the HTTP API
Browse files Browse the repository at this point in the history
Closes grafana#1352.
  • Loading branch information
hynd committed Mar 11, 2020
1 parent 567e5a0 commit 15d1edd
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 32 deletions.
2 changes: 2 additions & 0 deletions api/v1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Status struct {
Paused null.Bool `json:"paused" yaml:"paused"`
VUs null.Int `json:"vus" yaml:"vus"`
VUsMax null.Int `json:"vus-max" yaml:"vus-max"`
Stopped bool `json:"stopped" yaml:"stopped"`
Running bool `json:"running" yaml:"running"`
Tainted bool `json:"tainted" yaml:"tainted"`
}
Expand All @@ -42,6 +43,7 @@ func NewStatus(engine *core.Engine) Status {
Status: executionState.GetCurrentExecutionStatus(),
Running: executionState.HasStarted() && !executionState.HasEnded(),
Paused: null.BoolFrom(executionState.IsPaused()),
Stopped: engine.IsStopped(),
VUs: null.IntFrom(executionState.GetCurrentlyActiveVUsCount()),
VUsMax: null.IntFrom(executionState.GetInitializedVUsCount()),
Tainted: engine.IsTainted(),
Expand Down
53 changes: 29 additions & 24 deletions api/v1/status_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,34 +71,39 @@ func HandlePatchStatus(rw http.ResponseWriter, r *http.Request, p httprouter.Par
return
}

if status.Paused.Valid {
if err = engine.ExecutionScheduler.SetPaused(status.Paused.Bool); err != nil {
apiError(rw, "Pause error", err.Error(), http.StatusInternalServerError)
return
if status.Stopped {
engine.Stop()
} else {
if status.Paused.Valid {
if err = engine.ExecutionScheduler.SetPaused(status.Paused.Bool); err != nil {
apiError(rw, "Pause error", err.Error(), http.StatusInternalServerError)
return
}
}
}

if status.VUsMax.Valid || status.VUs.Valid {
//TODO: add ability to specify the actual executor id? Though this should
//likely be in the v2 REST API, where we could implement it in a way that
//may allow us to eventually support other executor types.
executor, updateErr := getFirstExternallyControlledExecutor(engine.ExecutionScheduler)
if updateErr != nil {
apiError(rw, "Execution config error", updateErr.Error(), http.StatusInternalServerError)
return
}
newConfig := executor.GetCurrentConfig().ExternallyControlledConfigParams
if status.VUsMax.Valid {
newConfig.MaxVUs = status.VUsMax
}
if status.VUs.Valid {
newConfig.VUs = status.VUs
}
if updateErr := executor.UpdateConfig(r.Context(), newConfig); err != nil {
apiError(rw, "Config update error", updateErr.Error(), http.StatusInternalServerError)
return
if status.VUsMax.Valid || status.VUs.Valid {
//TODO: add ability to specify the actual executor id? Though this should
//likely be in the v2 REST API, where we could implement it in a way that
//may allow us to eventually support other executor types.
executor, updateErr := getFirstExternallyControlledExecutor(engine.ExecutionScheduler)
if updateErr != nil {
apiError(rw, "Execution config error", updateErr.Error(), http.StatusInternalServerError)
return
}
newConfig := executor.GetCurrentConfig().ExternallyControlledConfigParams
if status.VUsMax.Valid {
newConfig.MaxVUs = status.VUsMax
}
if status.VUs.Valid {
newConfig.VUs = status.VUs
}
if updateErr := executor.UpdateConfig(r.Context(), newConfig); err != nil {
apiError(rw, "Config update error", updateErr.Error(), http.StatusInternalServerError)
return
}
}
}

data, err := jsonapi.Marshal(NewStatus(engine))
if err != nil {
apiError(rw, "Encoding error", err.Error(), http.StatusInternalServerError)
Expand Down
1 change: 1 addition & 0 deletions api/v1/status_routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func TestGetStatus(t *testing.T) {
assert.True(t, status.Paused.Valid)
assert.True(t, status.VUs.Valid)
assert.True(t, status.VUsMax.Valid)
assert.False(t, status.Stopped)
assert.False(t, status.Tainted)
})
}
Expand Down
31 changes: 26 additions & 5 deletions core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Engine struct {
NoSummary bool
SummaryExport bool

logger *logrus.Logger
logger *logrus.Logger
stopChan chan struct{}

Metrics map[string]*stats.Metric
MetricsLock sync.Mutex
Expand All @@ -84,10 +85,11 @@ func NewEngine(ex lib.ExecutionScheduler, o lib.Options, logger *logrus.Logger)
ExecutionScheduler: ex,
executionState: ex.GetState(),

Options: o,
Metrics: make(map[string]*stats.Metric),
Samples: make(chan stats.SampleContainer, o.MetricSamplesBufferSize.Int64),
logger: logger,
Options: o,
Metrics: make(map[string]*stats.Metric),
Samples: make(chan stats.SampleContainer, o.MetricSamplesBufferSize.Int64),
stopChan: make(chan struct{}),
logger: logger,
}

e.thresholds = o.Thresholds
Expand Down Expand Up @@ -218,6 +220,10 @@ func (e *Engine) Run(ctx context.Context) error {
e.logger.Debug("run: context expired; exiting...")
e.setRunStatus(lib.RunStatusAbortedUser)
return nil
case <-e.stopChan:
e.logger.Debug("run: stopped by user; exiting...")
e.setRunStatus(lib.RunStatusAbortedUser)
return nil
}
}
}
Expand All @@ -226,6 +232,21 @@ func (e *Engine) IsTainted() bool {
return e.thresholdsTainted
}

// Stop closes a signal channel, forcing a running Engine to return
func (e *Engine) Stop() {
close(e.stopChan)
}

// IsStopped returns a bool indicating whether the Engine has been stopped
func (e *Engine) IsStopped() bool {
select {
case <-e.stopChan:
return true
default:
return false
}
}

func (e *Engine) runMetricsEmission(ctx context.Context) {
ticker := time.NewTicker(MetricsRate)
for {
Expand Down
20 changes: 17 additions & 3 deletions core/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,20 @@ func TestEngineAtTime(t *testing.T) {
assert.NoError(t, e.Run(ctx))
}

func TestEngineStopped(t *testing.T) {
e := newTestEngine(t, nil, nil, lib.Options{
VUs: null.IntFrom(1),
Duration: types.NullDurationFrom(20 * time.Second),
})

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
assert.NoError(t, e.Run(ctx))
assert.Equal(t, false, e.IsStopped(), "engine should be running")
e.Stop()
assert.Equal(t, true, e.IsStopped(), "engine should be stopped")
}

func TestEngineCollector(t *testing.T) {
testMetric := stats.New("test_metric", stats.Trend)

Expand Down Expand Up @@ -419,9 +433,9 @@ func TestSentReceivedMetrics(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
engine := newTestEngine(t, ctx, r, lib.Options{
Iterations: null.IntFrom(tc.Iterations),
VUs: null.IntFrom(tc.VUs),
Hosts: tb.Dialer.Hosts,
Iterations: null.IntFrom(tc.Iterations),
VUs: null.IntFrom(tc.VUs),
Hosts: tb.Dialer.Hosts,
InsecureSkipTLSVerify: null.BoolFrom(true),
NoVUConnectionReuse: null.BoolFrom(noConnReuse),
Batch: null.IntFrom(20),
Expand Down

0 comments on commit 15d1edd

Please sign in to comment.