Skip to content

Commit

Permalink
Merge pull request #23 from flownative/boost-mode
Browse files Browse the repository at this point in the history
FEATURE: Introduce boost mode
  • Loading branch information
kdambekalns authored Jul 7, 2022
2 parents 4c5ccce + a2a56de commit 2eacf16
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 40 deletions.
76 changes: 53 additions & 23 deletions Classes/Queue/DoctrineQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
* source code.
*/

use Doctrine\ORM\EntityManagerInterface;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Exception\InvalidArgumentException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\ORM\EntityManagerInterface;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueInterface;

Expand Down Expand Up @@ -50,11 +50,32 @@ class DoctrineQueue implements QueueInterface
protected $defaultTimeout = 60;

/**
* Interval messages are looked up in waitAnd*(), in seconds
* Interval messages are looked up in waitAnd*(), in microseconds
*
* @var int
*/
protected $pollInterval = 1;
protected $pollInterval = 1000000;

/**
* Interval messages are looked up in waitAnd*(), if any messages were processed within the last $boostTime microseconds; in microseconds
*
* @var int
*/
protected $boostPollInterval = 500000;

/**
* Number of microseconds of the "boost time": If any messages were processed within that time, the special $boostPollInterval is used instead of the default $pollInterval; in microseconds
*
* @var int
*/
protected $boostTime = 10000000;

/**
* Time when the last message was processed
*
* @var int|null
*/
protected $lastMessageTime;

/**
* Name of the table to store queue messages. Defaults to "<name>_messages"
Expand All @@ -71,10 +92,16 @@ public function __construct(string $name, array $options)
{
$this->name = $name;
if (isset($options['defaultTimeout'])) {
$this->defaultTimeout = (integer)$options['defaultTimeout'];
$this->defaultTimeout = (int)$options['defaultTimeout'];
}
if (isset($options['pollInterval'])) {
$this->pollInterval = (integer)$options['pollInterval'];
$this->pollInterval = (int)($options['pollInterval'] * 1000000);
}
if (isset($options['boostPollInterval'])) {
$this->boostPollInterval = (int)($options['boostPollInterval'] * 1000000);
}
if (isset($options['boostTime'])) {
$this->boostTime = (int)($options['boostTime'] * 1000000);
}
if (isset($options['tableName'])) {
$this->tableName = $options['tableName'];
Expand Down Expand Up @@ -107,10 +134,10 @@ public function setUp(): void
switch ($this->connection->getDatabasePlatform()->getName()) {
case 'sqlite':
$createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id INTEGER PRIMARY KEY AUTOINCREMENT, payload LONGTEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled TEXT DEFAULT NULL)";
break;
break;
case 'postgresql':
$createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id SERIAL PRIMARY KEY, payload TEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled TIMESTAMP(0) WITHOUT TIME ZONE DEFAULT NULL)";
break;
break;
default:
$createDatabaseStatement = "CREATE TABLE IF NOT EXISTS {$this->connection->quoteIdentifier($this->tableName)} (id INTEGER PRIMARY KEY AUTO_INCREMENT, payload LONGTEXT NOT NULL, state VARCHAR(255) NOT NULL, failures INTEGER NOT NULL DEFAULT 0, scheduled DATETIME DEFAULT NULL) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci ENGINE = InnoDB";
}
Expand Down Expand Up @@ -160,7 +187,7 @@ public function waitAndTake(?int $timeout = null): ?Message
return null;
}

$numberOfDeletedRows = $this->connection->delete($this->connection->quoteIdentifier($this->tableName), ['id' => (integer)$message->getIdentifier()]);
$numberOfDeletedRows = $this->connection->delete($this->connection->quoteIdentifier($this->tableName), ['id' => (int)$message->getIdentifier()]);
if ($numberOfDeletedRows !== 1) {
// TODO error handling
return null;
Expand All @@ -179,8 +206,8 @@ public function waitAndReserve(?int $timeout = null): ?Message
}

/**
* @param int $timeout
* @return Message
* @param int|null $timeout
* @return Message|null
* @throws DBALException
*/
protected function reserveMessage(?int $timeout = null): ?Message
Expand All @@ -198,15 +225,18 @@ protected function reserveMessage(?int $timeout = null): ?Message
throw new \RuntimeException(sprintf('The queue table "%s" could not be found. Did you run ./flow queue:setup "%s"?', $this->tableName, $this->name), 1469117906, $exception);
}
if ($row !== false) {
$numberOfUpdatedRows = (int)$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'reserved' WHERE id = :id AND state = 'ready' AND {$this->getScheduledQueryConstraint()}", ['id' => (integer)$row['id']]);
$numberOfUpdatedRows = (int)$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'reserved' WHERE id = :id AND state = 'ready' AND {$this->getScheduledQueryConstraint()}", ['id' => (int)$row['id']]);
if ($numberOfUpdatedRows === 1) {
$this->lastMessageTime = time();
return $this->getMessageFromRow($row);
}
}
if (time() - $startTime >= $timeout) {
return null;
}
sleep($this->pollInterval);

$currentPollInterval = ((int)$this->lastMessageTime + (int)($this->boostTime / 1000000) > time()) ? $this->boostPollInterval : $this->pollInterval;
usleep($currentPollInterval);
} while (true);
}

Expand All @@ -216,15 +246,15 @@ protected function reserveMessage(?int $timeout = null): ?Message
*/
public function release(string $messageId, array $options = []): void
{
$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'ready', failures = failures + 1, scheduled = {$this->resolveScheduledQueryPart($options)} WHERE id = :id", ['id' => (integer)$messageId]);
$this->connection->executeStatement("UPDATE {$this->connection->quoteIdentifier($this->tableName)} SET state = 'ready', failures = failures + 1, scheduled = {$this->resolveScheduledQueryPart($options)} WHERE id = :id", ['id' => (int)$messageId]);
}

/**
* @inheritdoc
*/
public function abort(string $messageId): void
{
$this->connection->update($this->connection->quoteIdentifier($this->tableName), ['state' => 'failed'], ['id' => (integer)$messageId]);
$this->connection->update($this->connection->quoteIdentifier($this->tableName), ['state' => 'failed'], ['id' => (int)$messageId]);
}

/**
Expand All @@ -233,7 +263,7 @@ public function abort(string $messageId): void
*/
public function finish(string $messageId): bool
{
return $this->connection->delete($this->connection->quoteIdentifier($this->tableName), ['id' => (integer)$messageId]) === 1;
return $this->connection->delete($this->connection->quoteIdentifier($this->tableName), ['id' => (int)$messageId]) === 1;
}

/**
Expand All @@ -256,23 +286,23 @@ public function peek(int $limit = 1): array
*/
public function countReady(): int
{
return (integer)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready'");
return (int)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'ready'");
}

/**
* @inheritdoc
*/
public function countReserved(): int
{
return (integer)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'reserved'");
return (int)$this->connection->fetchOne("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'reserved'");
}

/**
* @inheritdoc
*/
public function countFailed(): int
{
return (integer)$this->connection->fetchColumn("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'failed'");
return (int)$this->connection->fetchColumn("SELECT COUNT(*) FROM {$this->connection->quoteIdentifier($this->tableName)} WHERE state = 'failed'");
}

/**
Expand All @@ -291,7 +321,7 @@ public function flush(): void
*/
protected function getMessageFromRow(array $row): Message
{
return new Message($row['id'], json_decode($row['payload'], true), (integer)$row['failures']);
return new Message($row['id'], json_decode($row['payload'], true), (int)$row['failures']);
}

/**
Expand All @@ -305,11 +335,11 @@ protected function resolveScheduledQueryPart(array $options): string
}
switch ($this->connection->getDatabasePlatform()->getName()) {
case 'sqlite':
return 'datetime(\'now\', \'+' . (integer)$options['delay'] . ' second\')';
return 'datetime(\'now\', \'+' . (int)$options['delay'] . ' second\')';
case 'postgresql':
return 'NOW() + INTERVAL \'' . (integer)$options['delay'] . ' SECOND\'';
return 'NOW() + INTERVAL \'' . (int)$options['delay'] . ' SECOND\'';
default:
return 'DATE_ADD(NOW(), INTERVAL ' . (integer)$options['delay'] . ' SECOND)';
return 'DATE_ADD(NOW(), INTERVAL ' . (int)$options['delay'] . ' SECOND)';
}
}

Expand All @@ -325,7 +355,7 @@ protected function getScheduledQueryConstraint(): string
return '(scheduled IS NULL OR scheduled <= NOW())';
}
}

/**
* Reconnects the database connection associated with this queue, if it doesn't respond to a ping
*
Expand Down
2 changes: 2 additions & 0 deletions Configuration/Settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#
# defaultTimeout: 60
# pollInterval: 1
# boostPollInterval: 0.5
# boostTime: 10
#
# If no "tableName" is specified, a table name prefixed with "flowpack_jobqueue_messages_" will be used:
# tableName: 'custom_table_name'
Expand Down
55 changes: 38 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# Flowpack.JobQueue.Doctrine

A job queue backend for the [Flowpack.JobQueue.Common](https://github.com/Flowpack/jobqueue-common) package based on [Doctrine](http://www.doctrine-project.org/).
A job queue backend for the [Flowpack.JobQueue.Common](https://github.
com/Flowpack/jobqueue-common) package based on [Doctrine](http://www.doctrine-project.org/).

## Usage

Install the package using composer:
Install the package using Composer:

```
composer require flowpack/jobqueue-doctrine
```

If not already installed, that will fetch its requirements, namely the `jobqueue-common` package.
If not already installed, that will fetch its requirements, namely the
`jobqueue-common` package.

Now the queue can be configured like this:

Expand All @@ -35,34 +37,53 @@ The required tables can be created executing:
./flow queue:setup some-queue
```

## Boost Mode

The poll interval should be short enough to process messages in time, and long
enough to minimize resource consumption for the database. Boost mode is a
solution which automatically handles spikes by processing messages in quick
succession. When no new messages appear for a specified time, boost mode is
disabled again.

The frequency by which the queue loop will look for new messages is the
configured `pollInterval`. In boost mode, the option `boostPollInterval` is
used instead. `boostTime` defines the time since the last processed message
after which boost mode is deactivated again.

## Specific options

The `DoctrineQueue` supports following options:

| Option | Type | Default | Description |
| ----------------------- |---------| ---------------------------------------:| ---------------------------------------- |
| defaultTimeout | integer | 60 | Number of seconds new messages are waited for before a timeout occurs (This is overridden by a "timeout" argument in the `waitAndTake()` and `waitAndReserve()` methods |
| pollInterval | integer | 1 | Number of seconds between SQL lookups for new messages |
| tableName | string | flowpack_jobqueue_messages_<queue-name> | Name of the database table for this queue. By default this is the queue name prefixed with "flowpack_jobqueue_messages_" |
| backendOptions | array | - | Doctrine-specific connection params (see [Doctrine reference](http://doctrine-orm.readthedocs.io/projects/doctrine-dbal/en/latest/reference/configuration.html))|
| Option | Type | Default | Description |
|-------------------|---------|----------------------------------------:|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| defaultTimeout | integer | 60 | Number of seconds new messages are waited for before a timeout occurs, this is overridden by a "timeout" argument in the `waitAndTake()` and `waitAndReserve()` methods |
| pollInterval | float | 1 | Number of seconds between SQL lookups for new messages |
| boostPollInterval | float | 0.5 | Number of seconds between SQL lookups for new messages when in "boost mode" |
| boostTime | float | 10 | Maximum number of seconds since last processed message to activate "boost mode" |
| tableName | string | flowpack_jobqueue_messages_<queue-name> | Name of the database table for this queue. By default this is the queue name prefixed with "flowpack_jobqueue_messages_" |
| backendOptions | array | - | Doctrine-specific connection params (see [Doctrine reference](http://doctrine-orm.readthedocs.io/projects/doctrine-dbal/en/latest/reference/configuration.html)) |

*NOTE:* The `DoctrineQueue` currently supports `MySQL`, `PostgreSQL` and `SQLite` backends. You can specify the backend via the `backendOptions`. If you omit this setting, the *current connection* will be re-used (i.e. the currently active Flow database).
*NOTE:* The `DoctrineQueue` currently supports `MySQL`, `PostgreSQL` and
`SQLite` backends. You can specify the backend via the `backendOptions`. If
you omit this setting, the *current connection* will be re-used (i.e. the
currently active Flow database).

### Submit options

Additional options supported by `JobManager::queue()`, `DoctrineQueue::submit()` and the `Job\Defer` annotation:
Additional options supported by `JobManager::queue()`, `DoctrineQueue::submit
()` and the `Job\Defer` annotation:

| Option | Type | Default | Description |
| ----------------------- |---------| ----------------:| ---------------------------------------- |
| delay | integer | 0 | Number of seconds before a message is marked "ready" after submission. This can be useful to prevent premature execution of jobs (i.e. before entities are persisted) |
| Option | Type | Default | Description |
|--------|---------|--------:|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| delay | integer | 0 | Number of seconds before a message is marked "ready" after submission. This can be useful to prevent premature execution of jobs (i.e. before entities are persisted) |

### Release options

Additional options to be specified via `releaseOptions`:

| Option | Type | Default | Description |
| ----------------------- |---------| ----------------:| ---------------------------------------- |
| delay | integer | 0 | Number of seconds before a message is marked "ready" after it has been released. |
| Option | Type | Default | Description |
|--------|---------|--------:|----------------------------------------------------------------------------------|
| delay | integer | 0 | Number of seconds before a message is marked "ready" after it has been released. |

## License

Expand Down

0 comments on commit 2eacf16

Please sign in to comment.