Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis retries #20

Merged
merged 20 commits into from
Sep 6, 2023
2 changes: 1 addition & 1 deletion .github/workflows/composer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ jobs:
run: composer update --no-progress --no-interaction

- name: Composer outdated
run: composer outdated -D --strict --ignore symfony/console --ignore symfony/finder --ignore symfony/yaml
run: composer outdated -D --strict --ignore symfony/console --ignore symfony/finder --ignore symfony/yaml --ignore phpunit/phpunit
2 changes: 0 additions & 2 deletions src/ConnectionFactory/ConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace RedisProxy\ConnectionFactory;

use RedisProxy\Driver\Driver;

interface ConnectionFactory
{
/**
Expand Down
2 changes: 1 addition & 1 deletion src/ConnectionFactory/PredisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function create(string $host, int $port, float $timeout = 0.0)
$redis = new Client([
'host' => $host,
'port' => $port,
'timeout' => $timeout
'timeout' => $timeout,
]);
$redis->connect();
return $redis;
Expand Down
11 changes: 8 additions & 3 deletions src/ConnectionPool/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@

namespace RedisProxy\ConnectionPool;

use RedisProxy\Driver\Driver;

interface ConnectionPool
{
public function getConnection(string $command);

public function handleFailed(): bool;
/**
* @param int $attempt First attempt is 1
*/
public function handleFailed(int $attempt): bool;

public function setRetryWait(int $retryWait): self;

public function setMaxFails(int $maxFails): self;
}
8 changes: 4 additions & 4 deletions src/ConnectionPool/SentinelConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SentinelConnectionPool implements ConnectionPool

private int $maxFails = 3;

private float $retryWait = 1000;
private int $retryWait = 1000;

private bool $writeToReplicas = true;

Expand Down Expand Up @@ -81,7 +81,7 @@ public function getConnection(string $command)
return $this->getMasterConnection();
}

public function handleFailed(): bool
public function handleFailed(int $attempt): bool
{
$this->failedCount++;
$result = $this->loadMasterReplicasDataFromSentinel();
Expand Down Expand Up @@ -212,11 +212,11 @@ private function normalizeResponze(array $arr): array
}, ARRAY_FILTER_USE_KEY));

if (count($keys) != count($values)) {
throw new RedisProxyException("Wrong number of arguments");
throw new RedisProxyException('Wrong number of arguments');
}
return array_combine($keys, $values);
}

private function getReadOnlyOperations(): array
{
return [
Expand Down
31 changes: 27 additions & 4 deletions src/ConnectionPool/SingleNodeConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

namespace RedisProxy\ConnectionPool;

use Redis;
use Predis\Client;
use Redis;
use RedisProxy\Driver\Driver;
use RedisProxy\RedisProxyException;

Expand All @@ -21,17 +21,23 @@ class SingleNodeConnectionPool implements ConnectionPool

private bool $autoSelectDb;

/** @var Redis|Client */
/** @var Redis|Client|null */
private $connection = null;

public function __construct(Driver $driver, string $host, int $port, int $database = 0, float $timeout = 0.0, bool $autoSelectDb = true)
private int $retryWait;

private int $maxFails;

public function __construct(Driver $driver, string $host, int $port, int $database = 0, float $timeout = 0.0, bool $autoSelectDb = true, ?int $retryWait = null, ?int $maxFails = null)
{
$this->driver = $driver;
$this->host = $host;
$this->port = $port;
$this->database = $database;
$this->timeout = $timeout;
$this->autoSelectDb = $autoSelectDb;
$this->retryWait = $retryWait ?? 1000;
$this->maxFails = $maxFails ?? 1;
}

/**
Expand All @@ -51,8 +57,25 @@ public function getConnection(string $command)
return $this->connection;
}

public function handleFailed(): bool
public function handleFailed(int $attempt): bool
{
$this->connection = null; // retry connection on fail
if ($attempt < $this->maxFails) {
usleep($this->retryWait * 1000);
return true;
}
return false;
}

public function setRetryWait(int $retryWait): self
{
$this->retryWait = $retryWait;
return $this;
}

public function setMaxFails(int $maxFails): self
{
$this->maxFails = $maxFails;
return $this;
}
}
10 changes: 8 additions & 2 deletions src/ConnectionPoolFactory/SingleNodeConnectionPoolFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ class SingleNodeConnectionPoolFactory implements ConnectionPoolFactory

private bool $autoSelectDb;

public function __construct(string $host, int $port, int $database = 0, float $timeout = 0.0, bool $autoSelectDb = true)
private ?int $retryWait;

private ?int $maxFails;

public function __construct(string $host, int $port, int $database = 0, float $timeout = 0.0, bool $autoSelectDb = true, ?int $retryWait = null, ?int $maxFails = null)
{
$this->host = $host;
$this->port = $port;
$this->database = $database;
$this->timeout = $timeout;
$this->autoSelectDb = $autoSelectDb;
$this->retryWait = $retryWait;
$this->maxFails = $maxFails;
}

public function create(Driver $driver): SingleNodeConnectionPool
{
return new SingleNodeConnectionPool($driver, $this->host, $this->port, $this->database, $this->timeout, $this->autoSelectDb);
return new SingleNodeConnectionPool($driver, $this->host, $this->port, $this->database, $this->timeout, $this->autoSelectDb, $this->retryWait, $this->maxFails);
}
}
2 changes: 1 addition & 1 deletion src/Driver/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function getDriverFactory(): DriverFactory;
public function connectionRole($connection): string;

/**
* @throws RedisProxyException('Invalid DB index');
* @throws RedisProxyException ('Invalid DB index');
*/
public function connectionSelect($connection, int $database): bool;
}
30 changes: 15 additions & 15 deletions src/Driver/PredisDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

namespace RedisProxy\Driver;

use Predis\CommunicationException;
use Predis\Connection\ConnectionException;
use Predis\Response\Status;
use RedisException;
use RedisProxy\ConnectionFactory\PredisConnectionFactory;
use RedisProxy\ConnectionPool\ConnectionPool;
use RedisProxy\ConnectionPoolFactory\ConnectionPoolFactory;
Expand Down Expand Up @@ -56,20 +54,22 @@ public function getDriverFactory(): PredisDriverFactory
*/
public function call(string $command, array $params = [])
{
try {
if (method_exists($this, $command)) {
return call_user_func_array([$this, $command], $params);
}

$result = call_user_func_array([$this->connectionPool->getConnection($command), $command], $params);
return $this->transformResult($result);
} catch (RedisProxyException $e) {
throw $e;
} catch (Throwable $t) {
if ($t instanceof ConnectionException && $this->connectionPool->handleFailed()) {
return $this->call($command, $params);
$attempt = 0;
while (true) {
try {
if (method_exists($this, $command)) {
return call_user_func_array([$this, $command], $params);
}

$result = call_user_func_array([$this->connectionPool->getConnection($command), $command], $params);
return $this->transformResult($result);
} catch (RedisProxyException $e) {
throw $e;
} catch (Throwable $t) {
if (!$t instanceof ConnectionException || !$this->connectionPool->handleFailed(++$attempt)) {
throw new RedisProxyException("Error for command '$command', use getPrevious() for more info", 1484162284, $t);
}
}
throw new RedisProxyException("Error for command '$command', use getPrevious() for more info", 1484162284, $t);
}
}

Expand Down
26 changes: 14 additions & 12 deletions src/Driver/RedisDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,21 @@ public function getDriverFactory(): RedisDriverFactory
*/
public function call(string $command, array $params = [])
{
try {
if (method_exists($this, $command)) {
return call_user_func_array([$this, $command], $params);
}

return call_user_func_array([$this->connectionPool->getConnection($command), $command], $params);
} catch (RedisProxyException $e) {
throw $e;
} catch (Throwable $t) {
if ($t instanceof RedisException && $this->connectionPool->handleFailed()) {
return $this->call($command, $params);
$attempt = 0;
while (true) {
try {
if (method_exists($this, $command)) {
return call_user_func_array([$this, $command], $params);
}

return call_user_func_array([$this->connectionPool->getConnection($command), $command], $params);
} catch (RedisProxyException $e) {
throw $e;
} catch (Throwable $t) {
if (!$t instanceof RedisException || !$this->connectionPool->handleFailed(++$attempt)) {
throw new RedisProxyException("Error for command '$command', use getPrevious() for more info", 1484162284, $t);
}
}
throw new RedisProxyException("Error for command '$command', use getPrevious() for more info", 1484162284, $t);
}
}

Expand Down
11 changes: 8 additions & 3 deletions src/RedisProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use RedisProxy\Driver\Driver;
use RedisProxy\Driver\PredisDriver;
use RedisProxy\Driver\RedisDriver;
use Throwable;

/**
* @method string|null type(string $key)
Expand Down Expand Up @@ -68,9 +67,15 @@ class RedisProxy
self::DRIVER_PREDIS,
];

public function __construct(string $host = '127.0.0.1', int $port = 6379, int $database = 0, float $timeout = 0.0)
/**
* @param float $timeout seconds (default is 0.0 = unlimited)
* @param int|null $retryWait milliseconds (null defaults to 1 second)
* @param int|null $maxFails 1 = no retries, one attempt (default)
* 2 = one retry, two attempts, ...
*/
public function __construct(string $host = '127.0.0.1', int $port = 6379, int $database = 0, float $timeout = 0.0, ?int $retryWait = null, ?int $maxFails = null)
{
$this->connectionPoolFactory = new SingleNodeConnectionPoolFactory($host, $port, $database, $timeout);
$this->connectionPoolFactory = new SingleNodeConnectionPoolFactory($host, $port, $database, $timeout, true, $retryWait, $maxFails);
$this->driversOrder = $this->supportedDrivers;
}

Expand Down
Loading