Skip to content

Commit

Permalink
feat: add configurability for number of gRPC channels (#158)
Browse files Browse the repository at this point in the history
* feat: add configurability for number of gRPC channels

For users who may end up with over 100 concurrent requests in flight,
we need to be able to support multiple gRPC channels. This commit adds
a configuration setting to allow users to specify the desired number
of channels.

This most likely won't doing anything useful until we add the config
flag to disable persistent TCP connections in the gRPC channel
constructor, because we will just end up re-using the same channel
multiple times.
  • Loading branch information
cprice404 authored Jun 5, 2023
1 parent 87795cd commit 81e256d
Show file tree
Hide file tree
Showing 10 changed files with 87 additions and 54 deletions.
80 changes: 51 additions & 29 deletions src/Cache/CacheClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
use Momento\Cache\CacheOperationTypes\CreateCacheResponse;
use Momento\Cache\CacheOperationTypes\DeleteCacheResponse;
use Momento\Cache\CacheOperationTypes\ListCachesResponse;
use Momento\Cache\Errors\InvalidArgumentError;
use Momento\Cache\Internal\IdleDataClientWrapper;
use Momento\Cache\Internal\ScsControlClient;
use Momento\Cache\Internal\ScsDataClient;
Expand All @@ -54,7 +55,12 @@ class CacheClient implements LoggerAwareInterface
protected ILoggerFactory $loggerFactory;
protected LoggerInterface $logger;
private ScsControlClient $controlClient;
private IdleDataClientWrapper $dataClientWrapper;

/**
* @var IdleDataClientWrapper[]
*/
private array $dataClients;
private int $nextDataClientIndex = 0;

/**
* @param IConfiguration $configuration Configuration to use for transport.
Expand All @@ -77,7 +83,16 @@ public function __construct(
$defaultTtlSeconds
);
};
$this->dataClientWrapper = new IdleDataClientWrapper($dataClientFactory, $this->configuration);
$this->dataClients = [];

$numGrpcChannels = $configuration->getTransportStrategy()->getGrpcConfig()->getNumGrpcChannels();
$forceNewChannels = $configuration->getTransportStrategy()->getGrpcConfig()->getForceNewChannel();
if (($numGrpcChannels > 1) && (! $forceNewChannels)) {
throw new InvalidArgumentError("When setting NumGrpcChannels > 1, you must also set ForceNewChannel to true, or else the gRPC library will re-use the same channel.");
}
for ($i = 0; $i < $numGrpcChannels; $i++) {
array_push($this->dataClients, new IdleDataClientWrapper($dataClientFactory, $this->configuration));
}
}

/**
Expand Down Expand Up @@ -171,7 +186,7 @@ public function deleteCache(string $cacheName): DeleteCacheResponse
*/
public function setAsync(string $cacheName, string $key, string $value, int $ttlSeconds = 0): ResponseFuture
{
return $this->dataClientWrapper->getClient()->set($cacheName, $key, $value, $ttlSeconds);
return $this->getNextDataClient()->set($cacheName, $key, $value, $ttlSeconds);
}

/**
Expand Down Expand Up @@ -223,7 +238,7 @@ public function set(string $cacheName, string $key, string $value, int $ttlSecon
*/
public function getAsync(string $cacheName, string $key): ResponseFuture
{
return $this->dataClientWrapper->getClient()->get($cacheName, $key);
return $this->getNextDataClient()->get($cacheName, $key);
}

/**
Expand Down Expand Up @@ -282,7 +297,7 @@ public function get(string $cacheName, string $key): GetResponse
*/
public function setIfNotExistsAsync(string $cacheName, string $key, string $value, int $ttlSeconds = 0): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setIfNotExists($cacheName, $key, $value, $ttlSeconds);
return $this->getNextDataClient()->setIfNotExists($cacheName, $key, $value, $ttlSeconds);
}

/**
Expand Down Expand Up @@ -336,7 +351,7 @@ public function setIfNotExists(string $cacheName, string $key, string $value, in
*/
public function deleteAsync(string $cacheName, string $key): ResponseFuture
{
return $this->dataClientWrapper->getClient()->delete($cacheName, $key);
return $this->getNextDataClient()->delete($cacheName, $key);
}

/**
Expand Down Expand Up @@ -387,7 +402,7 @@ public function delete(string $cacheName, string $key): DeleteResponse
*/
public function keysExistAsync(string $cacheName, array $keys): ResponseFuture
{
return $this->dataClientWrapper->getClient()->keysExist($cacheName, $keys);
return $this->getNextDataClient()->keysExist($cacheName, $keys);
}

/**
Expand Down Expand Up @@ -440,7 +455,7 @@ public function keysExist(string $cacheName, array $keys): KeysExistResponse
*/
public function keyExistsAsync(string $cacheName, string $key): ResponseFuture
{
return $this->dataClientWrapper->getClient()->keyExists($cacheName, $key);
return $this->getNextDataClient()->keyExists($cacheName, $key);
}

/**
Expand Down Expand Up @@ -495,7 +510,7 @@ public function incrementAsync(
string $cacheName, string $key, int $amount = 1, ?int $ttlSeconds = null
): ResponseFuture
{
return $this->dataClientWrapper->getClient()->increment($cacheName, $key, $amount, $ttlSeconds);
return $this->getNextDataClient()->increment($cacheName, $key, $amount, $ttlSeconds);
}

/**
Expand Down Expand Up @@ -543,7 +558,7 @@ public function increment(
*/
public function listFetch(string $cacheName, string $listName): ListFetchResponse
{
return $this->dataClientWrapper->getClient()->listFetch($cacheName, $listName);
return $this->getNextDataClient()->listFetch($cacheName, $listName);
}

/**
Expand All @@ -569,7 +584,7 @@ public function listPushFront(
string $cacheName, string $listName, string $value, ?int $truncateBackToSize = null, ?CollectionTtl $ttl = null
): ListPushFrontResponse
{
return $this->dataClientWrapper->getClient()->listPushFront($cacheName, $listName, $value, $truncateBackToSize, $ttl);
return $this->getNextDataClient()->listPushFront($cacheName, $listName, $value, $truncateBackToSize, $ttl);
}

/**
Expand All @@ -595,7 +610,7 @@ public function listPushBack(
string $cacheName, string $listName, string $value, ?int $truncateFrontToSize = null, ?CollectionTtl $ttl = null
): ListPushBackResponse
{
return $this->dataClientWrapper->getClient()->listPushBack($cacheName, $listName, $value, $truncateFrontToSize, $ttl);
return $this->getNextDataClient()->listPushBack($cacheName, $listName, $value, $truncateFrontToSize, $ttl);
}

/**
Expand All @@ -617,7 +632,7 @@ public function listPushBack(
*/
public function listPopFront(string $cacheName, string $listName): ListPopFrontResponse
{
return $this->dataClientWrapper->getClient()->listPopFront($cacheName, $listName);
return $this->getNextDataClient()->listPopFront($cacheName, $listName);
}

/**
Expand All @@ -639,7 +654,7 @@ public function listPopFront(string $cacheName, string $listName): ListPopFrontR
*/
public function listPopBack(string $cacheName, string $listName): ListPopBackResponse
{
return $this->dataClientWrapper->getClient()->listPopBack($cacheName, $listName);
return $this->getNextDataClient()->listPopBack($cacheName, $listName);
}

/**
Expand All @@ -659,7 +674,7 @@ public function listPopBack(string $cacheName, string $listName): ListPopBackRes
*/
public function listRemoveValue(string $cacheName, string $listName, string $value): ListRemoveValueResponse
{
return $this->dataClientWrapper->getClient()->listRemoveValue($cacheName, $listName, $value);
return $this->getNextDataClient()->listRemoveValue($cacheName, $listName, $value);
}

/**
Expand All @@ -680,7 +695,7 @@ public function listRemoveValue(string $cacheName, string $listName, string $val
*/
public function listLength(string $cacheName, string $listName): ListLengthResponse
{
return $this->dataClientWrapper->getClient()->listLength($cacheName, $listName);
return $this->getNextDataClient()->listLength($cacheName, $listName);
}

/**
Expand All @@ -702,7 +717,7 @@ public function listLength(string $cacheName, string $listName): ListLengthRespo
*/
public function dictionarySetField(string $cacheName, string $dictionaryName, string $field, string $value, ?CollectionTtl $ttl = null): DictionarySetFieldResponse
{
return $this->dataClientWrapper->getClient()->dictionarySetField($cacheName, $dictionaryName, $field, $value, $ttl);
return $this->getNextDataClient()->dictionarySetField($cacheName, $dictionaryName, $field, $value, $ttl);
}

/**
Expand All @@ -725,7 +740,7 @@ public function dictionarySetField(string $cacheName, string $dictionaryName, st
*/
public function dictionaryGetField(string $cacheName, string $dictionaryName, string $field): DictionaryGetFieldResponse
{
return $this->dataClientWrapper->getClient()->dictionaryGetField($cacheName, $dictionaryName, $field);
return $this->getNextDataClient()->dictionaryGetField($cacheName, $dictionaryName, $field);
}

/**
Expand All @@ -747,7 +762,7 @@ public function dictionaryGetField(string $cacheName, string $dictionaryName, st
*/
public function dictionaryFetch(string $cacheName, string $dictionaryName): DictionaryFetchResponse
{
return $this->dataClientWrapper->getClient()->dictionaryFetch($cacheName, $dictionaryName);
return $this->getNextDataClient()->dictionaryFetch($cacheName, $dictionaryName);
}

/**
Expand All @@ -768,7 +783,7 @@ public function dictionaryFetch(string $cacheName, string $dictionaryName): Dict
*/
public function dictionarySetFields(string $cacheName, string $dictionaryName, array $elements, ?CollectionTtl $ttl = null): DictionarySetFieldsResponse
{
return $this->dataClientWrapper->getClient()->dictionarySetFields($cacheName, $dictionaryName, $elements, $ttl);
return $this->getNextDataClient()->dictionarySetFields($cacheName, $dictionaryName, $elements, $ttl);
}

/**
Expand Down Expand Up @@ -799,7 +814,7 @@ public function dictionarySetFields(string $cacheName, string $dictionaryName, a
*/
public function dictionaryGetFields(string $cacheName, string $dictionaryName, array $fields): DictionaryGetFieldsResponse
{
return $this->dataClientWrapper->getClient()->dictionaryGetFields($cacheName, $dictionaryName, $fields);
return $this->getNextDataClient()->dictionaryGetFields($cacheName, $dictionaryName, $fields);
}

/**
Expand Down Expand Up @@ -828,7 +843,7 @@ public function dictionaryIncrement(
string $cacheName, string $dictionaryName, string $field, int $amount = 1, ?CollectionTtl $ttl = null
): DictionaryIncrementResponse
{
return $this->dataClientWrapper->getClient()->dictionaryIncrement($cacheName, $dictionaryName, $field, $amount, $ttl);
return $this->getNextDataClient()->dictionaryIncrement($cacheName, $dictionaryName, $field, $amount, $ttl);
}

/**
Expand All @@ -847,7 +862,7 @@ public function dictionaryIncrement(
*/
public function dictionaryRemoveField(string $cacheName, string $dictionaryName, string $field): DictionaryRemoveFieldResponse
{
return $this->dataClientWrapper->getClient()->dictionaryRemoveField($cacheName, $dictionaryName, $field);
return $this->getNextDataClient()->dictionaryRemoveField($cacheName, $dictionaryName, $field);
}

/**
Expand All @@ -866,7 +881,7 @@ public function dictionaryRemoveField(string $cacheName, string $dictionaryName,
*/
public function dictionaryRemoveFields(string $cacheName, string $dictionaryName, array $fields): DictionaryRemoveFieldsResponse
{
return $this->dataClientWrapper->getClient()->dictionaryRemoveFields($cacheName, $dictionaryName, $fields);
return $this->getNextDataClient()->dictionaryRemoveFields($cacheName, $dictionaryName, $fields);
}

/**
Expand Down Expand Up @@ -895,7 +910,7 @@ public function dictionaryRemoveFields(string $cacheName, string $dictionaryName
*/
public function setAddElementAsync(string $cacheName, string $setName, string $element, ?CollectionTtl $ttl = null): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setAddElement($cacheName, $setName, $element, $ttl);
return $this->getNextDataClient()->setAddElement($cacheName, $setName, $element, $ttl);
}

/**
Expand Down Expand Up @@ -944,7 +959,7 @@ public function setAddElement(string $cacheName, string $setName, string $elemen
*/
public function setAddElementsAsync(string $cacheName, string $setName, array $elements, ?CollectionTtl $ttl = null): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setAddElements($cacheName, $setName, $elements, $ttl);
return $this->getNextDataClient()->setAddElements($cacheName, $setName, $elements, $ttl);
}

/**
Expand Down Expand Up @@ -994,7 +1009,7 @@ public function setAddElements(string $cacheName, string $setName, array $elemen
*/
public function setFetchAsync(string $cacheName, string $setName): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setFetch($cacheName, $setName);
return $this->getNextDataClient()->setFetch($cacheName, $setName);
}

/**
Expand Down Expand Up @@ -1044,7 +1059,7 @@ public function setFetch(string $cacheName, string $setName): SetFetchResponse
*/
public function setLengthAsync(string $cacheName, string $setName): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setLength($cacheName, $setName);
return $this->getNextDataClient()->setLength($cacheName, $setName);
}

/**
Expand Down Expand Up @@ -1093,7 +1108,7 @@ public function setLength(string $cacheName, string $setName): SetLengthResponse
*/
public function setRemoveElementAsync(string $cacheName, string $setName, string $element): ResponseFuture
{
return $this->dataClientWrapper->getClient()->setRemoveElement($cacheName, $setName, $element);
return $this->getNextDataClient()->setRemoveElement($cacheName, $setName, $element);
}

/**
Expand All @@ -1114,4 +1129,11 @@ public function setRemoveElement(string $cacheName, string $setName, string $ele
{
return $this->setRemoveElementAsync($cacheName, $setName, $element)->wait();
}

private function getNextDataClient(): ScsDataClient
{
$client = $this->dataClients[$this->nextDataClientIndex]->getClient();
$this->nextDataClientIndex = ($this->nextDataClientIndex + 1) % count($this->dataClients);
return $client;
}
}
4 changes: 2 additions & 2 deletions src/Config/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ public function getLoggerFactory(): ILoggerFactory {
}

/**
* @return ITransportStrategy|null The currently active transport strategy
* @return ITransportStrategy The currently active transport strategy
*/
public function getTransportStrategy(): ITransportStrategy|null
public function getTransportStrategy(): ITransportStrategy
{
return $this->transportStrategy;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Config/Configurations/Laptop.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static function v1(?ILoggerFactory $loggerFactory = null): Laptop
{
$loggerFactory = $loggerFactory ?? new NullLoggerFactory();
$grpcConfig = new StaticGrpcConfiguration(5000);
$transportStrategy = new StaticTransportStrategy(null, $grpcConfig, $loggerFactory, self::$maxIdleMillis);
$transportStrategy = new StaticTransportStrategy($grpcConfig, $loggerFactory, self::$maxIdleMillis);
return new Laptop($loggerFactory, $transportStrategy);
}
}
6 changes: 6 additions & 0 deletions src/Config/IConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ interface IConfiguration
*/
public function getLoggerFactory(): ILoggerFactory;


/**
* @return ITransportStrategy The currently active transport strategy
*/
public function getTransportStrategy(): ITransportStrategy;

/**
* Creates a new instance of the Configuration object, updated to use the specified transport strategy.
*
Expand Down
4 changes: 4 additions & 0 deletions src/Config/Transport/IGrpcConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ public function withDeadlineMilliseconds(int $deadlineMilliseconds): IGrpcConfig
public function getForceNewChannel(): ?bool;

public function withForceNewChannel(bool $forceNewChannel) : IGrpcConfiguration;

public function getNumGrpcChannels(): int;

public function withNumGrpcChannels(int $numGrpcChannels): IGrpcConfiguration;
}
2 changes: 0 additions & 2 deletions src/Config/Transport/ITransportStrategy.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

interface ITransportStrategy
{
public function getMaxConcurrentRequests(): ?int;

public function getGrpcConfig(): ?IGrpcConfiguration;

public function getMaxIdleMillis(): ?int;
Expand Down
20 changes: 16 additions & 4 deletions src/Config/Transport/StaticGrpcConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ class StaticGrpcConfiguration implements IGrpcConfiguration
{

private ?int $deadlineMilliseconds;
private ?bool $forceNewChannel;
private bool $forceNewChannel;
private int $numGrpcChannels;

public function __construct(?int $deadlineMilliseconds = null, ?bool $forceNewChannel = false)
public function __construct(?int $deadlineMilliseconds = null, bool $forceNewChannel = false, int $numGrpcChannels = 1)
{
$this->deadlineMilliseconds = $deadlineMilliseconds;
$this->forceNewChannel = $forceNewChannel;
$this->numGrpcChannels = $numGrpcChannels;
}

public function getDeadlineMilliseconds(): int|null
Expand All @@ -22,7 +24,7 @@ public function getDeadlineMilliseconds(): int|null

public function withDeadlineMilliseconds(int $deadlineMilliseconds): StaticGrpcConfiguration
{
return new StaticGrpcConfiguration($deadlineMilliseconds, $this->forceNewChannel);
return new StaticGrpcConfiguration($deadlineMilliseconds, $this->forceNewChannel, $this->numGrpcChannels);
}

public function getForceNewChannel(): bool|null
Expand All @@ -32,6 +34,16 @@ public function getForceNewChannel(): bool|null

public function withForceNewChannel(bool $forceNewChannel): StaticGrpcConfiguration
{
return new StaticGrpcConfiguration($this->deadlineMilliseconds, $forceNewChannel);
return new StaticGrpcConfiguration($this->deadlineMilliseconds, $forceNewChannel, $this->numGrpcChannels);
}

public function getNumGrpcChannels(): int
{
return $this->numGrpcChannels;
}

public function withNumGrpcChannels(int $numGrpcChannels): IGrpcConfiguration
{
return new StaticGrpcConfiguration($this->deadlineMilliseconds, $this->forceNewChannel, $numGrpcChannels);
}
}
Loading

0 comments on commit 81e256d

Please sign in to comment.