From 81e256d070bbb486735562843a72308e622aeee6 Mon Sep 17 00:00:00 2001 From: Chris Price Date: Mon, 5 Jun 2023 16:55:37 -0700 Subject: [PATCH] feat: add configurability for number of gRPC channels (#158) * feat: add configurability for number of gRPC channels For users who may end up with over 100 concurrent requests in flight, we need to be able to support multiple gRPC channels. This commit adds a configuration setting to allow users to specify the desired number of channels. This most likely won't doing anything useful until we add the config flag to disable persistent TCP connections in the gRPC channel constructor, because we will just end up re-using the same channel multiple times. --- src/Cache/CacheClient.php | 80 ++++++++++++------- src/Config/Configuration.php | 4 +- src/Config/Configurations/Laptop.php | 2 +- src/Config/IConfiguration.php | 6 ++ src/Config/Transport/IGrpcConfiguration.php | 4 + src/Config/Transport/ITransportStrategy.php | 2 - .../Transport/StaticGrpcConfiguration.php | 20 ++++- .../Transport/StaticTransportStrategy.php | 19 ++--- tests/Cache/CacheClientTest.php | 2 +- tests/Cache/Psr16ClientTest.php | 2 +- 10 files changed, 87 insertions(+), 54 deletions(-) diff --git a/src/Cache/CacheClient.php b/src/Cache/CacheClient.php index 3249347c..54ac88b1 100644 --- a/src/Cache/CacheClient.php +++ b/src/Cache/CacheClient.php @@ -35,6 +35,7 @@ use Momento\Cache\CacheOperationTypes\CreateCacheResponse; use Momento\Cache\CacheOperationTypes\DeleteCacheResponse; use Momento\Cache\CacheOperationTypes\ListCachesResponse; +use Momento\Cache\Errors\InvalidArgumentError; use Momento\Cache\Internal\IdleDataClientWrapper; use Momento\Cache\Internal\ScsControlClient; use Momento\Cache\Internal\ScsDataClient; @@ -54,7 +55,12 @@ class CacheClient implements LoggerAwareInterface protected ILoggerFactory $loggerFactory; protected LoggerInterface $logger; private ScsControlClient $controlClient; - private IdleDataClientWrapper $dataClientWrapper; + + /** + * @var IdleDataClientWrapper[] + */ + private array $dataClients; + private int $nextDataClientIndex = 0; /** * @param IConfiguration $configuration Configuration to use for transport. @@ -77,7 +83,16 @@ public function __construct( $defaultTtlSeconds ); }; - $this->dataClientWrapper = new IdleDataClientWrapper($dataClientFactory, $this->configuration); + $this->dataClients = []; + + $numGrpcChannels = $configuration->getTransportStrategy()->getGrpcConfig()->getNumGrpcChannels(); + $forceNewChannels = $configuration->getTransportStrategy()->getGrpcConfig()->getForceNewChannel(); + if (($numGrpcChannels > 1) && (! $forceNewChannels)) { + throw new InvalidArgumentError("When setting NumGrpcChannels > 1, you must also set ForceNewChannel to true, or else the gRPC library will re-use the same channel."); + } + for ($i = 0; $i < $numGrpcChannels; $i++) { + array_push($this->dataClients, new IdleDataClientWrapper($dataClientFactory, $this->configuration)); + } } /** @@ -171,7 +186,7 @@ public function deleteCache(string $cacheName): DeleteCacheResponse */ public function setAsync(string $cacheName, string $key, string $value, int $ttlSeconds = 0): ResponseFuture { - return $this->dataClientWrapper->getClient()->set($cacheName, $key, $value, $ttlSeconds); + return $this->getNextDataClient()->set($cacheName, $key, $value, $ttlSeconds); } /** @@ -223,7 +238,7 @@ public function set(string $cacheName, string $key, string $value, int $ttlSecon */ public function getAsync(string $cacheName, string $key): ResponseFuture { - return $this->dataClientWrapper->getClient()->get($cacheName, $key); + return $this->getNextDataClient()->get($cacheName, $key); } /** @@ -282,7 +297,7 @@ public function get(string $cacheName, string $key): GetResponse */ public function setIfNotExistsAsync(string $cacheName, string $key, string $value, int $ttlSeconds = 0): ResponseFuture { - return $this->dataClientWrapper->getClient()->setIfNotExists($cacheName, $key, $value, $ttlSeconds); + return $this->getNextDataClient()->setIfNotExists($cacheName, $key, $value, $ttlSeconds); } /** @@ -336,7 +351,7 @@ public function setIfNotExists(string $cacheName, string $key, string $value, in */ public function deleteAsync(string $cacheName, string $key): ResponseFuture { - return $this->dataClientWrapper->getClient()->delete($cacheName, $key); + return $this->getNextDataClient()->delete($cacheName, $key); } /** @@ -387,7 +402,7 @@ public function delete(string $cacheName, string $key): DeleteResponse */ public function keysExistAsync(string $cacheName, array $keys): ResponseFuture { - return $this->dataClientWrapper->getClient()->keysExist($cacheName, $keys); + return $this->getNextDataClient()->keysExist($cacheName, $keys); } /** @@ -440,7 +455,7 @@ public function keysExist(string $cacheName, array $keys): KeysExistResponse */ public function keyExistsAsync(string $cacheName, string $key): ResponseFuture { - return $this->dataClientWrapper->getClient()->keyExists($cacheName, $key); + return $this->getNextDataClient()->keyExists($cacheName, $key); } /** @@ -495,7 +510,7 @@ public function incrementAsync( string $cacheName, string $key, int $amount = 1, ?int $ttlSeconds = null ): ResponseFuture { - return $this->dataClientWrapper->getClient()->increment($cacheName, $key, $amount, $ttlSeconds); + return $this->getNextDataClient()->increment($cacheName, $key, $amount, $ttlSeconds); } /** @@ -543,7 +558,7 @@ public function increment( */ public function listFetch(string $cacheName, string $listName): ListFetchResponse { - return $this->dataClientWrapper->getClient()->listFetch($cacheName, $listName); + return $this->getNextDataClient()->listFetch($cacheName, $listName); } /** @@ -569,7 +584,7 @@ public function listPushFront( string $cacheName, string $listName, string $value, ?int $truncateBackToSize = null, ?CollectionTtl $ttl = null ): ListPushFrontResponse { - return $this->dataClientWrapper->getClient()->listPushFront($cacheName, $listName, $value, $truncateBackToSize, $ttl); + return $this->getNextDataClient()->listPushFront($cacheName, $listName, $value, $truncateBackToSize, $ttl); } /** @@ -595,7 +610,7 @@ public function listPushBack( string $cacheName, string $listName, string $value, ?int $truncateFrontToSize = null, ?CollectionTtl $ttl = null ): ListPushBackResponse { - return $this->dataClientWrapper->getClient()->listPushBack($cacheName, $listName, $value, $truncateFrontToSize, $ttl); + return $this->getNextDataClient()->listPushBack($cacheName, $listName, $value, $truncateFrontToSize, $ttl); } /** @@ -617,7 +632,7 @@ public function listPushBack( */ public function listPopFront(string $cacheName, string $listName): ListPopFrontResponse { - return $this->dataClientWrapper->getClient()->listPopFront($cacheName, $listName); + return $this->getNextDataClient()->listPopFront($cacheName, $listName); } /** @@ -639,7 +654,7 @@ public function listPopFront(string $cacheName, string $listName): ListPopFrontR */ public function listPopBack(string $cacheName, string $listName): ListPopBackResponse { - return $this->dataClientWrapper->getClient()->listPopBack($cacheName, $listName); + return $this->getNextDataClient()->listPopBack($cacheName, $listName); } /** @@ -659,7 +674,7 @@ public function listPopBack(string $cacheName, string $listName): ListPopBackRes */ public function listRemoveValue(string $cacheName, string $listName, string $value): ListRemoveValueResponse { - return $this->dataClientWrapper->getClient()->listRemoveValue($cacheName, $listName, $value); + return $this->getNextDataClient()->listRemoveValue($cacheName, $listName, $value); } /** @@ -680,7 +695,7 @@ public function listRemoveValue(string $cacheName, string $listName, string $val */ public function listLength(string $cacheName, string $listName): ListLengthResponse { - return $this->dataClientWrapper->getClient()->listLength($cacheName, $listName); + return $this->getNextDataClient()->listLength($cacheName, $listName); } /** @@ -702,7 +717,7 @@ public function listLength(string $cacheName, string $listName): ListLengthRespo */ public function dictionarySetField(string $cacheName, string $dictionaryName, string $field, string $value, ?CollectionTtl $ttl = null): DictionarySetFieldResponse { - return $this->dataClientWrapper->getClient()->dictionarySetField($cacheName, $dictionaryName, $field, $value, $ttl); + return $this->getNextDataClient()->dictionarySetField($cacheName, $dictionaryName, $field, $value, $ttl); } /** @@ -725,7 +740,7 @@ public function dictionarySetField(string $cacheName, string $dictionaryName, st */ public function dictionaryGetField(string $cacheName, string $dictionaryName, string $field): DictionaryGetFieldResponse { - return $this->dataClientWrapper->getClient()->dictionaryGetField($cacheName, $dictionaryName, $field); + return $this->getNextDataClient()->dictionaryGetField($cacheName, $dictionaryName, $field); } /** @@ -747,7 +762,7 @@ public function dictionaryGetField(string $cacheName, string $dictionaryName, st */ public function dictionaryFetch(string $cacheName, string $dictionaryName): DictionaryFetchResponse { - return $this->dataClientWrapper->getClient()->dictionaryFetch($cacheName, $dictionaryName); + return $this->getNextDataClient()->dictionaryFetch($cacheName, $dictionaryName); } /** @@ -768,7 +783,7 @@ public function dictionaryFetch(string $cacheName, string $dictionaryName): Dict */ public function dictionarySetFields(string $cacheName, string $dictionaryName, array $elements, ?CollectionTtl $ttl = null): DictionarySetFieldsResponse { - return $this->dataClientWrapper->getClient()->dictionarySetFields($cacheName, $dictionaryName, $elements, $ttl); + return $this->getNextDataClient()->dictionarySetFields($cacheName, $dictionaryName, $elements, $ttl); } /** @@ -799,7 +814,7 @@ public function dictionarySetFields(string $cacheName, string $dictionaryName, a */ public function dictionaryGetFields(string $cacheName, string $dictionaryName, array $fields): DictionaryGetFieldsResponse { - return $this->dataClientWrapper->getClient()->dictionaryGetFields($cacheName, $dictionaryName, $fields); + return $this->getNextDataClient()->dictionaryGetFields($cacheName, $dictionaryName, $fields); } /** @@ -828,7 +843,7 @@ public function dictionaryIncrement( string $cacheName, string $dictionaryName, string $field, int $amount = 1, ?CollectionTtl $ttl = null ): DictionaryIncrementResponse { - return $this->dataClientWrapper->getClient()->dictionaryIncrement($cacheName, $dictionaryName, $field, $amount, $ttl); + return $this->getNextDataClient()->dictionaryIncrement($cacheName, $dictionaryName, $field, $amount, $ttl); } /** @@ -847,7 +862,7 @@ public function dictionaryIncrement( */ public function dictionaryRemoveField(string $cacheName, string $dictionaryName, string $field): DictionaryRemoveFieldResponse { - return $this->dataClientWrapper->getClient()->dictionaryRemoveField($cacheName, $dictionaryName, $field); + return $this->getNextDataClient()->dictionaryRemoveField($cacheName, $dictionaryName, $field); } /** @@ -866,7 +881,7 @@ public function dictionaryRemoveField(string $cacheName, string $dictionaryName, */ public function dictionaryRemoveFields(string $cacheName, string $dictionaryName, array $fields): DictionaryRemoveFieldsResponse { - return $this->dataClientWrapper->getClient()->dictionaryRemoveFields($cacheName, $dictionaryName, $fields); + return $this->getNextDataClient()->dictionaryRemoveFields($cacheName, $dictionaryName, $fields); } /** @@ -895,7 +910,7 @@ public function dictionaryRemoveFields(string $cacheName, string $dictionaryName */ public function setAddElementAsync(string $cacheName, string $setName, string $element, ?CollectionTtl $ttl = null): ResponseFuture { - return $this->dataClientWrapper->getClient()->setAddElement($cacheName, $setName, $element, $ttl); + return $this->getNextDataClient()->setAddElement($cacheName, $setName, $element, $ttl); } /** @@ -944,7 +959,7 @@ public function setAddElement(string $cacheName, string $setName, string $elemen */ public function setAddElementsAsync(string $cacheName, string $setName, array $elements, ?CollectionTtl $ttl = null): ResponseFuture { - return $this->dataClientWrapper->getClient()->setAddElements($cacheName, $setName, $elements, $ttl); + return $this->getNextDataClient()->setAddElements($cacheName, $setName, $elements, $ttl); } /** @@ -994,7 +1009,7 @@ public function setAddElements(string $cacheName, string $setName, array $elemen */ public function setFetchAsync(string $cacheName, string $setName): ResponseFuture { - return $this->dataClientWrapper->getClient()->setFetch($cacheName, $setName); + return $this->getNextDataClient()->setFetch($cacheName, $setName); } /** @@ -1044,7 +1059,7 @@ public function setFetch(string $cacheName, string $setName): SetFetchResponse */ public function setLengthAsync(string $cacheName, string $setName): ResponseFuture { - return $this->dataClientWrapper->getClient()->setLength($cacheName, $setName); + return $this->getNextDataClient()->setLength($cacheName, $setName); } /** @@ -1093,7 +1108,7 @@ public function setLength(string $cacheName, string $setName): SetLengthResponse */ public function setRemoveElementAsync(string $cacheName, string $setName, string $element): ResponseFuture { - return $this->dataClientWrapper->getClient()->setRemoveElement($cacheName, $setName, $element); + return $this->getNextDataClient()->setRemoveElement($cacheName, $setName, $element); } /** @@ -1114,4 +1129,11 @@ public function setRemoveElement(string $cacheName, string $setName, string $ele { return $this->setRemoveElementAsync($cacheName, $setName, $element)->wait(); } + + private function getNextDataClient(): ScsDataClient + { + $client = $this->dataClients[$this->nextDataClientIndex]->getClient(); + $this->nextDataClientIndex = ($this->nextDataClientIndex + 1) % count($this->dataClients); + return $client; + } } diff --git a/src/Config/Configuration.php b/src/Config/Configuration.php index 6c0fe8e1..dde85aa9 100644 --- a/src/Config/Configuration.php +++ b/src/Config/Configuration.php @@ -31,9 +31,9 @@ public function getLoggerFactory(): ILoggerFactory { } /** - * @return ITransportStrategy|null The currently active transport strategy + * @return ITransportStrategy The currently active transport strategy */ - public function getTransportStrategy(): ITransportStrategy|null + public function getTransportStrategy(): ITransportStrategy { return $this->transportStrategy; } diff --git a/src/Config/Configurations/Laptop.php b/src/Config/Configurations/Laptop.php index c1402e53..944fc6d7 100644 --- a/src/Config/Configurations/Laptop.php +++ b/src/Config/Configurations/Laptop.php @@ -37,7 +37,7 @@ public static function v1(?ILoggerFactory $loggerFactory = null): Laptop { $loggerFactory = $loggerFactory ?? new NullLoggerFactory(); $grpcConfig = new StaticGrpcConfiguration(5000); - $transportStrategy = new StaticTransportStrategy(null, $grpcConfig, $loggerFactory, self::$maxIdleMillis); + $transportStrategy = new StaticTransportStrategy($grpcConfig, $loggerFactory, self::$maxIdleMillis); return new Laptop($loggerFactory, $transportStrategy); } } diff --git a/src/Config/IConfiguration.php b/src/Config/IConfiguration.php index 4d38db6a..22ff56d7 100644 --- a/src/Config/IConfiguration.php +++ b/src/Config/IConfiguration.php @@ -16,6 +16,12 @@ interface IConfiguration */ public function getLoggerFactory(): ILoggerFactory; + + /** + * @return ITransportStrategy The currently active transport strategy + */ + public function getTransportStrategy(): ITransportStrategy; + /** * Creates a new instance of the Configuration object, updated to use the specified transport strategy. * diff --git a/src/Config/Transport/IGrpcConfiguration.php b/src/Config/Transport/IGrpcConfiguration.php index afa7bcc7..ff4362c6 100644 --- a/src/Config/Transport/IGrpcConfiguration.php +++ b/src/Config/Transport/IGrpcConfiguration.php @@ -12,4 +12,8 @@ public function withDeadlineMilliseconds(int $deadlineMilliseconds): IGrpcConfig public function getForceNewChannel(): ?bool; public function withForceNewChannel(bool $forceNewChannel) : IGrpcConfiguration; + + public function getNumGrpcChannels(): int; + + public function withNumGrpcChannels(int $numGrpcChannels): IGrpcConfiguration; } diff --git a/src/Config/Transport/ITransportStrategy.php b/src/Config/Transport/ITransportStrategy.php index f4c50f54..477d430a 100644 --- a/src/Config/Transport/ITransportStrategy.php +++ b/src/Config/Transport/ITransportStrategy.php @@ -7,8 +7,6 @@ interface ITransportStrategy { - public function getMaxConcurrentRequests(): ?int; - public function getGrpcConfig(): ?IGrpcConfiguration; public function getMaxIdleMillis(): ?int; diff --git a/src/Config/Transport/StaticGrpcConfiguration.php b/src/Config/Transport/StaticGrpcConfiguration.php index 55823bf3..dc1d5598 100644 --- a/src/Config/Transport/StaticGrpcConfiguration.php +++ b/src/Config/Transport/StaticGrpcConfiguration.php @@ -7,12 +7,14 @@ class StaticGrpcConfiguration implements IGrpcConfiguration { private ?int $deadlineMilliseconds; - private ?bool $forceNewChannel; + private bool $forceNewChannel; + private int $numGrpcChannels; - public function __construct(?int $deadlineMilliseconds = null, ?bool $forceNewChannel = false) + public function __construct(?int $deadlineMilliseconds = null, bool $forceNewChannel = false, int $numGrpcChannels = 1) { $this->deadlineMilliseconds = $deadlineMilliseconds; $this->forceNewChannel = $forceNewChannel; + $this->numGrpcChannels = $numGrpcChannels; } public function getDeadlineMilliseconds(): int|null @@ -22,7 +24,7 @@ public function getDeadlineMilliseconds(): int|null public function withDeadlineMilliseconds(int $deadlineMilliseconds): StaticGrpcConfiguration { - return new StaticGrpcConfiguration($deadlineMilliseconds, $this->forceNewChannel); + return new StaticGrpcConfiguration($deadlineMilliseconds, $this->forceNewChannel, $this->numGrpcChannels); } public function getForceNewChannel(): bool|null @@ -32,6 +34,16 @@ public function getForceNewChannel(): bool|null public function withForceNewChannel(bool $forceNewChannel): StaticGrpcConfiguration { - return new StaticGrpcConfiguration($this->deadlineMilliseconds, $forceNewChannel); + return new StaticGrpcConfiguration($this->deadlineMilliseconds, $forceNewChannel, $this->numGrpcChannels); + } + + public function getNumGrpcChannels(): int + { + return $this->numGrpcChannels; + } + + public function withNumGrpcChannels(int $numGrpcChannels): IGrpcConfiguration + { + return new StaticGrpcConfiguration($this->deadlineMilliseconds, $this->forceNewChannel, $numGrpcChannels); } } diff --git a/src/Config/Transport/StaticTransportStrategy.php b/src/Config/Transport/StaticTransportStrategy.php index 433714a2..8b6c877b 100644 --- a/src/Config/Transport/StaticTransportStrategy.php +++ b/src/Config/Transport/StaticTransportStrategy.php @@ -7,29 +7,21 @@ class StaticTransportStrategy implements ITransportStrategy { - private ?int $maxConcurrentRequests; - private ?IGrpcConfiguration $grpcConfig; + private IGrpcConfiguration $grpcConfig; private ?ILoggerFactory $loggerFactory; private ?int $maxIdleMillis; public function __construct( - ?int $maxConcurrentRequests = null, - ?IGrpcConfiguration $grpcConfig = null, + IGrpcConfiguration $grpcConfig, ?ILoggerFactory $loggerFactory = null, ?int $maxIdleMillis = null, ) { - $this->maxConcurrentRequests = $maxConcurrentRequests; $this->grpcConfig = $grpcConfig; $this->loggerFactory = $loggerFactory; $this->maxIdleMillis = $maxIdleMillis; } - public function getMaxConcurrentRequests(): ?int - { - return $this->maxConcurrentRequests; - } - public function getGrpcConfig(): IGrpcConfiguration|null { return $this->grpcConfig; @@ -43,28 +35,27 @@ public function getMaxIdleMillis(): ?int public function withLoggerFactory(ILoggerFactory $loggerFactory): StaticTransportStrategy { return new StaticTransportStrategy( - $this->maxConcurrentRequests, $this->grpcConfig, $loggerFactory, $this->maxIdleMillis + $this->grpcConfig, $loggerFactory, $this->maxIdleMillis ); } public function withGrpcConfig(IGrpcConfiguration $grpcConfig): StaticTransportStrategy { return new StaticTransportStrategy( - $this->maxConcurrentRequests, $grpcConfig, $this->loggerFactory, $this->maxIdleMillis + $grpcConfig, $this->loggerFactory, $this->maxIdleMillis ); } public function withMaxIdleMillis(int $maxIdleMillis): StaticTransportStrategy { return new StaticTransportStrategy( - $this->maxConcurrentRequests, $this->grpcConfig, $this->loggerFactory, $maxIdleMillis + $this->grpcConfig, $this->loggerFactory, $maxIdleMillis ); } public function withClientTimeout(int $clientTimeout): StaticTransportStrategy { return new StaticTransportStrategy( - $this->maxConcurrentRequests, $this->grpcConfig->withDeadlineMilliseconds($clientTimeout), $this->loggerFactory, $this->maxIdleMillis diff --git a/tests/Cache/CacheClientTest.php b/tests/Cache/CacheClientTest.php index 36b01182..c3fb0dd3 100644 --- a/tests/Cache/CacheClientTest.php +++ b/tests/Cache/CacheClientTest.php @@ -64,7 +64,7 @@ private function getConfigurationWithDeadline(int $deadline) { $loggerFactory = new NullLoggerFactory(); $grpcConfig = new StaticGrpcConfiguration($deadline); - $transportStrategy = new StaticTransportStrategy(null, $grpcConfig, $loggerFactory); + $transportStrategy = new StaticTransportStrategy($grpcConfig, $loggerFactory); return new Configuration($loggerFactory, $transportStrategy); } diff --git a/tests/Cache/Psr16ClientTest.php b/tests/Cache/Psr16ClientTest.php index 263e2721..22e296b2 100644 --- a/tests/Cache/Psr16ClientTest.php +++ b/tests/Cache/Psr16ClientTest.php @@ -29,7 +29,7 @@ public function setUp(): void { $loggerFactory = new NullLoggerFactory(); $grpcConfiguration = new StaticGrpcConfiguration(5000); - $transportStrategy = new StaticTransportStrategy(null, $grpcConfiguration); + $transportStrategy = new StaticTransportStrategy($grpcConfiguration); $configuration = new Configuration($loggerFactory, $transportStrategy); $authProvider = new EnvMomentoTokenProvider("TEST_AUTH_TOKEN"); $this->client = new Psr16CacheClient($configuration, $authProvider, $this->DEFAULT_TTL_SECONDS);