Skip to content

Commit

Permalink
Beginning re-factors of listener.
Browse files Browse the repository at this point in the history
  • Loading branch information
taylorotwell committed Dec 30, 2016
1 parent 215b158 commit 07a9402
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 65 deletions.
33 changes: 22 additions & 11 deletions src/Illuminate/Queue/Console/FailedTableCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,10 @@ public function fire()
{
$table = $this->laravel['config']['queue.failed.table'];

$tableClassName = Str::studly($table);

$fullPath = $this->createBaseMigration($table);

$stub = str_replace(
['{{table}}', '{{tableClassName}}'], [$table, $tableClassName], $this->files->get(__DIR__.'/stubs/failed_jobs.stub')
$this->replaceMigration(
$this->createBaseMigration($table), $table, Str::studly($table)
);

$this->files->put($fullPath, $stub);

$this->info('Migration created successfully!');

$this->composer->dumpAutoloads();
Expand All @@ -82,10 +76,27 @@ public function fire()
*/
protected function createBaseMigration($table = 'failed_jobs')
{
$name = 'create_'.$table.'_table';
return $this->laravel['migration.creator']->create(
'create_'.$table.'_table', $this->laravel->databasePath().'/migrations'
);
}

$path = $this->laravel->databasePath().'/migrations';
/**
* Replace the generated migration with the failed job table stub.
*
* @param string $path
* @param string $table
* @param string $tableClassName
* @return void
*/
protected function replaceMigration($path, $table, $tableClassName)
{
$stub = str_replace(
['{{table}}', '{{tableClassName}}'],
[$table, $tableClassName],
$this->files->get(__DIR__.'/stubs/failed_jobs.stub')
);

return $this->laravel['migration.creator']->create($name, $path);
$this->files->put($path, $stub);
}
}
10 changes: 6 additions & 4 deletions src/Illuminate/Queue/Console/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ public function fire()
{
$this->setListenerOptions();

$delay = $this->input->getOption('delay');
$connection = $this->input->getArgument('connection');

// The memory limit is the amount of memory we will allow the script to occupy
// before killing it and letting a process manager restart it for us, which
// is to protect us against any memory leaks that will be in the scripts.
$memory = $this->input->getOption('memory');

$connection = $this->input->getArgument('connection');

$timeout = $this->input->getOption('timeout');

$delay = $this->input->getOption('delay');

// We need to get the right queue for the connection which is set in the queue
// configuration file for the application. We will pull it based on the set
// connection being run for the queue operation currently being executed.
Expand All @@ -85,7 +85,9 @@ protected function getQueue($connection)
$connection = $this->laravel['config']['queue.default'];
}

$queue = $this->laravel['config']->get("queue.connections.{$connection}.queue", 'default');
$queue = $this->laravel['config']->get(
"queue.connections.{$connection}.queue", 'default'
);

return $this->input->getOption('queue') ?: $queue;
}
Expand Down
107 changes: 57 additions & 50 deletions src/Illuminate/Queue/Listener.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,27 +98,6 @@ public function listen($connection, $queue, $delay, $memory, $timeout = 60)
}
}

/**
* Run the given process.
*
* @param \Symfony\Component\Process\Process $process
* @param int $memory
* @return void
*/
public function runProcess(Process $process, $memory)
{
$process->run(function ($type, $line) {
$this->handleWorkerOutput($type, $line);
});

// Once we have run the job we'll go check if the memory limit has been
// exceeded for the script. If it has, we will kill this script so a
// process manager will restart this with a clean slate of memory.
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}

/**
* Create a new Symfony process for the worker.
*
Expand All @@ -131,29 +110,77 @@ public function runProcess(Process $process, $memory)
*/
public function makeProcess($connection, $queue, $delay, $memory, $timeout)
{
$string = $this->workerCommand;
$command = $this->workerCommand;

// If the environment is set, we will append it to the command string so the
// workers will run under the specified environment. Otherwise, they will
// just run under the production environment which is not always right.
if (isset($this->environment)) {
$string .= ' --env='.ProcessUtils::escapeArgument($this->environment);
$command = $this->addEnvironment($command);
}

// Next, we will just format out the worker commands with all of the various
// options available for the command. This will produce the final command
// line that we will pass into a Symfony process object for processing.
$command = sprintf(
$string,
$command = $this->formatCommand(
$command, $connection, $queue, $delay, $memory
);

return new Process(
$command, $this->commandPath, null, null, $timeout
);
}

/**
* Add the environment option to the given command.
*
* @param string $command
* @return string
*/
protected function addEnvironment($command)
{
return $command.' --env='.ProcessUtils::escapeArgument($this->environment);
}

/**
* Format the given command with the listener options.
*
* @param string $command
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @return string
*/
protected function formatCommand($command, $connection, $queue, $delay, $memory)
{
return sprintf(
$command,
ProcessUtils::escapeArgument($connection),
ProcessUtils::escapeArgument($queue),
$delay,
$memory,
$this->sleep,
$this->maxTries
$delay, $memory, $this->sleep, $this->maxTries
);
}

/**
* Run the given process.
*
* @param \Symfony\Component\Process\Process $process
* @param int $memory
* @return void
*/
public function runProcess(Process $process, $memory)
{
$process->run(function ($type, $line) {
$this->handleWorkerOutput($type, $line);
});

return new Process($command, $this->commandPath, null, null, $timeout);
// Once we have run the job we'll go check if the memory limit has been exceeded
// for the script. If it has, we will kill this script so the process manager
// will restart this with a clean slate of memory automatically on exiting.
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}

/**
Expand Down Expand Up @@ -202,16 +229,6 @@ public function setOutputHandler(Closure $outputHandler)
$this->outputHandler = $outputHandler;
}

/**
* Get the current listener environment.
*
* @return string
*/
public function getEnvironment()
{
return $this->environment;
}

/**
* Set the current environment.
*
Expand All @@ -223,16 +240,6 @@ public function setEnvironment($environment)
$this->environment = $environment;
}

/**
* Get the amount of seconds to wait before polling the queue.
*
* @return int
*/
public function getSleep()
{
return $this->sleep;
}

/**
* Set the amount of seconds to wait before polling the queue.
*
Expand Down

0 comments on commit 07a9402

Please sign in to comment.