diff --git a/.github/workflows/codesniffer.yml b/.github/workflows/codesniffer.yml new file mode 100644 index 0000000..86094bf --- /dev/null +++ b/.github/workflows/codesniffer.yml @@ -0,0 +1,25 @@ +name: PHP code sniffer + +on: + push: + branches: + - master + pull_request: null + +jobs: + codesniffer: + runs-on: ubuntu-latest + name: PHP code sniffer + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Composer update + run: composer update --no-progress --no-interaction + + - name: Install PHP code sniffer as dev dependency + run: composer require squizlabs/php_codesniffer --dev + + - name: Code sniffer + run: vendor/bin/phpcs src --standard=PSR2 -n diff --git a/.github/workflows/composer.yml b/.github/workflows/composer.yml new file mode 100644 index 0000000..a694320 --- /dev/null +++ b/.github/workflows/composer.yml @@ -0,0 +1,32 @@ +name: Composer outdated + +on: + push: + branches: + - master + pull_request: null + +jobs: + composer: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: [ '7.4', '8.0', '8.1' ] + + name: Composer outdated - PHP ${{ matrix.php }} + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + + - name: Composer update + run: composer update --no-progress --no-interaction + + - name: Composer outdated + run: composer outdated -D --strict diff --git a/.github/workflows/extensions_finder.yml b/.github/workflows/extensions_finder.yml new file mode 100644 index 0000000..774ae77 --- /dev/null +++ b/.github/workflows/extensions_finder.yml @@ -0,0 +1,25 @@ +name: PHP extensions finder + +on: + push: + branches: + - master + pull_request: null + +jobs: + extensions_finder: + runs-on: ubuntu-latest + name: PHP extensions finder + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Composer update + run: composer update --no-progress --no-interaction + + - name: Install PHP extensions finder as dev dependency + run: composer require efabrica/php-extensions-finder --dev + + - name: PHP extensions finder + run: vendor/bin/php-extensions-finder check src diff --git a/.github/workflows/phpstan.yml b/.github/workflows/phpstan.yml new file mode 100644 index 0000000..9785b84 --- /dev/null +++ b/.github/workflows/phpstan.yml @@ -0,0 +1,36 @@ +name: PHP static analysis + +on: + push: + branches: + - master + pull_request: null + +jobs: + phpstan: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: [ '7.4', '8.0', '8.1' ] + + name: PHPStan - PHP ${{ matrix.php }} + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none # disable xdebug, pcov + + - name: Composer update + run: composer update --no-progress --no-interaction + + - name: Install PHPStan as dev dependency + run: composer require phpstan/phpstan --dev + + - name: PHPStan analyse + run: vendor/bin/phpstan analyze src --level=max --no-progress diff --git a/.github/workflows/phpstan_lowest.yml b/.github/workflows/phpstan_lowest.yml new file mode 100644 index 0000000..471d25d --- /dev/null +++ b/.github/workflows/phpstan_lowest.yml @@ -0,0 +1,36 @@ +name: PHP static analysis with lowest dependencies + +on: + push: + branches: + - master + pull_request: null + +jobs: + phpstan: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: [ '7.4', '8.0', '8.1' ] + + name: PHPStan with lowest dependencies - PHP ${{ matrix.php }} + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none # disable xdebug, pcov + + - name: Composer update + run: composer update --prefer-lowest --no-progress --no-interaction + + - name: Install PHPStan as dev dependency + run: composer require phpstan/phpstan --dev + + - name: PHPStan analyse + run: vendor/bin/phpstan analyze src --level=max --no-progress diff --git a/.github/workflows/syntax_checker.yml b/.github/workflows/syntax_checker.yml new file mode 100644 index 0000000..47d7fbe --- /dev/null +++ b/.github/workflows/syntax_checker.yml @@ -0,0 +1,30 @@ +name: PHP syntax checker + +on: + push: + branches: + - master + pull_request: null + +jobs: + syntax_checker: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: [ '7.4', '8.0', '8.1' ] + + name: PHP syntax checker - PHP ${{ matrix.php }} + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none # disable xdebug, pcov + + - name: Check syntax + run: find src -name "*.php" -print0 | xargs -0 -n1 -P8 php -l diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0e36ec8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +/vendor +/composer.lock +/.idea diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d667cba --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,11 @@ +# Change Log + +## [Unreleased][unreleased] + +### Added +- HermesWorker (symfony command) +- RedisProxySetDriver (driver implementation using RedisProxy) +- RedisProxySortedSetDriver +- DummyDriver (for testing purposes) + +[unreleased]: https://github.com/efabrica-team/hermes-extension/compare/8b055557b0c87b5c52961cf2bfa13340e50915ad...HEAD diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..3425249 --- /dev/null +++ b/composer.json @@ -0,0 +1,17 @@ +{ + "name": "efabrica/hermes-extension", + "description": "Extension for tomaj/hermes", + "keywords": ["worker", "driver", "redis-proxy"], + "license": "MIT", + "require": { + "php": ">= 7.4 < 8.2", + "tomaj/hermes": "^4.0", + "lulco/redis-proxy": "dev-master as 1.0.0", + "symfony/console": "^5.4 | ^6.0" + }, + "autoload": { + "psr-4": { + "Efabrica\\HermesExtension\\": "src/" + } + } +} diff --git a/src/Command/HermesWorker.php b/src/Command/HermesWorker.php new file mode 100644 index 0000000..e9705c2 --- /dev/null +++ b/src/Command/HermesWorker.php @@ -0,0 +1,33 @@ +dispatcher = $dispatcher; + } + + protected function configure(): void + { + $this->setDescription('Handle hermes messages'); + } + + protected function execute(InputInterface $input, OutputInterface $output): int + { + $this->dispatcher->handle(); + $output->writeln('Hermes worker end.'); + return self::SUCCESS; + } +} diff --git a/src/Driver/DummyDriver.php b/src/Driver/DummyDriver.php new file mode 100644 index 0000000..6cccbd1 --- /dev/null +++ b/src/Driver/DummyDriver.php @@ -0,0 +1,28 @@ + */ + private array $queues = []; + + private RedisProxy $redis; + + private int $refreshInterval; + + public function __construct(RedisProxy $redis, string $key, int $refreshInterval = 1) + { + $this->setupPriorityQueue($key, Dispatcher::DEFAULT_PRIORITY); + + $this->redis = $redis; + $this->refreshInterval = $refreshInterval; + $this->serializer = new MessageSerializer(); + } + + /** + * @throws RedisProxyException + * @throws SerializeException + * @throws UnknownPriorityException + */ + public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool + { + $key = $this->getKey($priority); + return (bool)$this->redis->sadd($key, $this->serializer->serialize($message)); + } + + public function setupPriorityQueue(string $name, int $priority): void + { + $this->queues[$priority] = $name; + } + + /** + * @throws RedisProxyException + * @throws SerializeException + * @throws ShutdownException + * @throws UnknownPriorityException + */ + public function wait(Closure $callback, array $priorities = []): void + { + $queues = $this->queues; + krsort($queues); + while (true) { + $this->checkShutdown(); + if (!$this->shouldProcessNext()) { + break; + } + + $messageString = null; + $foundPriority = null; + + foreach ($queues as $priority => $name) { + if (count($priorities) > 0 && !in_array($priority, $priorities, true)) { + continue; + } + + $messageString = $this->pop($this->getKey($priority)); + $foundPriority = $priority; + + if ($messageString !== null) { + break; + } + } + + if ($messageString !== null) { + $message = $this->serializer->unserialize($messageString); + $callback($message, $foundPriority); + $this->incrementProcessedItems(); + } elseif ($this->refreshInterval) { + $this->checkShutdown(); + sleep($this->refreshInterval); + } + } + } + + /** + * @throws UnknownPriorityException + */ + private function getKey(int $priority): string + { + if (!isset($this->queues[$priority])) { + throw new UnknownPriorityException("Unknown priority {$priority}"); + } + return $this->queues[$priority]; + } + + /** + * @throws RedisProxyException + */ + private function pop(string $key): ?string + { + $messageString = $this->redis->spop($key); + if (is_string($messageString) && $messageString !== '') { + return $messageString; + } + + return null; + } +} diff --git a/src/Driver/RedisProxySortedSetDriver.php b/src/Driver/RedisProxySortedSetDriver.php new file mode 100644 index 0000000..a3ca630 --- /dev/null +++ b/src/Driver/RedisProxySortedSetDriver.php @@ -0,0 +1,146 @@ + */ + private array $queues = []; + + private RedisProxy $redis; + + private int $refreshInterval; + + private ?string $scheduleKey; + + public function __construct(RedisProxy $redis, string $key, ?string $scheduleKey = null, int $refreshInterval = 1) + { + $this->setupPriorityQueue($key, Dispatcher::DEFAULT_PRIORITY); + + $this->redis = $redis; + $this->refreshInterval = $refreshInterval; + $this->scheduleKey = $scheduleKey; + $this->serializer = new MessageSerializer(); + } + + /** + * @throws SerializeException + * @throws UnknownPriorityException + * @throws RedisProxyException + */ + public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool + { + if ($message->getExecuteAt() !== null && $message->getExecuteAt() > microtime(true)) { + if (!$this->scheduleKey) { + throw new InvalidArgumentException('Schedule key is not configured'); + } + $this->redis->zadd($this->scheduleKey, $message->getExecuteAt(), $this->serializer->serialize($message)); + } else { + $key = $this->getKey($priority); + $this->redis->zadd($key, $message->getExecuteAt(), $this->serializer->serialize($message)); + } + return true; + } + + public function setupPriorityQueue(string $name, int $priority): void + { + $this->queues[$priority] = $name; + } + + /** + * @throws UnknownPriorityException + */ + private function getKey(int $priority): string + { + if (!isset($this->queues[$priority])) { + throw new UnknownPriorityException("Unknown priority {$priority}"); + } + return $this->queues[$priority]; + } + + /** + * @throws ShutdownException + * @throws UnknownPriorityException + * @throws SerializeException + * @throws RedisProxyException + */ + public function wait(Closure $callback, array $priorities = []): void + { + $queues = $this->queues; + krsort($queues); + while (true) { + $this->checkShutdown(); + if (!$this->shouldProcessNext()) { + break; + } + + // check schedule + if ($this->scheduleKey) { + $messageStrings = $this->redis->zrangebyscore($this->scheduleKey, '-inf', (string)microtime(true), ['limit' => [0, 1]]); + foreach ($messageStrings as $messageString) { + $this->redis->zrem($this->scheduleKey, $messageString); + $this->send($this->serializer->unserialize($messageString)); + } + } + + $messageString = null; + $foundPriority = null; + + foreach ($queues as $priority => $name) { + if (count($priorities) > 0 && !in_array($priority, $priorities)) { + continue; + } + if ($messageString !== null) { + break; + } + + $messageString = $this->pop($this->getKey($priority)); + $foundPriority = $priority; + } + + if ($messageString !== null) { + $message = $this->serializer->unserialize($messageString); + $callback($message, $foundPriority); + $this->incrementProcessedItems(); + } else { + if ($this->refreshInterval) { + $this->checkShutdown(); + sleep($this->refreshInterval); + } + } + } + } + + private function pop(string $key): ?string + { + $messageArray = $this->redis->zpopmin($key); + foreach ($messageArray as $messageString => $score) { + if (is_string($messageString) && $messageString !== '') { + return $messageString; + } + return null; + } + return null; + } +}