Skip to content

Commit

Permalink
Fix how reservation works when queue is paused.
Browse files Browse the repository at this point in the history
Queue was moving jobs to reserve queue even when queue was pause was
paused… Which can delay jobs unnecessarily.
  • Loading branch information
taylorotwell committed Jan 31, 2017
1 parent 565456d commit 9d348c5
Showing 1 changed file with 38 additions and 17 deletions.
55 changes: 38 additions & 17 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
$lastRestart = $this->getTimestampOfLastQueueRestart();

while (true) {
if (! $this->daemonShouldRun($options)) {
$this->pauseWorker($options, $lastRestart);

continue;
}

// First, we will attempt to get the next job off of the queue. We will also
// register the timeout handler and reset the alarm for this job so it is
// not stuck in a frozen state forever. Then, we can fire off this job.
Expand All @@ -101,7 +107,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
// If the daemon should run (not in maintenance mode, etc.), then we can run
// fire off this job for processing. Otherwise, we will need to sleep the
// worker so no more jobs are processed until they should be processed.
if ($job && $this->daemonShouldRun($options)) {
if ($job) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
Expand All @@ -110,11 +116,7 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
// Finally, we will check to see if we have exceeded our memory limits or if
// the queue should restart based on other indications. If so, we'll stop
// this worker and let whatever is "monitoring" it restart the process.
if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
}
$this->stopIfNecessary($options, $lastRestart);
}
}

Expand Down Expand Up @@ -148,8 +150,7 @@ protected function registerTimeoutHandler($job, WorkerOptions $options)
*/
protected function timeoutForJob($job, WorkerOptions $options)
{
return $job && ! is_null($job->timeout())
? $job->timeout() : $options->timeout;
return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout;
}

/**
Expand All @@ -160,18 +161,38 @@ protected function timeoutForJob($job, WorkerOptions $options)
*/
protected function daemonShouldRun(WorkerOptions $options)
{
if (($this->manager->isDownForMaintenance() && ! $options->force) ||
return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
$this->paused ||
$this->events->until(new Events\Looping) === false) {
// If the application is down for maintenance or doesn't want the queues to run
// we will sleep for one second just in case the developer has it set to not
// sleep at all. This just prevents CPU from maxing out in this situation.
$this->sleep(1);
$this->events->until(new Events\Looping) === false);
}

return false;
}
/**
* Pause the worker for the current loop.
*
* @param WorkerOptions $options
* @param int $lastRestart
* @return void
*/
protected function pauseWorker(WorkerOptions $options, $lastRestart)
{
$this->sleep($options->sleep > 0 ? $options->sleep : 1);

return true;
$this->stopIfNecessary($options, $lastRestart);
}

/**
* Stop the process if necessary.
*
* @param WorkerOptions $options
* @param int $lastRestart
*/
protected function stopIfNecessary(WorkerOptions $options, $lastRestart)
{
if ($this->memoryExceeded($options->memory)) {
$this->stop(12);
} elseif ($this->queueShouldRestart($lastRestart)) {
$this->stop();
}
}

/**
Expand Down

0 comments on commit 9d348c5

Please sign in to comment.