diff --git a/src/Illuminate/Queue/Console/ListenCommand.php b/src/Illuminate/Queue/Console/ListenCommand.php index 66ceea4aea74..9388b58b1a51 100755 --- a/src/Illuminate/Queue/Console/ListenCommand.php +++ b/src/Illuminate/Queue/Console/ListenCommand.php @@ -4,8 +4,7 @@ use Illuminate\Queue\Listener; use Illuminate\Console\Command; -use Symfony\Component\Console\Input\InputOption; -use Symfony\Component\Console\Input\InputArgument; +use Illuminate\Queue\ListenerOptions; class ListenCommand extends Command { @@ -14,7 +13,14 @@ class ListenCommand extends Command * * @var string */ - protected $name = 'queue:listen'; + protected $signature = 'queue:listen + {connection? : The name of connection} + {--queue= : The queue to listen on} + {--delay=0 : Amount of time to delay failed jobs} + {--memory=128 : The memory limit in megabytes} + {--sleep=3 : Number of seconds to sleep when no job is available} + {--timeout=60 : The number of seconds a child process can run} + {--tries=0 : Number of times to attempt a job before logging it failed}'; /** * The console command description. @@ -40,7 +46,7 @@ public function __construct(Listener $listener) { parent::__construct(); - $this->listener = $listener; + $this->setOutputHandler($this->listener = $listener); } /** @@ -50,26 +56,15 @@ public function __construct(Listener $listener) */ public function fire() { - $this->setListenerOptions(); - - $connection = $this->input->getArgument('connection'); - - // The memory limit is the amount of memory we will allow the script to occupy - // before killing it and letting a process manager restart it for us, which - // is to protect us against any memory leaks that will be in the scripts. - $memory = $this->input->getOption('memory'); - - $timeout = $this->input->getOption('timeout'); - - $delay = $this->input->getOption('delay'); - // We need to get the right queue for the connection which is set in the queue // configuration file for the application. We will pull it based on the set // connection being run for the queue operation currently being executed. - $queue = $this->getQueue($connection); + $queue = $this->getQueue( + $connection = $this->input->getArgument('connection') + ); $this->listener->listen( - $connection, $queue, $delay, $memory, $timeout + $connection, $queue, $this->gatherOptions() ); } @@ -81,66 +76,36 @@ public function fire() */ protected function getQueue($connection) { - if (is_null($connection)) { - $connection = $this->laravel['config']['queue.default']; - } + $connection = $connection ?: $this->laravel['config']['queue.default']; - $queue = $this->laravel['config']->get( + return $this->input->getOption('queue') ?: $this->laravel['config']->get( "queue.connections.{$connection}.queue", 'default' ); - - return $this->input->getOption('queue') ?: $queue; } /** - * Set the options on the queue listener. + * Get the listener options for the command. * - * @return void + * @return \Illuminate\Queue\ListenerOptions */ - protected function setListenerOptions() + protected function gatherOptions() { - $this->listener->setEnvironment($this->laravel->environment()); - - $this->listener->setSleep($this->option('sleep')); - - $this->listener->setMaxTries($this->option('tries')); - - $this->listener->setOutputHandler(function ($type, $line) { - $this->output->write($line); - }); - } - - /** - * Get the console command arguments. - * - * @return array - */ - protected function getArguments() - { - return [ - ['connection', InputArgument::OPTIONAL, 'The name of connection'], - ]; + return new ListenerOptions( + $this->option('env'), $this->option('delay'), + $this->option('memory'), $this->option('timeout') + ); } /** - * Get the console command options. + * Set the options on the queue listener. * - * @return array + * @param \Illuminate\Queue\Listener $listener + * @return void */ - protected function getOptions() + protected function setOutputHandler(Listener $listener) { - return [ - ['queue', null, InputOption::VALUE_OPTIONAL, 'The queue to listen on', null], - - ['delay', null, InputOption::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0], - - ['memory', null, InputOption::VALUE_OPTIONAL, 'The memory limit in megabytes', 128], - - ['timeout', null, InputOption::VALUE_OPTIONAL, 'Seconds a job may run before timing out', 60], - - ['sleep', null, InputOption::VALUE_OPTIONAL, 'Seconds to wait before checking queue for jobs', 3], - - ['tries', null, InputOption::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0], - ]; + $listener->setOutputHandler(function ($type, $line) { + $this->output->write($line); + }); } } diff --git a/src/Illuminate/Queue/Listener.php b/src/Illuminate/Queue/Listener.php index a84cc6790a4c..fef3299e9952 100755 --- a/src/Illuminate/Queue/Listener.php +++ b/src/Illuminate/Queue/Listener.php @@ -60,7 +60,7 @@ class Listener public function __construct($commandPath) { $this->commandPath = $commandPath; - $this->workerCommand = $this->buildWorkerCommand(); + $this->workerCommand = $this->buildCommandTemplate(); } /** @@ -68,15 +68,35 @@ public function __construct($commandPath) * * @return string */ - protected function buildWorkerCommand() + protected function buildCommandTemplate() { - $binary = ProcessUtils::escapeArgument((new PhpExecutableFinder)->find(false)); + $command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s'; - $artisan = defined('ARTISAN_BINARY') ? ProcessUtils::escapeArgument(ARTISAN_BINARY) : 'artisan'; + return "{$this->phpBinary()} {$this->artisanBinary()} {$command}"; + } - $command = 'queue:work %s --once --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s'; + /** + * Get the PHP binary. + * + * @return string + */ + protected function phpBinary() + { + return ProcessUtils::escapeArgument( + (new PhpExecutableFinder)->find(false) + ); + } - return "{$binary} {$artisan} {$command}"; + /** + * Get the Artisan binary. + * + * @return string + */ + protected function artisanBinary() + { + return defined('ARTISAN_BINARY') + ? ProcessUtils::escapeArgument(ARTISAN_BINARY) + : 'artisan'; } /** @@ -84,17 +104,15 @@ protected function buildWorkerCommand() * * @param string $connection * @param string $queue - * @param string $delay - * @param string $memory - * @param int $timeout + * @param \Illuminate\Queue\ListenerOptions $options * @return void */ - public function listen($connection, $queue, $delay, $memory, $timeout = 60) + public function listen($connection, $queue, ListenerOptions $options) { - $process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout); + $process = $this->makeProcess($connection, $queue, $options); while (true) { - $this->runProcess($process, $memory); + $this->runProcess($process, $options->memory); } } @@ -103,31 +121,29 @@ public function listen($connection, $queue, $delay, $memory, $timeout = 60) * * @param string $connection * @param string $queue - * @param int $delay - * @param int $memory - * @param int $timeout + * @param \Illuminate\Queue\ListenerOptions $options * @return \Symfony\Component\Process\Process */ - public function makeProcess($connection, $queue, $delay, $memory, $timeout) + public function makeProcess($connection, $queue, ListenerOptions $options) { $command = $this->workerCommand; // If the environment is set, we will append it to the command string so the // workers will run under the specified environment. Otherwise, they will // just run under the production environment which is not always right. - if (isset($this->environment)) { - $command = $this->addEnvironment($command); + if (isset($options->environment)) { + $command = $this->addEnvironment($command, $options); } // Next, we will just format out the worker commands with all of the various // options available for the command. This will produce the final command // line that we will pass into a Symfony process object for processing. $command = $this->formatCommand( - $command, $connection, $queue, $delay, $memory + $command, $connection, $queue, $options ); return new Process( - $command, $this->commandPath, null, null, $timeout + $command, $this->commandPath, null, null, $options->timeout ); } @@ -135,11 +151,12 @@ public function makeProcess($connection, $queue, $delay, $memory, $timeout) * Add the environment option to the given command. * * @param string $command + * @param \Illuminate\Queue\ListenerOptions $options * @return string */ - protected function addEnvironment($command) + protected function addEnvironment($command, ListenerOptions $options) { - return $command.' --env='.ProcessUtils::escapeArgument($this->environment); + return $command.' --env='.ProcessUtils::escapeArgument($options->environment); } /** @@ -148,17 +165,17 @@ protected function addEnvironment($command) * @param string $command * @param string $connection * @param string $queue - * @param int $delay - * @param int $memory + * @param \Illuminate\Queue\ListenerOptions $options * @return string */ - protected function formatCommand($command, $connection, $queue, $delay, $memory) + protected function formatCommand($command, $connection, $queue, ListenerOptions $options) { return sprintf( $command, ProcessUtils::escapeArgument($connection), ProcessUtils::escapeArgument($queue), - $delay, $memory, $this->sleep, $this->maxTries + $options->delay, $options->memory, + $options->sleep, $options->maxTries ); } @@ -228,37 +245,4 @@ public function setOutputHandler(Closure $outputHandler) { $this->outputHandler = $outputHandler; } - - /** - * Set the current environment. - * - * @param string $environment - * @return void - */ - public function setEnvironment($environment) - { - $this->environment = $environment; - } - - /** - * Set the amount of seconds to wait before polling the queue. - * - * @param int $sleep - * @return void - */ - public function setSleep($sleep) - { - $this->sleep = $sleep; - } - - /** - * Set the amount of times to try a job before logging it failed. - * - * @param int $tries - * @return void - */ - public function setMaxTries($tries) - { - $this->maxTries = $tries; - } } diff --git a/src/Illuminate/Queue/ListenerOptions.php b/src/Illuminate/Queue/ListenerOptions.php new file mode 100644 index 000000000000..9812a314da61 --- /dev/null +++ b/src/Illuminate/Queue/ListenerOptions.php @@ -0,0 +1,31 @@ +environment = $environment; + + parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force); + } +} diff --git a/tests/Queue/QueueListenerTest.php b/tests/Queue/QueueListenerTest.php index 9a3734959a69..c904995b05a6 100755 --- a/tests/Queue/QueueListenerTest.php +++ b/tests/Queue/QueueListenerTest.php @@ -33,7 +33,11 @@ public function testListenerStopsWhenMemoryIsExceeded() public function testMakeProcessCorrectlyFormatsCommandLine() { $listener = new Illuminate\Queue\Listener(__DIR__); - $process = $listener->makeProcess('connection', 'queue', 1, 2, 3); + $options = new Illuminate\Queue\ListenerOptions(); + $options->delay = 1; + $options->memory = 2; + $options->timeout = 3; + $process = $listener->makeProcess('connection', 'queue', $options); $escape = '\\' === DIRECTORY_SEPARATOR ? '"' : '\''; $this->assertInstanceOf('Symfony\Component\Process\Process', $process);