Skip to content

Commit

Permalink
add listener optinos
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 30, 2016
1 parent 07a9402 commit a82a25f
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 125 deletions.
95 changes: 30 additions & 65 deletions src/Illuminate/Queue/Console/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

use Illuminate\Queue\Listener;
use Illuminate\Console\Command;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputArgument;
use Illuminate\Queue\ListenerOptions;

class ListenCommand extends Command
{
Expand All @@ -14,7 +13,14 @@ class ListenCommand extends Command
*
* @var string
*/
protected $name = 'queue:listen';
protected $signature = 'queue:listen
{connection? : The name of connection}
{--queue= : The queue to listen on}
{--delay=0 : Amount of time to delay failed jobs}
{--memory=128 : The memory limit in megabytes}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';

/**
* The console command description.
Expand All @@ -40,7 +46,7 @@ public function __construct(Listener $listener)
{
parent::__construct();

$this->listener = $listener;
$this->setOutputHandler($this->listener = $listener);
}

/**
Expand All @@ -50,26 +56,15 @@ public function __construct(Listener $listener)
*/
public function fire()
{
$this->setListenerOptions();

$connection = $this->input->getArgument('connection');

// The memory limit is the amount of memory we will allow the script to occupy
// before killing it and letting a process manager restart it for us, which
// is to protect us against any memory leaks that will be in the scripts.
$memory = $this->input->getOption('memory');

$timeout = $this->input->getOption('timeout');

$delay = $this->input->getOption('delay');

// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
$queue = $this->getQueue($connection);
$queue = $this->getQueue(
$connection = $this->input->getArgument('connection')
);

$this->listener->listen(
$connection, $queue, $delay, $memory, $timeout
$connection, $queue, $this->gatherOptions()
);
}

Expand All @@ -81,66 +76,36 @@ public function fire()
*/
protected function getQueue($connection)
{
if (is_null($connection)) {
$connection = $this->laravel['config']['queue.default'];
}
$connection = $connection ?: $this->laravel['config']['queue.default'];

$queue = $this->laravel['config']->get(
return $this->input->getOption('queue') ?: $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);

return $this->input->getOption('queue') ?: $queue;
}

/**
* Set the options on the queue listener.
* Get the listener options for the command.
*
* @return void
* @return \Illuminate\Queue\ListenerOptions
*/
protected function setListenerOptions()
protected function gatherOptions()
{
$this->listener->setEnvironment($this->laravel->environment());

$this->listener->setSleep($this->option('sleep'));

$this->listener->setMaxTries($this->option('tries'));

$this->listener->setOutputHandler(function ($type, $line) {
$this->output->write($line);
});
}

/**
* Get the console command arguments.
*
* @return array
*/
protected function getArguments()
{
return [
['connection', InputArgument::OPTIONAL, 'The name of connection'],
];
return new ListenerOptions(
$this->option('env'), $this->option('delay'),
$this->option('memory'), $this->option('timeout')
);
}

/**
* Get the console command options.
* Set the options on the queue listener.
*
* @return array
* @param \Illuminate\Queue\Listener $listener
* @return void
*/
protected function getOptions()
protected function setOutputHandler(Listener $listener)
{
return [
['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on', null],

['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0],

['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128],

['timeout', null, InputOption::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60],

['sleep', null, InputOption::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3],

['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0],
];
$listener->setOutputHandler(function ($type, $line) {
$this->output->write($line);
});
}
}
102 changes: 43 additions & 59 deletions src/Illuminate/Queue/Listener.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,41 +60,59 @@ class Listener
public function __construct($commandPath)
{
$this->commandPath = $commandPath;
$this->workerCommand = $this->buildWorkerCommand();
$this->workerCommand = $this->buildCommandTemplate();
}

/**
* Build the environment specific worker command.
*
* @return string
*/
protected function buildWorkerCommand()
protected function buildCommandTemplate()
{
$binary = ProcessUtils::escapeArgument((new PhpExecutableFinder)->find(false));
$command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';

$artisan = defined('ARTISAN_BINARY') ? ProcessUtils::escapeArgument(ARTISAN_BINARY) : 'artisan';
return "{$this->phpBinary()} {$this->artisanBinary()} {$command}";
}

$command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';
/**
* Get the PHP binary.
*
* @return string
*/
protected function phpBinary()
{
return ProcessUtils::escapeArgument(
(new PhpExecutableFinder)->find(false)
);
}

return "{$binary} {$artisan} {$command}";
/**
* Get the Artisan binary.
*
* @return string
*/
protected function artisanBinary()
{
return defined('ARTISAN_BINARY')
? ProcessUtils::escapeArgument(ARTISAN_BINARY)
: 'artisan';
}

/**
* Listen to the given queue connection.
*
* @param string $connection
* @param string $queue
* @param string $delay
* @param string $memory
* @param int $timeout
* @param \Illuminate\Queue\ListenerOptions $options
* @return void
*/
public function listen($connection, $queue, $delay, $memory, $timeout = 60)
public function listen($connection, $queue, ListenerOptions $options)
{
$process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout);
$process = $this->makeProcess($connection, $queue, $options);

while (true) {
$this->runProcess($process, $memory);
$this->runProcess($process, $options->memory);
}
}

Expand All @@ -103,43 +121,42 @@ public function listen($connection, $queue, $delay, $memory, $timeout = 60)
*
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @param int $timeout
* @param \Illuminate\Queue\ListenerOptions $options
* @return \Symfony\Component\Process\Process
*/
public function makeProcess($connection, $queue, $delay, $memory, $timeout)
public function makeProcess($connection, $queue, ListenerOptions $options)
{
$command = $this->workerCommand;

// If the environment is set, we will append it to the command string so the
// workers will run under the specified environment. Otherwise, they will
// just run under the production environment which is not always right.
if (isset($this->environment)) {
$command = $this->addEnvironment($command);
if (isset($options->environment)) {
$command = $this->addEnvironment($command, $options);
}

// Next, we will just format out the worker commands with all of the various
// options available for the command. This will produce the final command
// line that we will pass into a Symfony process object for processing.
$command = $this->formatCommand(
$command, $connection, $queue, $delay, $memory
$command, $connection, $queue, $options
);

return new Process(
$command, $this->commandPath, null, null, $timeout
$command, $this->commandPath, null, null, $options->timeout
);
}

/**
* Add the environment option to the given command.
*
* @param string $command
* @param \Illuminate\Queue\ListenerOptions $options
* @return string
*/
protected function addEnvironment($command)
protected function addEnvironment($command, ListenerOptions $options)
{
return $command.' --env='.ProcessUtils::escapeArgument($this->environment);
return $command.' --env='.ProcessUtils::escapeArgument($options->environment);
}

/**
Expand All @@ -148,17 +165,17 @@ protected function addEnvironment($command)
* @param string $command
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @param \Illuminate\Queue\ListenerOptions $options
* @return string
*/
protected function formatCommand($command, $connection, $queue, $delay, $memory)
protected function formatCommand($command, $connection, $queue, ListenerOptions $options)
{
return sprintf(
$command,
ProcessUtils::escapeArgument($connection),
ProcessUtils::escapeArgument($queue),
$delay, $memory, $this->sleep, $this->maxTries
$options->delay, $options->memory,
$options->sleep, $options->maxTries
);
}

Expand Down Expand Up @@ -228,37 +245,4 @@ public function setOutputHandler(Closure $outputHandler)
{
$this->outputHandler = $outputHandler;
}

/**
* Set the current environment.
*
* @param string $environment
* @return void
*/
public function setEnvironment($environment)
{
$this->environment = $environment;
}

/**
* Set the amount of seconds to wait before polling the queue.
*
* @param int $sleep
* @return void
*/
public function setSleep($sleep)
{
$this->sleep = $sleep;
}

/**
* Set the amount of times to try a job before logging it failed.
*
* @param int $tries
* @return void
*/
public function setMaxTries($tries)
{
$this->maxTries = $tries;
}
}
31 changes: 31 additions & 0 deletions src/Illuminate/Queue/ListenerOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Illuminate\Queue;

class ListenerOptions extends WorkerOptions
{
/**
* The environment the worker should run in.
*
* @var string
*/
public $environment;

/**
* Create a new listener options instance.
*
* @param string $environment
* @param int $delay
* @param int $memory
* @param int $timeout
* @param int $sleep
* @param int $maxTries
* @param bool $force
*/
public function __construct($environment = null, $delay = 0, $memory = 128, $timeout = 60, $sleep = 3, $maxTries = 0, $force = false)
{
$this->environment = $environment;

parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force);
}
}
6 changes: 5 additions & 1 deletion tests/Queue/QueueListenerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ public function testListenerStopsWhenMemoryIsExceeded()
public function testMakeProcessCorrectlyFormatsCommandLine()
{
$listener = new Illuminate\Queue\Listener(__DIR__);
$process = $listener->makeProcess('connection', 'queue', 1, 2, 3);
$options = new Illuminate\Queue\ListenerOptions();
$options->delay = 1;
$options->memory = 2;
$options->timeout = 3;
$process = $listener->makeProcess('connection', 'queue', $options);
$escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\'';

$this->assertInstanceOf('Symfony\Component\Process\Process', $process);
Expand Down

0 comments on commit a82a25f

Please sign in to comment.