Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple connection pool #27

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"require": {
"php": ">=5.4.0",
"react/socket": "0.8.*",
"react/dns": "0.4.*"
"react/dns": "0.4.*",
"react/promise": "^2.5"
},
"require-dev": {
"phpunit/dbunit": "1.4.*"
Expand Down
28 changes: 28 additions & 0 deletions examples/pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php
require __DIR__ . '/init.php';

// create the main loop
$loop = React\EventLoop\Factory::create();

// create pool with 10 connections
$pool = \React\MySQL\Pool\PoolFactory::createPool($loop, array(
'dbname' => 'test',
'user' => 'test',
'passwd' => 'test',
), 10);

// make any query to pool
$pool
->query('select * from book')
->then(function (\React\MySQL\Pool\PoolQueryResult $result) {
$results = $result->getCmd()->resultRows; // get the results
$fields = $result->getCmd()->resultFields; // get table fields
})
->otherwise(function (\Exception $exception) {
// handle exception.
})
->always(function () use ($loop) {
$loop->stop();
});

$loop->run();
48 changes: 48 additions & 0 deletions src/Pool/AbstractPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

namespace React\MySQL\Pool;

use React\MySQL\Commands\QueryCommand;
use React\MySQL\ConnectionInterface;
use React\Promise\Deferred;

/**
* Class AbstractPool
*
* Base class for pools.
*
* @package React\MySQL\Pool
*/
abstract class AbstractPool implements PoolInterface
{

/**
* {@inheritDoc}
*/
public function query($sql, $params = null)
{
$params = func_get_args();
array_shift($params); // Remove $sql parameter.

return $this->getConnection()
->then(function (ConnectionInterface $connection) use ($sql, $params) {
$deferred = new Deferred();

$callback = function (QueryCommand $command) use ($deferred) {
if ($command->getError() !== null) {
$deferred->reject($command);
} else {
$deferred->resolve(new PoolQueryResult($this, $command));
}
};

//
// Inject callback into `query` method arguments.
//
$params = array_merge([ $sql, $callback ], $params);
call_user_func_array([ $connection, 'query' ], $params);

return $deferred->promise();
});
}
}
112 changes: 112 additions & 0 deletions src/Pool/Pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
<?php

namespace React\MySQL\Pool;

use React\MySQL\ConnectionInterface;
use React\Promise\Deferred;

/**
* Class Pool
*
* @package React\MySQL\Pool
*/
class Pool extends AbstractPool
{

/**
* @var \SplObjectStorage|ConnectionInterface[]
*/
private $connections;

/**
* Index of current connection in pool.
*
* @var integer
*/
private $index = 0;

/**
* Pool constructor.
*
* @param ConnectionInterface[] $connections Array of MySQL connection.
*/
public function __construct(array $connections)
{
if (count($connections) === 0) {
throw new \InvalidArgumentException('Should be at least one connection in pool');
}

$this->connections = new \SplObjectStorage();

foreach ($connections as $connection) {
if (! $connection instanceof \React\MySQL\ConnectionInterface) {
throw new \InvalidArgumentException(sprintf(
'Each passed connection should implements \'%s\' but one of connections is \'%s\'',
'React\MySQL\ConnectionInterface',
is_object($connection) ? get_class($connection) : gettype($connection)
));
}
$this->connections->attach($connection);
}

$this->connections->rewind();
}

/**
* {@inheritDoc}
*/
public function getConnection()
{
$connection = $this->roundRobin();

//
// Connect if current connection is not initialized or already closed by
// some reason.
//
if (($connection->getState() === ConnectionInterface::STATE_INIT)
|| ($connection->getState() === ConnectionInterface::STATE_CLOSED)) {
$deferred = new Deferred();

$connection->connect(function (\Exception $exception = null, ConnectionInterface $connection) use ($deferred) {
if ($exception !== null) {
$deferred->reject($exception);
} else {
$deferred->resolve($connection);
}
});

return $deferred->promise();
}

return \React\Promise\resolve($connection);
}

/**
* Get connection from pool by RR algorithm.
*
* @return ConnectionInterface
*
* @link https://en.wikipedia.org/wiki/Round-robin_scheduling
*/
private function roundRobin()
{
if ($this->index === count($this->connections)) {
$this->connections->rewind();
$this->index = 0;
}

$connection = $this->connections->current();
$this->index++;
$this->connections->next();

return $connection;
}

/**
* {@inheritDoc}
*/
public function count()
{
return count($this->connections);
}
}
58 changes: 58 additions & 0 deletions src/Pool/PoolFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?php

namespace React\MySQL\Pool;

use React\EventLoop\LoopInterface;
use React\MySQL\Connection;

/**
* Class PoolFactory
*
* @package React\MySQL\Pool
*/
class PoolFactory
{

/**
* Create new pool.
*
* @param LoopInterface $loop A LoopInterface instance.
* @param array $connectOptions MySQL connection options.
* @param integer $count Number of created connections.
*
* @return PoolInterface
*
* @throws \InvalidArgumentException Got invalid options.
*/
public static function createPool(
LoopInterface $loop,
array $connectOptions,
$count
) {
if (! is_numeric($count)) {
throw new \InvalidArgumentException(sprintf(
'$count should be \'integer\' but \'%s\' given',
is_object($count) ? get_class($count) : gettype($count)
));
}
$count = (int) $count;

if ($count <= 0) {
throw new \InvalidArgumentException(sprintf(
'$count should be greater then 0 but %d given',
$count
));
}

//
// Create specified number of connection but not connect 'cause we do it
// then somebody request connection from pool or make query to pool.
//
$connections = [];
for ($i = 0; $i < $count; $i++) {
$connections[] = new Connection($loop, $connectOptions);
}

return new Pool($connections);
}
}
53 changes: 53 additions & 0 deletions src/Pool/PoolInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<?php

namespace React\MySQL\Pool;

use React\Promise\Promise;

/**
* Interface PoolInterface
*
* MySQL connection pool.
*
* @package React\MySQL\Pool
*/
interface PoolInterface extends \Countable
{

/**
* Do a async query.
*
* The query is performed on one particular connection from the pool.
*
* ```php
* $pool
* ->query('SELECT * FROM `table`')
* ->then(function (PoolQueryResult $result) { ... })
* ->otherwise(function (\Exception $exception) { ... })
* ```
*
* @param string $sql MySQL sql statement.
* @param mixed $params,... Parameters which should bind to query.
*
* @return Promise
*
* @see \React\MySQL\Pool\PoolQueryResult
*/
public function query($sql, $params = null);

/**
* Get next connection to MySQL server.
*
* Each concrete pool may use different algorithm for selecting connections.
*
* ```php
* $pool
* ->getConnection()
* ->then(function (ConnectionInterface $connection) { ... })
* ->otherwise(function (\Exception $exception) { ... })
* ```
*
* @return Promise
*/
public function getConnection();
}
60 changes: 60 additions & 0 deletions src/Pool/PoolQueryResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

namespace React\MySQL\Pool;

use React\MySQL\Commands\QueryCommand;

/**
* Class PoolQueryResult
*
* Represent result passed in resolved promise from pool query method.
*
* @package React\MySQL\Pool
*
* @see \React\MySQL\Pool\PoolInterface::query()
*/
class PoolQueryResult
{

/**
* @var PoolInterface
*/
private $pool;

/**
* @var QueryCommand
*/
private $cmd;

/**
* PoolQueryResult constructor.
*
* @param PoolInterface $pool A pool from which we got this result.
* @param QueryCommand $cmd A query command as is.
*/
public function __construct(PoolInterface $pool, QueryCommand $cmd)
{
$this->pool = $pool;
$this->cmd = $cmd;
}

/**
* Get pool from which we got this result.
*
* @return PoolInterface
*/
public function getPool()
{
return $this->pool;
}

/**
* Get executed QueryCommand instance.
*
* @return QueryCommand
*/
public function getCmd()
{
return $this->cmd;
}
}
Loading