Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configurability for number of gRPC channels #158

Merged
merged 4 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 51 additions & 29 deletions src/Cache/CacheClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
use Momento\Cache\CacheOperationTypes\CreateCacheResponse;
use Momento\Cache\CacheOperationTypes\DeleteCacheResponse;
use Momento\Cache\CacheOperationTypes\ListCachesResponse;
use Momento\Cache\Errors\InvalidArgumentError;
use Momento\Cache\Internal\IdleDataClientWrapper;
use Momento\Cache\Internal\ScsControlClient;
use Momento\Cache\Internal\ScsDataClient;
Expand All @@ -54,7 +55,12 @@ class CacheClient implements LoggerAwareInterface
protected ILoggerFactory $loggerFactory;
protected LoggerInterface $logger;
private ScsControlClient $controlClient;
private IdleDataClientWrapper $dataClientWrapper;

/**
* @var IdleDataClientWrapper[]
*/
private array $dataClients;
private int $nextDataClientIndex = 0;

/**
* @param IConfiguration $configuration Configuration to use for transport.
Expand All @@ -77,7 +83,16 @@ public function __construct(
$defaultTtlSeconds
);
};
$this->dataClientWrapper = new IdleDataClientWrapper($dataClientFactory, $this->configuration);
$this->dataClients = [];

$numGrpcChannels = $configuration->getTransportStrategy()->getGrpcConfig()->getNumGrpcChannels();
$forceNewChannels = $configuration->getTransportStrategy()->getGrpcConfig()->getForceNewChannel();
if (($numGrpcChannels > 0) && (! $forceNewChannels)) {
pgautier404 marked this conversation as resolved.
Show resolved Hide resolved
throw new InvalidArgumentError("When setting NumGrpcChannels > 1, you must also set ForceNewChannel to true, or else the gRPC library will re-use the same channel.");
}
for ($i = 0; $i < $numGrpcChannels; $i++) {
array_push($this->dataClients, new IdleDataClientWrapper($dataClientFactory, $this->configuration));
}
}

/**
Expand Down Expand Up @@ -171,7 +186,7 @@ public function deleteCache(string $cacheName): DeleteCacheResponse
*/
public function setAsync(string $cacheName, string $key, string $value, int $ttlSeconds = 0): ResponseFuture
{
return $this->dataClientWrapper->getClient()->set($cacheName, $key, $value, $ttlSeconds);
return $this->getNextDataClient()->set($cacheName, $key, $value, $ttlSeconds);
}

/**
Expand Down Expand Up @@ -223,7 +238,7 @@ public function set(string $cacheName, string $key, string $value, int $ttlSecon
*/
public function getAsync(string $cacheName, string $key): ResponseFuture
{
return $this->dataClientWrapper->getClient()->get($cacheName, $key);
return $this->getNextDataClient()->get($cacheName, $key);
}

/**
Expand Down Expand Up @@ -282,7 +297,7 @@ public function get(string $cacheName, string $key): GetResponse
*/
public function setIfNotExistsAsync(string $cacheName, string $key, string $value, int $ttlSeconds = 0): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setIfNotExists($cacheName, $key, $value, $ttlSeconds);
return $this->getNextDataClient()->setIfNotExists($cacheName, $key, $value, $ttlSeconds);
}

/**
Expand Down Expand Up @@ -336,7 +351,7 @@ public function setIfNotExists(string $cacheName, string $key, string $value, in
*/
public function deleteAsync(string $cacheName, string $key): ResponseFuture
{
return $this->dataClientWrapper->getClient()->delete($cacheName, $key);
return $this->getNextDataClient()->delete($cacheName, $key);
}

/**
Expand Down Expand Up @@ -387,7 +402,7 @@ public function delete(string $cacheName, string $key): DeleteResponse
*/
public function keysExistAsync(string $cacheName, array $keys): ResponseFuture
{
return $this->dataClientWrapper->getClient()->keysExist($cacheName, $keys);
return $this->getNextDataClient()->keysExist($cacheName, $keys);
}

/**
Expand Down Expand Up @@ -440,7 +455,7 @@ public function keysExist(string $cacheName, array $keys): KeysExistResponse
*/
public function keyExistsAsync(string $cacheName, string $key): ResponseFuture
{
return $this->dataClientWrapper->getClient()->keyExists($cacheName, $key);
return $this->getNextDataClient()->keyExists($cacheName, $key);
}

/**
Expand Down Expand Up @@ -495,7 +510,7 @@ public function incrementAsync(
string $cacheName, string $key, int $amount = 1, ?int $ttlSeconds = null
): ResponseFuture
{
return $this->dataClientWrapper->getClient()->increment($cacheName, $key, $amount, $ttlSeconds);
return $this->getNextDataClient()->increment($cacheName, $key, $amount, $ttlSeconds);
}

/**
Expand Down Expand Up @@ -543,7 +558,7 @@ public function increment(
*/
public function listFetch(string $cacheName, string $listName): ListFetchResponse
{
return $this->dataClientWrapper->getClient()->listFetch($cacheName, $listName);
return $this->getNextDataClient()->listFetch($cacheName, $listName);
}

/**
Expand All @@ -569,7 +584,7 @@ public function listPushFront(
string $cacheName, string $listName, string $value, ?int $truncateBackToSize = null, ?CollectionTtl $ttl = null
): ListPushFrontResponse
{
return $this->dataClientWrapper->getClient()->listPushFront($cacheName, $listName, $value, $truncateBackToSize, $ttl);
return $this->getNextDataClient()->listPushFront($cacheName, $listName, $value, $truncateBackToSize, $ttl);
}

/**
Expand All @@ -595,7 +610,7 @@ public function listPushBack(
string $cacheName, string $listName, string $value, ?int $truncateFrontToSize = null, ?CollectionTtl $ttl = null
): ListPushBackResponse
{
return $this->dataClientWrapper->getClient()->listPushBack($cacheName, $listName, $value, $truncateFrontToSize, $ttl);
return $this->getNextDataClient()->listPushBack($cacheName, $listName, $value, $truncateFrontToSize, $ttl);
}

/**
Expand All @@ -617,7 +632,7 @@ public function listPushBack(
*/
public function listPopFront(string $cacheName, string $listName): ListPopFrontResponse
{
return $this->dataClientWrapper->getClient()->listPopFront($cacheName, $listName);
return $this->getNextDataClient()->listPopFront($cacheName, $listName);
}

/**
Expand All @@ -639,7 +654,7 @@ public function listPopFront(string $cacheName, string $listName): ListPopFrontR
*/
public function listPopBack(string $cacheName, string $listName): ListPopBackResponse
{
return $this->dataClientWrapper->getClient()->listPopBack($cacheName, $listName);
return $this->getNextDataClient()->listPopBack($cacheName, $listName);
}

/**
Expand All @@ -659,7 +674,7 @@ public function listPopBack(string $cacheName, string $listName): ListPopBackRes
*/
public function listRemoveValue(string $cacheName, string $listName, string $value): ListRemoveValueResponse
{
return $this->dataClientWrapper->getClient()->listRemoveValue($cacheName, $listName, $value);
return $this->getNextDataClient()->listRemoveValue($cacheName, $listName, $value);
}

/**
Expand All @@ -680,7 +695,7 @@ public function listRemoveValue(string $cacheName, string $listName, string $val
*/
public function listLength(string $cacheName, string $listName): ListLengthResponse
{
return $this->dataClientWrapper->getClient()->listLength($cacheName, $listName);
return $this->getNextDataClient()->listLength($cacheName, $listName);
}

/**
Expand All @@ -702,7 +717,7 @@ public function listLength(string $cacheName, string $listName): ListLengthRespo
*/
public function dictionarySetField(string $cacheName, string $dictionaryName, string $field, string $value, ?CollectionTtl $ttl = null): DictionarySetFieldResponse
{
return $this->dataClientWrapper->getClient()->dictionarySetField($cacheName, $dictionaryName, $field, $value, $ttl);
return $this->getNextDataClient()->dictionarySetField($cacheName, $dictionaryName, $field, $value, $ttl);
}

/**
Expand All @@ -725,7 +740,7 @@ public function dictionarySetField(string $cacheName, string $dictionaryName, st
*/
public function dictionaryGetField(string $cacheName, string $dictionaryName, string $field): DictionaryGetFieldResponse
{
return $this->dataClientWrapper->getClient()->dictionaryGetField($cacheName, $dictionaryName, $field);
return $this->getNextDataClient()->dictionaryGetField($cacheName, $dictionaryName, $field);
}

/**
Expand All @@ -747,7 +762,7 @@ public function dictionaryGetField(string $cacheName, string $dictionaryName, st
*/
public function dictionaryFetch(string $cacheName, string $dictionaryName): DictionaryFetchResponse
{
return $this->dataClientWrapper->getClient()->dictionaryFetch($cacheName, $dictionaryName);
return $this->getNextDataClient()->dictionaryFetch($cacheName, $dictionaryName);
}

/**
Expand All @@ -768,7 +783,7 @@ public function dictionaryFetch(string $cacheName, string $dictionaryName): Dict
*/
public function dictionarySetFields(string $cacheName, string $dictionaryName, array $elements, ?CollectionTtl $ttl = null): DictionarySetFieldsResponse
{
return $this->dataClientWrapper->getClient()->dictionarySetFields($cacheName, $dictionaryName, $elements, $ttl);
return $this->getNextDataClient()->dictionarySetFields($cacheName, $dictionaryName, $elements, $ttl);
}

/**
Expand Down Expand Up @@ -799,7 +814,7 @@ public function dictionarySetFields(string $cacheName, string $dictionaryName, a
*/
public function dictionaryGetFields(string $cacheName, string $dictionaryName, array $fields): DictionaryGetFieldsResponse
{
return $this->dataClientWrapper->getClient()->dictionaryGetFields($cacheName, $dictionaryName, $fields);
return $this->getNextDataClient()->dictionaryGetFields($cacheName, $dictionaryName, $fields);
}

/**
Expand Down Expand Up @@ -828,7 +843,7 @@ public function dictionaryIncrement(
string $cacheName, string $dictionaryName, string $field, int $amount = 1, ?CollectionTtl $ttl = null
): DictionaryIncrementResponse
{
return $this->dataClientWrapper->getClient()->dictionaryIncrement($cacheName, $dictionaryName, $field, $amount, $ttl);
return $this->getNextDataClient()->dictionaryIncrement($cacheName, $dictionaryName, $field, $amount, $ttl);
}

/**
Expand All @@ -847,7 +862,7 @@ public function dictionaryIncrement(
*/
public function dictionaryRemoveField(string $cacheName, string $dictionaryName, string $field): DictionaryRemoveFieldResponse
{
return $this->dataClientWrapper->getClient()->dictionaryRemoveField($cacheName, $dictionaryName, $field);
return $this->getNextDataClient()->dictionaryRemoveField($cacheName, $dictionaryName, $field);
}

/**
Expand All @@ -866,7 +881,7 @@ public function dictionaryRemoveField(string $cacheName, string $dictionaryName,
*/
public function dictionaryRemoveFields(string $cacheName, string $dictionaryName, array $fields): DictionaryRemoveFieldsResponse
{
return $this->dataClientWrapper->getClient()->dictionaryRemoveFields($cacheName, $dictionaryName, $fields);
return $this->getNextDataClient()->dictionaryRemoveFields($cacheName, $dictionaryName, $fields);
}

/**
Expand Down Expand Up @@ -895,7 +910,7 @@ public function dictionaryRemoveFields(string $cacheName, string $dictionaryName
*/
public function setAddElementAsync(string $cacheName, string $setName, string $element, ?CollectionTtl $ttl = null): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setAddElement($cacheName, $setName, $element, $ttl);
return $this->getNextDataClient()->setAddElement($cacheName, $setName, $element, $ttl);
}

/**
Expand Down Expand Up @@ -944,7 +959,7 @@ public function setAddElement(string $cacheName, string $setName, string $elemen
*/
public function setAddElementsAsync(string $cacheName, string $setName, array $elements, ?CollectionTtl $ttl = null): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setAddElements($cacheName, $setName, $elements, $ttl);
return $this->getNextDataClient()->setAddElements($cacheName, $setName, $elements, $ttl);
}

/**
Expand Down Expand Up @@ -994,7 +1009,7 @@ public function setAddElements(string $cacheName, string $setName, array $elemen
*/
public function setFetchAsync(string $cacheName, string $setName): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setFetch($cacheName, $setName);
return $this->getNextDataClient()->setFetch($cacheName, $setName);
}

/**
Expand Down Expand Up @@ -1044,7 +1059,7 @@ public function setFetch(string $cacheName, string $setName): SetFetchResponse
*/
public function setLengthAsync(string $cacheName, string $setName): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setLength($cacheName, $setName);
return $this->getNextDataClient()->setLength($cacheName, $setName);
}

/**
Expand Down Expand Up @@ -1093,7 +1108,7 @@ public function setLength(string $cacheName, string $setName): SetLengthResponse
*/
public function setRemoveElementAsync(string $cacheName, string $setName, string $element): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setRemoveElement($cacheName, $setName, $element);
return $this->getNextDataClient()->setRemoveElement($cacheName, $setName, $element);
}

/**
Expand All @@ -1114,4 +1129,11 @@ public function setRemoveElement(string $cacheName, string $setName, string $ele
{
return $this->setRemoveElementAsync($cacheName, $setName, $element)->wait();
}

private function getNextDataClient(): ScsDataClient
{
$client = $this->dataClients[$this->nextDataClientIndex]->getClient();
$this->nextDataClientIndex = ($this->nextDataClientIndex + 1) % count($this->dataClients);
return $client;
}
}
4 changes: 2 additions & 2 deletions src/Config/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public function getLoggerFactory(): ILoggerFactory {
}

/**
* @return ITransportStrategy|null The currently active transport strategy
* @return ITransportStrategy The currently active transport strategy
*/
public function getTransportStrategy(): ITransportStrategy|null
public function getTransportStrategy(): ITransportStrategy
{
return $this->transportStrategy;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Config/Configurations/Laptop.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static function v1(?ILoggerFactory $loggerFactory = null): Laptop
{
$loggerFactory = $loggerFactory ?? new NullLoggerFactory();
$grpcConfig = new StaticGrpcConfiguration(5000);
$transportStrategy = new StaticTransportStrategy(null, $grpcConfig, $loggerFactory, self::$maxIdleMillis);
$transportStrategy = new StaticTransportStrategy($grpcConfig, $loggerFactory, self::$maxIdleMillis);
return new Laptop($loggerFactory, $transportStrategy);
}
}
6 changes: 6 additions & 0 deletions src/Config/IConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ interface IConfiguration
*/
public function getLoggerFactory(): ILoggerFactory;


/**
* @return ITransportStrategy The currently active transport strategy
*/
public function getTransportStrategy(): ITransportStrategy;

/**
* Creates a new instance of the Configuration object, updated to use the specified transport strategy.
*
Expand Down
4 changes: 4 additions & 0 deletions src/Config/Transport/IGrpcConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ public function withDeadlineMilliseconds(int $deadlineMilliseconds): IGrpcConfig
public function getForceNewChannel(): ?bool;

public function withForceNewChannel(bool $forceNewChannel) : IGrpcConfiguration;

public function getNumGrpcChannels(): int;

public function withNumGrpcChannels(int $numGrpcChannels): IGrpcConfiguration;
}
2 changes: 0 additions & 2 deletions src/Config/Transport/ITransportStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

interface ITransportStrategy
{
public function getMaxConcurrentRequests(): ?int;

public function getGrpcConfig(): ?IGrpcConfiguration;

public function getMaxIdleMillis(): ?int;
Expand Down
20 changes: 16 additions & 4 deletions src/Config/Transport/StaticGrpcConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ class StaticGrpcConfiguration implements IGrpcConfiguration
{

private ?int $deadlineMilliseconds;
private ?bool $forceNewChannel;
private bool $forceNewChannel;
private int $numGrpcChannels;

public function __construct(?int $deadlineMilliseconds = null, ?bool $forceNewChannel = false)
public function __construct(?int $deadlineMilliseconds = null, bool $forceNewChannel = false, int $numGrpcChannels = 1)
{
$this->deadlineMilliseconds = $deadlineMilliseconds;
$this->forceNewChannel = $forceNewChannel;
$this->numGrpcChannels = $numGrpcChannels;
}

public function getDeadlineMilliseconds(): int|null
Expand All @@ -22,7 +24,7 @@ public function getDeadlineMilliseconds(): int|null

public function withDeadlineMilliseconds(int $deadlineMilliseconds): StaticGrpcConfiguration
{
return new StaticGrpcConfiguration($deadlineMilliseconds, $this->forceNewChannel);
return new StaticGrpcConfiguration($deadlineMilliseconds, $this->forceNewChannel, $this->numGrpcChannels);
}

public function getForceNewChannel(): bool|null
Expand All @@ -32,6 +34,16 @@ public function getForceNewChannel(): bool|null

public function withForceNewChannel(bool $forceNewChannel): StaticGrpcConfiguration
{
return new StaticGrpcConfiguration($this->deadlineMilliseconds, $forceNewChannel);
return new StaticGrpcConfiguration($this->deadlineMilliseconds, $forceNewChannel, $this->numGrpcChannels);
}

public function getNumGrpcChannels(): int
{
return $this->numGrpcChannels;
}

public function withNumGrpcChannels(int $numGrpcChannels): IGrpcConfiguration
{
return new StaticGrpcConfiguration($this->deadlineMilliseconds, $this->forceNewChannel, $numGrpcChannels);
}
}
Loading