Skip to content

Commit

Permalink
Merge pull request #111 from magemojo/1.3.7
Browse files Browse the repository at this point in the history
1.3.7
  • Loading branch information
gnuzealot authored Nov 16, 2020
2 parents 24fcf60 + 70d60e5 commit d3b4733
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 30 deletions.
5 changes: 5 additions & 0 deletions Controller/Adminhtml/Settings/Index.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public function execute()
$fail = true;
$this->messageManager->addError('Exporters Timeout must be numeric');
}
if ($this->getRequest()->getParam('consumersgovernor')) {
$this->resource->setConfigValue('magemojo/cron/consumersgovernor','default',0,1);
} else {
$this->resource->setConfigValue('magemojo/cron/consumersgovernor','default',0,0);
}
if (!$fail) {
$this->messageManager->addSuccess('Cron Configuration Saved');
}
Expand Down
10 changes: 8 additions & 2 deletions Model/ResourceModel/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,16 @@ public function setJobStatus($scheduleid, $status, $output) {
public function getPendingJobs() {
$connection = $this->getConnection();
$select = $connection->select()
->from($this->getTable('cron_schedule'),['max(schedule_id) as schedule_id','job_code','count(*) as job_count'])
->from($this->getTable('cron_schedule'), [
'max(schedule_id) as schedule_id',
'job_code',
'count(*) as job_count',
'min(scheduled_at) as scheduled_at'
])
->where('status = ?', 'pending')
->where('scheduled_at < ?', date('Y-m-d H:i:s',time()))
->group('job_code');
->group('job_code')
->order(new \Zend_Db_Expr("scheduled_at ASC"));
$result = $connection->fetchAll($select);
return $result;
}
Expand Down
158 changes: 138 additions & 20 deletions Model/Schedule.php
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,22 @@ public function initialize() {

$this->getConfig();
$this->getRuntimeParameters();
$this->cleanupProcesses();
$this->lastJobTime = $this->resource->getLastJobTime();
if ($this->lastJobTime < time() - 360) {
$this->lastJobTime = time();
}
$pid = getmypid();
$this->setPid('cron.pid',$pid);
$this->pendingjobs = $this->resource->getAllPendingJobs();
$this->loadavgtest = true;
if (!is_readable('/proc/cpuinfo')) {
$this->loadavgtest = false;
$this->printWarn('Unable to test loadaverage disabling loadaverage checking');
if (!$this->cronenabled) {
$this->printWarn('Cron is disabled');
} else {
$this->cleanupProcesses();
$this->lastJobTime = $this->resource->getLastJobTime();
if ($this->lastJobTime < time() - 360) {
$this->lastJobTime = time();
}
$pid = getmypid();
$this->setPid('cron.pid',$pid);
$this->pendingjobs = $this->resource->getAllPendingJobs();
$this->loadavgtest = true;
if (!is_readable('/proc/cpuinfo')) {
$this->loadavgtest = false;
$this->printWarn('Unable to test loadaverage disabling loadaverage checking');
}
}
}

Expand All @@ -149,7 +153,6 @@ public function checkPid($pidfile) {
* @return void
*/
public function setPid($file,$scheduleid) {
#print 'file='.$file;
file_put_contents(self::VAR_FOLDER_PATH.'/cron/'.$file,$scheduleid);
}

Expand Down Expand Up @@ -202,10 +205,18 @@ public function checkProcess($pid) {
*
* @return string
*/
public function getJobOutput($scheduleid) {
public function getJobOutput($scheduleid,$tail=False) {
$file = self::VAR_FOLDER_PATH.self::CRON_FOLDER_PATH.".{$scheduleid}";
if (file_exists($file)){
return trim(file_get_contents($file));
if ($tail) {
if (file_exists($file)){
$handler = fopen($file,"r");
fseek($handler, -2000, SEEK_END);
return fread($handler,2000);
}
} else {
if (file_exists($file)){
return trim(file_get_contents($file));
}
}
return NULL;
}
Expand All @@ -216,6 +227,7 @@ public function getJobOutput($scheduleid) {
* @return void
*/
public function cleanupProcesses() {
$this->printInfo('Running Process Cleanup');
$running = array();
$pids = $this->getRunningPids();
foreach ($pids as $pid=>$scheduleid) {
Expand All @@ -227,8 +239,23 @@ public function cleanupProcesses() {
}
}
$this->runningPids = $running;
if ($this->governor) {
$this->consumersCleanup();
}
}

public function consumersCleanup() {
$this->printInfo('Running Consumers Cleanup');
while ($pgrep = exec('pgrep -x strace')) {
$pids = explode("\n",$pgrep);
foreach ($pids as $pid) {
$childpid = $this->getChildProcess($pid);
$this->consumersTerminate($pid);
}
}
}


/**
* Set runtime parameters
*
Expand All @@ -240,6 +267,7 @@ public function getRuntimeParameters() {
$this->maxload = $this->resource->getConfigValue('magemojo/cron/maxload',0,'default');
$this->history = $this->resource->getConfigValue('magemojo/cron/history',0,'default');
$this->cronenabled = $this->resource->getConfigValue('magemojo/cron/enabled',0,'default');
$this->governor = $this->resource->getConfigValue('magemojo/cron/consumersgovernor',0,'default');
}

/**
Expand Down Expand Up @@ -437,6 +465,15 @@ function setJobStatus($scheduleid,$status,$output) {
$this->resource->setJobStatus($scheduleid,$status,$output);
}

/**
* Set a job by schedule id
*
* @return array
*/
function getJob($scheduleid) {
return $this->pendingjobs[$scheduleid];
}

/**
* Service loop for running crons
*
Expand Down Expand Up @@ -475,8 +512,24 @@ public function service() {
$running = $this->getRunningPids();
$jobcount = 0;
foreach ($running as $pid=>$scheduleid) {

if ($this->governor) {
$job = $this->getJob($scheduleid);
$jobconfig = $this->getJobConfig($job["job_code"]);
if (isset($jobconfig["consumers"]) && $jobconfig["consumers"]) {
#run the consumers governor
$this->consumersGovenor($pid,$scheduleid);
}
}

if (!$this->checkProcess($pid)) {
$output = $this->getJobOutput($scheduleid);
#IF this is a consumers job it was run under strace and we do not want this output
if (isset($jobconfig["consumers"]) && $jobconfig["consumers"]) {
$output = '';
} else {
$output = $this->getJobOutput($scheduleid);
}

#If output had "error" in the text, assume it errored
if (strpos(strtolower($output),'error') > 0) {
$this->setJobStatus($scheduleid,'error',$output);
Expand All @@ -502,7 +555,7 @@ public function service() {
$exportersTimeout = 0;
}
while (count($pending) && $this->canRunJobs($jobcount, $pending)) {
$job = array_pop($pending);
$job = array_shift($pending);
$runcheck = $this->resource->getJobByStatus($job["job_code"],'running');
if (count($runcheck) == 0) {
$jobconfig = $this->getJobConfig($job["job_code"]);
Expand All @@ -525,6 +578,9 @@ public function service() {
$runtime = "timeout -s 9 ".$consumersTimeout." ".$runtime;
}
$cmd = $runtime;
if ($this->governor) {
$cmd = 'strace '.$cmd;
}
} else {
$runtime = $this->prepareStub($jobconfig,$stub,$job["schedule_id"]);
if ($runtime) {
Expand All @@ -537,7 +593,7 @@ public function service() {
$exec = sprintf("%s; %s > %s 2>&1 & echo $!",
'cd ' . escapeshellarg($this->basedir),
$cmd,
escapeshellarg($this->basedir . "/var/cron/schedule." . $job["schedule_id"])
escapeshellarg($this->basedir . "/var/cron/schedule." . $job["schedule_id"])
);
$pid = exec($exec);

Expand Down Expand Up @@ -567,6 +623,11 @@ public function service() {
}
}

/**
* Execute a cron from CLI
*
* @return void
*/
public function executeImmediate($jobname) {
#Force UTC
date_default_timezone_set('UTC');
Expand Down Expand Up @@ -602,6 +663,63 @@ public function executeImmediate($jobname) {
}


/**
* Check for consumers processes in infinate loop states and terminate them
*
* @return void
*/
public function consumersGovenor($pid,$scheduleid) {
$tail = $this->getJobOutput($scheduleid,True);
#Check for repeating strings indicating an infinately looping processes
$checks = array(
$this->consumersCheck($tail,'SELECT `queue_message`.`top"',1),
$this->consumersCheck($tail,'rt_sigsuspend([]',0)
);
if (in_array(True,$checks)) {
$this->consumersTerminate($pid);
}
}

/**
* Check for consumers processes in infinate loop states and terminate them
*
* @return bool
*/
public function consumersCheck($log,$loopstring,$instances) {
if (substr_count($log,$loopstring) > $instances) {
return True;
}
return False;
}

/**
* Terminate a consumers process
*
* @return void
*/
public function consumersTerminate($pid) {
$childpid = $this->getChildProcess($pid);
if ($this->checkProcess($pid)) {
exec("kill -9 $childpid");
}
}

/**
* Gets the child process of a running cron
*
* @return int
*/
public function getChildProcess($pid) {
$childpid = exec('pgrep -P '.$pid);
if ($childpid) {
#Recursive call to get the final php process
$childpid = $this->getChildProcess($childpid);
return $childpid;
} else {
return $pid;
}
}

/**
* Check for processes that have gone insane and handle the errors
*
Expand Down Expand Up @@ -708,7 +826,7 @@ private function printError($msg = '') {
$time = date('Y-m-d H:i:s', time());
print "[$time] ERR $msg" . PHP_EOL;
}

/**
* Checks if a consumers job can be run
*
Expand Down
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
# Cron
#### This module for Magento 2 overrides base magento cron functionality, fixes known bugs, and provides a cron service model to control cron process execution.

![Version 1.3.6](https://img.shields.io/badge/Version-1.3.6-green.svg)
![Version 1.3.7](https://img.shields.io/badge/Version-1.3.7-green.svg)

NOTICE: Version 1.3x is only supported for Magento 2.3 and above. Older Magento 2 version use module version 1.2

The default cron can overlap and fill the cron_schedule table, which can cause exponentially more jobs to run on each cron interval, until finally the crons run continously and never complete. The high number of cron jobs can also crash servers hosting Magento 2.
The default cron can overlap and fill the cron_schedule table, which can cause exponentially more jobs to run on each cron interval, until finally the crons run continuously and never complete. The high number of cron jobs can also crash servers hosting Magento 2.

This module replaces the cron management with a service that accepts jobs. As jobs are scheduled, they are picked up by this service for execution. If a job is already running and another is picked up with the same job code, the new one is marked as missed. Duplicate jobs are prevented from running, reducing server overhead.

Expand All @@ -19,7 +19,9 @@ In addition to the service model many other enhancements have been made. For ex

In version 1.1 Cron Reporting was added to the admin to show job code statistics and list cron run errors.

In version 1.3 fixes are implemented for the consumers_runner cron job. This job code is a throwback from magento 1 and is more frequently used in Magento 2.3. It runs under its own scheduler which can execute many child jobs and bomb the system. In this version of the module this parent job is intercepted and written as individual jobs in the cron_schedule table and then run in a sane manner from there. These consumer jobs can also go into infinate loops, so a timeout is imposed on them by default of 30 seconds. This setting can be adjusted in the admin.
In version 1.3 fixes are implemented for the consumers_runner cron job. This job code is a throwback from magento 1 and is more frequently used in Magento 2.3. It runs under its own scheduler which can execute many child jobs and bomb the system. In this version of the module this parent job is intercepted and written as individual jobs in the cron_schedule table and then run in a sane manner from there. These consumer jobs can also go into infinite loops, so a timeout is imposed on them by default of 30 seconds. This setting can be adjusted in the admin.

In version 1.3.7 the consumers governor was added to terminate idle consumers jobs. Bugs in these jobs otherwise prevent these jobs from completing.

## Contributing
See [CONTRIBUTING.md](CONTRIBUTING.md).
Expand All @@ -32,7 +34,7 @@ See [CONTRIBUTING.md](CONTRIBUTING.md).

* Prevents cron history records from exploding.

* Stops cron processes from overruning each other.
* Stops cron processes from overrunning each other.

* Stops the cron from running while system is under configurable load conditions.

Expand Down
13 changes: 10 additions & 3 deletions Setup/UpgradeData.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ class UpgradeData implements UpgradeDataInterface

public function upgrade(ModuleDataSetupInterface $setup, ModuleContextInterface $context)
{
$connection = $setup->getConnection();
if (version_compare($context->getVersion(), '1.3.0', '<')) {
$connection = $setup->getConnection();


$select = $connection->select()->from($setup->getTable('core_config_data'))->where('path like ?', 'magemojo/cron/consumers_timeout');
$result = $connection->fetchAll($select);

Expand All @@ -26,6 +24,7 @@ public function upgrade(ModuleDataSetupInterface $setup, ModuleContextInterface
}
}
if (version_compare($context->getVersion(), '1.3.6', '<')) {

$select = $connection->select()->from($setup->getTable('core_config_data'))->where('path like ?', 'magemojo/cron/exporters_timeout');
$result = $connection->fetchAll($select);

Expand All @@ -35,7 +34,15 @@ public function upgrade(ModuleDataSetupInterface $setup, ModuleContextInterface
array_push($insertData,array('scope' => 'default', 'scope_id' => 0, 'path' => 'magemojo/cron/exporters_timeout', 'value' => '3600'));
$connection->insertMultiple($setup->getTable('core_config_data'), $insertData);
}
$select = $connection->select()->from($setup->getTable('core_config_data'))->where('path like ?', 'magemojo/cron/consumersgovernor');
$result = $connection->fetchAll($select);

#Create core_config_data settings
if (count($result) == 0) {
$insertData = array();
array_push($insertData,array('scope' => 'default', 'scope_id' => 0, 'path' => 'magemojo/cron/consumersgovernor', 'value' => '1'));
$connection->insertMultiple($setup->getTable('core_config_data'), $insertData);
}
}
}
}
2 changes: 1 addition & 1 deletion etc/module.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
<module name="MageMojo_Cron" setup_version="1.3.6">
<module name="MageMojo_Cron" setup_version="1.3.7">
<sequence>
<module name="Magento_Cron"/>
</sequence>
Expand Down
7 changes: 7 additions & 0 deletions view/adminhtml/templates/settings.phtml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@
<tr>
<td colspan="2" style="padding-bottom: 10px;">The number of seconds to allow a consumers job to run. These jobs can infinitely run under some conditions.</td>
</tr>
<tr id="row_general_country_default">
<td class="label" width="250"><span data-config-scope="[STORE VIEW]"><b>Consumers Govenor:</b></span></label></td>
<td class="value" align="left"><?php $block->checkbox('magemojo/cron/consumersgovernor','consumersgovernor'); ?></td>
</tr>
<tr>
<td colspan="2" style="padding-bottom: 10px;">Many bugs in consumers processes cause them to run infinately. The consumers governor will detect these states and terminate the processes.</td>
</tr>
</table>
</div>
</div>
Expand Down

0 comments on commit d3b4733

Please sign in to comment.