From 266d2f102ccb982c7a88c58a3401058a7a494394 Mon Sep 17 00:00:00 2001
From: Simon Schrottner
Date: Thu, 28 Nov 2024 19:15:40 +0100
Subject: [PATCH 1/3] feat(flagd-rpc): add caching with tests

Signed-off-by: Simon Schrottner
---
 .../openfeature-provider-flagd/pyproject.toml |   2 +
 .../openfeature-provider-flagd/pytest.ini     |  10 +
 .../contrib/provider/flagd/config.py          |  45 ++++-
 .../contrib/provider/flagd/provider.py        |   8 +-
 .../contrib/provider/flagd/resolvers/grpc.py  |  42 +++-
 .../tests/e2e/config.feature                  | 189 ++++++++++++++++++
 .../tests/e2e/conftest.py                     |   9 +
 .../tests/e2e/rpc_cache.feature               |  44 ++++
 .../tests/e2e/steps.py                        |  10 +
 .../tests/e2e/test_config.py                  | 100 +++++++++
 ...rocess-file.py => test_in_process_file.py} |   3 +-
 .../tests/e2e/test_rpc.py                     |   1 +
 .../tests/test_config.py                      |  37 +++-
 13 files changed, 477 insertions(+), 23 deletions(-)
 create mode 100644 providers/openfeature-provider-flagd/pytest.ini
 create mode 100644 providers/openfeature-provider-flagd/tests/e2e/config.feature
 create mode 100644 providers/openfeature-provider-flagd/tests/e2e/rpc_cache.feature
 create mode 100644 providers/openfeature-provider-flagd/tests/e2e/test_config.py
 rename providers/openfeature-provider-flagd/tests/e2e/{test_in-process-file.py => test_in_process_file.py} (96%)

diff --git a/providers/openfeature-provider-flagd/pyproject.toml b/providers/openfeature-provider-flagd/pyproject.toml
index 7e74e810..738ba02a 100644
--- a/providers/openfeature-provider-flagd/pyproject.toml
+++ b/providers/openfeature-provider-flagd/pyproject.toml
@@ -24,6 +24,7 @@ dependencies = [
   "panzi-json-logic>=1.0.1",
   "semver>=3,<4",
   "pyyaml>=6.0.1",
+  "cachebox"
 ]
 requires-python = ">=3.8"
 
@@ -59,6 +60,7 @@ cov = [
   "cov-report",
 ]
 
+
 [tool.hatch.envs.mypy]
 dependencies = [
   "mypy[faster-cache]>=1.13.0",
diff --git a/providers/openfeature-provider-flagd/pytest.ini b/providers/openfeature-provider-flagd/pytest.ini
new file mode 100644
index 00000000..66da895f
--- /dev/null
+++ b/providers/openfeature-provider-flagd/pytest.ini
@@ -0,0 +1,10 @@
+[pytest]
+markers =
+    rpc: tests for rpc mode.
+    in-process: tests for in-process mode.
+    customCert: Supports custom certs.
+    unixsocket: Supports unixsockets.
+    events: Supports events.
+    sync: Supports sync.
+    caching: Supports caching.
+    offline: Supports offline.
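For orientation before the source changes: a minimal sketch of how the caching options introduced by this patch might be used from application code. It assumes the package-level FlagdProvider export and the CacheType enum and cache_type/max_cache_size parameters added below in config.py and provider.py; the flag key and cache size are illustrative only.

    from openfeature import api
    from openfeature.contrib.provider.flagd import FlagdProvider
    from openfeature.contrib.provider.flagd.config import CacheType

    # RPC resolver with an LRU cache of up to 500 entries; the defaults added by
    # this patch are CacheType.LRU with 1000 entries, and CacheType.DISABLED opts out.
    api.set_provider(
        FlagdProvider(
            cache_type=CacheType.LRU,
            max_cache_size=500,
        )
    )

    client = api.get_client()
    # A flag resolved with reason STATIC is cached; later lookups return reason
    # CACHED until the event stream reports the flag as changed.
    print(client.get_boolean_value("my-flag", False))

The same settings can also come from the new FLAGD_CACHE and FLAGD_MAX_CACHE_SIZE environment variables when the constructor arguments are left unset.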
diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py
index a393d270..1bb73ece 100644
--- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py
+++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py
@@ -8,6 +8,13 @@ class ResolverType(Enum):
     IN_PROCESS = "in-process"
 
 
+class CacheType(Enum):
+    LRU = "lru"
+    DISABLED = "disabled"
+
+
+DEFAULT_CACHE = CacheType.LRU
+DEFAULT_CACHE_SIZE = 1000
 DEFAULT_DEADLINE = 500
 DEFAULT_HOST = "localhost"
 DEFAULT_KEEP_ALIVE = 0
@@ -19,12 +26,14 @@ class ResolverType(Enum):
 DEFAULT_STREAM_DEADLINE = 600000
 DEFAULT_TLS = False
 
+ENV_VAR_CACHE_SIZE = "FLAGD_MAX_CACHE_SIZE"
+ENV_VAR_CACHE_TYPE = "FLAGD_CACHE"
 ENV_VAR_DEADLINE_MS = "FLAGD_DEADLINE_MS"
 ENV_VAR_HOST = "FLAGD_HOST"
 ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
 ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
 ENV_VAR_PORT = "FLAGD_PORT"
-ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER_TYPE"
+ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
 ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
 ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
 ENV_VAR_TLS = "FLAGD_TLS"
@@ -36,6 +45,14 @@ def str_to_bool(val: str) -> bool:
     return val.lower() == "true"
 
 
+def convert_resolver_type(val: typing.Union[str, ResolverType]) -> ResolverType:
+    if isinstance(val, str):
+        v = val.lower()
+        return ResolverType(v)
+    else:
+        return ResolverType(val)
+
+
 def env_or_default(
     env_var: str, default: T, cast: typing.Optional[typing.Callable[[str], T]] = None
 ) -> typing.Union[str, T]:
@@ -56,7 +73,9 @@ def __init__(  # noqa: PLR0913
         retry_backoff_ms: typing.Optional[int] = None,
         deadline: typing.Optional[int] = None,
         stream_deadline_ms: typing.Optional[int] = None,
-        keep_alive_time: typing.Optional[int] = None,
+        keep_alive: typing.Optional[int] = None,
+        cache_type: typing.Optional[CacheType] = None,
+        max_cache_size: typing.Optional[int] = None,
     ):
         self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host
 
@@ -77,7 +96,9 @@ def __init__(  # noqa: PLR0913
         )
 
         self.resolver_type = (
-            ResolverType(env_or_default(ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE))
+            env_or_default(
+                ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE, cast=convert_resolver_type
+            )
             if resolver_type is None
             else resolver_type
         )
@@ -118,10 +139,22 @@ def __init__(  # noqa: PLR0913
             else stream_deadline_ms
         )
 
-        self.keep_alive_time: int = (
+        self.keep_alive: int = (
             int(
                 env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int)
             )
-            if keep_alive_time is None
-            else keep_alive_time
+            if keep_alive is None
+            else keep_alive
+        )
+
+        self.cache_type = (
+            CacheType(env_or_default(ENV_VAR_CACHE_TYPE, DEFAULT_CACHE))
+            if cache_type is None
+            else cache_type
+        )
+
+        self.max_cache_size: int = (
+            int(env_or_default(ENV_VAR_CACHE_SIZE, DEFAULT_CACHE_SIZE, cast=int))
+            if max_cache_size is None
+            else max_cache_size
         )
diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py
index c45b4a86..35fe2059 100644
--- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py
+++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py
@@ -29,7 +29,7 @@
 from openfeature.provider.metadata import Metadata
 from openfeature.provider.provider import AbstractProvider
 
-from .config import Config, ResolverType
+from .config import CacheType, Config, ResolverType
 from .resolvers import AbstractResolver, GrpcResolver, InProcessResolver
 
 T = typing.TypeVar("T")
@@ -50,6 +50,8 @@ def __init__(  # noqa: PLR0913
         offline_flag_source_path: typing.Optional[str] = None,
         stream_deadline_ms: typing.Optional[int] = None,
         keep_alive_time: typing.Optional[int] = None,
+        cache_type: typing.Optional[CacheType] = None,
+        max_cache_size: typing.Optional[int] = None,
     ):
         """
         Create an instance of the FlagdProvider
@@ -82,7 +84,9 @@ def __init__(  # noqa: PLR0913
             resolver_type=resolver_type,
             offline_flag_source_path=offline_flag_source_path,
             stream_deadline_ms=stream_deadline_ms,
-            keep_alive_time=keep_alive_time,
+            keep_alive=keep_alive_time,
+            cache_type=cache_type,
+            max_cache_size=max_cache_size,
         )
 
         self.resolver = self.setup_resolver()
diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py
index 9026e4b2..7e2f1600 100644
--- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py
+++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py
@@ -4,6 +4,7 @@
 import typing
 
 import grpc
+from cachebox import BaseCacheImpl, LRUCache
 from google.protobuf.json_format import MessageToDict
 from google.protobuf.struct_pb2 import Struct
 
@@ -18,13 +19,13 @@
     ProviderNotReadyError,
     TypeMismatchError,
 )
-from openfeature.flag_evaluation import FlagResolutionDetails
+from openfeature.flag_evaluation import FlagResolutionDetails, Reason
 from openfeature.schemas.protobuf.flagd.evaluation.v1 import (
     evaluation_pb2,
     evaluation_pb2_grpc,
 )
 
-from ..config import Config
+from ..config import CacheType, Config
 from ..flag_type import FlagType
 
 if typing.TYPE_CHECKING:
@@ -57,6 +58,12 @@ def __init__(
         self.deadline = config.deadline * 0.001
         self.connected = False
 
+        self._cache: typing.Optional[BaseCacheImpl] = (
+            LRUCache(maxsize=self.config.max_cache_size)
+            if self.config.cache_type == CacheType.LRU
+            else None
+        )
+
     def _create_stub(
         self,
     ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]:
@@ -64,17 +71,27 @@ def _create_stub(
         channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
         channel = channel_factory(
             f"{config.host}:{config.port}",
-            options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
+            options=(("grpc.keepalive_time_ms", config.keep_alive),),
         )
         stub = evaluation_pb2_grpc.ServiceStub(channel)
         return stub, channel
 
     def initialize(self, evaluation_context: EvaluationContext) -> None:
         self.connect()
+        self.retry_backoff_seconds = 0.1
+        self.connected = False
+
+        self._cache = (
+            LRUCache(maxsize=self.config.max_cache_size)
+            if self.config.cache_type == CacheType.LRU
+            else None
+        )
 
     def shutdown(self) -> None:
         self.active = False
         self.channel.close()
+        if self._cache:
+            self._cache.clear()
 
     def connect(self) -> None:
         self.active = True
@@ -96,7 +113,6 @@ def connect(self) -> None:
 
     def listen(self) -> None:
         retry_delay = self.retry_backoff_seconds
-
         call_args = (
             {"timeout": self.streamline_deadline_seconds}
             if self.streamline_deadline_seconds > 0
@@ -148,6 +164,10 @@ def listen(self) -> None:
 
     def handle_changed_flags(self, data: typing.Any) -> None:
         changed_flags = list(data["flags"].keys())
+        if self._cache:
+            for flag in changed_flags:
+                self._cache.pop(flag)
+
         self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))
 
     def resolve_boolean_details(
@@ -190,13 +210,18 @@ def resolve_object_details(
     ) -> FlagResolutionDetails[typing.Union[dict, list]]:
         return self._resolve(key, FlagType.OBJECT, default_value, evaluation_context)
 
-    def _resolve(  # noqa: PLR0915
+    def _resolve(  # noqa: PLR0915 C901
         self,
         flag_key: str,
         flag_type: FlagType,
         default_value: T,
         evaluation_context: typing.Optional[EvaluationContext],
     ) -> FlagResolutionDetails[T]:
+        if self._cache is not None and flag_key in self._cache:
+            cached_flag: FlagResolutionDetails[T] = self._cache[flag_key]
+            cached_flag.reason = Reason.CACHED
+            return cached_flag
+
         context = self._convert_context(evaluation_context)
         call_args = {"timeout": self.deadline}
         try:
@@ -249,12 +274,17 @@ def _resolve(  # noqa: PLR0915
             raise GeneralError(message) from e
 
         # Got a valid flag and valid type. Return it.
-        return FlagResolutionDetails(
+        result = FlagResolutionDetails(
             value=value,
             reason=response.reason,
             variant=response.variant,
         )
 
+        if response.reason == Reason.STATIC and self._cache is not None:
+            self._cache.insert(flag_key, result)
+
+        return result
+
     def _convert_context(
         self, evaluation_context: typing.Optional[EvaluationContext]
     ) -> Struct:
diff --git a/providers/openfeature-provider-flagd/tests/e2e/config.feature b/providers/openfeature-provider-flagd/tests/e2e/config.feature
new file mode 100644
index 00000000..f2bd715e
--- /dev/null
+++ b/providers/openfeature-provider-flagd/tests/e2e/config.feature
@@ -0,0 +1,189 @@
+Feature: Configuration Test
+
+  @rpc @in-process
+  Scenario Outline: Default Config
+    When we initialize a config
+    Then the option "