Skip to content

Commit

Permalink
Add queue for downloader to limit concurrent requests (#253)
Browse files Browse the repository at this point in the history
* Add queue for downloader to limit concurrent requests

* Make one download queue

* Add tests for getAsyncContents
  • Loading branch information
akondas authored Sep 2, 2020
1 parent 60d25a0 commit 45b5ff6
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 4 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"buddy-works/buddy-works-php-api": "^1.3",
"buddy-works/oauth2-client": "^0.1",
"cbschuld/browser.php": "^1.9",
"clue/mq-react": "^1.2",
"composer/composer": "^1.10.1",
"composer/semver": "^1.5",
"doctrine/doctrine-bundle": "~2.0.10",
Expand Down
71 changes: 69 additions & 2 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions phpstan.neon
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ parameters:
message: "#^Property .+\\:\\:\\$key is unused\\.$#"
count: 1
path: src/Entity/Config.php
-
message: "#^Variable method call on React\\\\Http\\\\Browser\\.$#"
count: 1
path: src/Service/Downloader/ReactDownloader.php

bootstrapFiles:
- vendor/twig/twig/src/Extension/CoreExtension.php # twig global functions
Expand Down
10 changes: 8 additions & 2 deletions src/Service/Downloader/ReactDownloader.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,28 @@

use Buddy\Repman\Kernel;
use Buddy\Repman\Service\Downloader;
use Clue\React\Mq\Queue;
use Munus\Control\Option;
use Psr\Http\Message\ResponseInterface;
use React\EventLoop\Factory;
use React\EventLoop\LoopInterface;
use React\Http\Browser;
use React\Promise\PromiseInterface;
use React\Socket\Connector;

final class ReactDownloader implements Downloader
{
private LoopInterface $loop;
private Browser $browser;
private Queue $queue;

public function __construct()
{
$this->loop = Factory::create();
$this->browser = new Browser($this->loop, new Connector($this->loop, ['timeout' => 10]));
$this->queue = new Queue(100, null, function (string $type, string $url, array $headers = []): PromiseInterface {
return $this->browser->{$type}($url, array_merge($headers, ['User-Agent' => $this->userAgent()]));
});
}

/**
Expand Down Expand Up @@ -49,7 +55,7 @@ public function getContents(string $url, array $headers = [], callable $notFound

public function getAsyncContents(string $url, array $headers, callable $onFulfilled): void
{
$this->browser->get($url, array_merge($headers, ['User-Agent' => $this->userAgent()]))
($this->queue)('get', $url, $headers)
->then(function (ResponseInterface $response) use ($onFulfilled): void {
$stream = $response->getBody()->detach();
if (!is_resource($stream)) {
Expand All @@ -64,7 +70,7 @@ public function getAsyncContents(string $url, array $headers, callable $onFulfil
*/
public function getLastModified(string $url, callable $onFulfilled): void
{
$this->browser->head($url, ['User-Agent' => $this->userAgent()])->then(function (ResponseInterface $response) use ($onFulfilled): void {
($this->queue)('head', $url)->then(function (ResponseInterface $response) use ($onFulfilled): void {
$lastModified = $response->getHeader('Last-Modified');
if ($lastModified !== []) {
$onFulfilled((int) strtotime($lastModified[0]));
Expand Down
3 changes: 3 additions & 0 deletions symfony.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
"cbschuld/browser.php": {
"version": "1.9.4"
},
"clue/mq-react": {
"version": "v1.2.0"
},
"clue/stream-filter": {
"version": "v1.4.1"
},
Expand Down
13 changes: 13 additions & 0 deletions tests/Unit/Service/Downloader/ReactDownloaderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ public function testLastModified(): void
});
$downloader->run();
}

public function testAsyncContent(): void
{
$downloader = new ReactDownloader();
$downloader->getAsyncContents('https://repman.io', [], function ($stream): void {
$meta = stream_get_meta_data($stream);
self::assertTrue($meta['uri'] === 'https://repman.io');
});
$downloader->getAsyncContents('/tmp/not-exists', [], function ($stream): void {
throw new \LogicException('Should not happen');
});
$downloader->run();
}
}

0 comments on commit 45b5ff6

Please sign in to comment.