Skip to content

Commit

Permalink
Merge pull request #10 from clue-labs/concurrent
Browse files Browse the repository at this point in the history
Add ConnectionManagerConcurrent
  • Loading branch information
clue committed May 30, 2016
2 parents e8779d4 + b62b440 commit 87ff48a
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ any of the given `ConnectionManager`s in consecutive order until the first one s
The `ConnectionManagerRandom($connectors)` works much like `ConnectionManagerConsecutive` but instead
of using a fixed order, it always uses a randomly shuffled order.

### Concurrent

The `ConnectionManagerConcurrent($connectors)` establishes connections by trying to connect through
ALL of the given `ConnectionManager`s at once, until the first one succeeds.

### Selective

The `ConnectionManagerSelective()` manages several `Connector`s and forwards connection through either of
Expand Down
40 changes: 40 additions & 0 deletions src/Multiple/ConnectionManagerConcurrent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

namespace ConnectionManager\Extra\Multiple;

use ConnectionManager\Extra\Multiple\ConnectionManagerConsecutive;
use React\Promise;
use React\Promise\CancellablePromiseInterface;

class ConnectionManagerConcurrent extends ConnectionManagerConsecutive
{
public function create($host, $port)
{
if (!$this->managers) {
return Promise\reject(new \UnderflowException('No managers to try to connect through'));
}

$all = array();
foreach ($this->managers as $connector) {
/* @var $connection Connector */
$all []= $connector->create($host, $port);
}
return Promise\any($all)->then(function ($conn) use ($all) {
// a connection attempt succeeded
// => cancel all pending connection attempts
foreach ($all as $promise) {
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}

// if promise resolves despite cancellation, immediately close stream
$promise->then(function ($stream) use ($conn) {
if ($stream !== $conn) {
$stream->close();
}
});
}
return $conn;
});
}
}
70 changes: 70 additions & 0 deletions tests/Multiple/ConnectionManagerConcurrentTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

use ConnectionManager\Extra\Multiple\ConnectionManagerConcurrent;
use React\Promise;

class ConnectionManagerConcurrentTest extends TestCase
{
public function testEmptyRejects()
{
$connector = new ConnectionManagerConcurrent();

$promise = $connector->create('google.com', 80);

$this->assertPromiseReject($promise);
}

public function testWillForwardToInnerConnector()
{
$pending = new Promise\Promise(function() { });

$only = $this->getMock('React\SocketClient\ConnectorInterface');
$only->expects($this->once())->method('create')->with('google.com', 80)->willReturn($pending);

$connector = new ConnectionManagerConcurrent();
$connector->addConnectionManager($only);

$promise = $connector->create('google.com', 80);

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testWillCancelOtherIfOneResolves()
{
$resolved = Promise\resolve($this->getMock('React\Stream\DuplexStreamInterface'));
$first = $this->getMock('React\SocketClient\ConnectorInterface');
$first->expects($this->once())->method('create')->with('google.com', 80)->willReturn($resolved);

$pending = new Promise\Promise(function() { }, $this->expectCallableOnce());
$second = $this->getMock('React\SocketClient\ConnectorInterface');
$second->expects($this->once())->method('create')->with('google.com', 80)->willReturn($pending);

$connector = new ConnectionManagerConcurrent();
$connector->addConnectionManager($first);
$connector->addConnectionManager($second);

$promise = $connector->create('google.com', 80);

$this->assertPromiseResolve($promise);
}

public function testWillCloseOtherIfOneResolves()
{
$resolved = Promise\resolve($this->getMock('React\Stream\DuplexStreamInterface'));
$first = $this->getMock('React\SocketClient\ConnectorInterface');
$first->expects($this->once())->method('create')->with('google.com', 80)->willReturn($resolved);

$slower = $this->getMock('React\Stream\DuplexStreamInterface');
$slower->expects($this->once())->method('close');
$second = $this->getMock('React\SocketClient\ConnectorInterface');
$second->expects($this->once())->method('create')->with('google.com', 80)->willReturn(Promise\resolve($slower));

$connector = new ConnectionManagerConcurrent();
$connector->addConnectionManager($first);
$connector->addConnectionManager($second);

$promise = $connector->create('google.com', 80);

$this->assertPromiseResolve($promise);
}
}

0 comments on commit 87ff48a

Please sign in to comment.