diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py index 299a004b..16c958c5 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/proto/flagd/evaluation/v1/evaluation_pb2_grpc.py @@ -2,7 +2,7 @@ """Client and server classes corresponding to protobuf-defined services.""" import grpc -from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2 +from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2 class ServiceStub(object): diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py index 76307475..b569a077 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py @@ -69,7 +69,12 @@ def __init__( # noqa: PLR0913 def setup_resolver(self) -> AbstractResolver: if self.config.resolver_type == ResolverType.GRPC: - return GrpcResolver(self.config) + return GrpcResolver( + self.config, + self.emit_provider_ready, + self.emit_provider_error, + self.emit_provider_configuration_changed, + ) elif self.config.resolver_type == ResolverType.IN_PROCESS: return InProcessResolver(self.config, self) else: @@ -77,6 +82,9 @@ def setup_resolver(self) -> AbstractResolver: f"`resolver_type` parameter invalid: {self.config.resolver_type}" ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.resolver.initialize(evaluation_context) + def shutdown(self) -> None: if self.resolver: self.resolver.shutdown() diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py index d0d46f59..73923abb 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py @@ -8,6 +8,8 @@ class AbstractResolver(typing.Protocol): + def initialize(self, evaluation_context: EvaluationContext) -> None: ... + def shutdown(self) -> None: ... def resolve_boolean_details( diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py index 194dc558..1511fc7b 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py @@ -1,3 +1,6 @@ +import logging +import threading +import time import typing import grpc @@ -5,7 +8,9 @@ from google.protobuf.struct_pb2 import Struct from openfeature.evaluation_context import EvaluationContext +from openfeature.event import ProviderEventDetails from openfeature.exception import ( + ErrorCode, FlagNotFoundError, GeneralError, InvalidContextError, @@ -16,23 +21,99 @@ from ..config import Config from ..flag_type import FlagType -from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc +from ..proto.flagd.evaluation.v1 import evaluation_pb2, evaluation_pb2_grpc T = typing.TypeVar("T") +logger = logging.getLogger("openfeature.contrib") + class GrpcResolver: - def __init__(self, config: Config): + MAX_BACK_OFF = 120 + + def __init__( + self, + config: Config, + emit_provider_ready: typing.Callable[[ProviderEventDetails], None], + emit_provider_error: typing.Callable[[ProviderEventDetails], None], + emit_provider_configuration_changed: typing.Callable[ + [ProviderEventDetails], None + ], + ): self.config = config + self.emit_provider_ready = emit_provider_ready + self.emit_provider_error = emit_provider_error + self.emit_provider_configuration_changed = emit_provider_configuration_changed channel_factory = ( grpc.secure_channel if self.config.tls else grpc.insecure_channel ) self.channel = channel_factory(f"{self.config.host}:{self.config.port}") - self.stub = schema_pb2_grpc.ServiceStub(self.channel) + self.stub = evaluation_pb2_grpc.ServiceStub(self.channel) + self.retry_backoff_seconds = 0.1 + self.connected = False + + def initialize(self, evaluation_context: EvaluationContext) -> None: + self.connect() def shutdown(self) -> None: + self.active = False self.channel.close() + def connect(self) -> None: + self.active = True + self.thread = threading.Thread( + target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread" + ) + self.thread.start() + + def listen(self) -> None: + retry_delay = self.retry_backoff_seconds + while self.active: + request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined] + try: + logger.debug("Setting up gRPC sync flags connection") + for message in self.stub.EventStream(request): + if message.type == "provider_ready": + if not self.connected: + self.emit_provider_ready( + ProviderEventDetails( + message="gRPC sync connection established" + ) + ) + self.connected = True + # reset retry delay after successsful read + retry_delay = self.retry_backoff_seconds + + elif message.type == "configuration_change": + data = MessageToDict(message)["data"] + self.handle_changed_flags(data) + + if not self.active: + logger.info("Terminating gRPC sync thread") + return + except grpc.RpcError as e: + logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}") + except ParseError: + logger.exception( + f"Could not parse flag data using flagd syntax: {message=}" + ) + + self.connected = False + self.emit_provider_error( + ProviderEventDetails( + message=f"gRPC sync disconnected, reconnecting in {retry_delay}s", + error_code=ErrorCode.GENERAL, + ) + ) + logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s") + time.sleep(retry_delay) + retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF) + + def handle_changed_flags(self, data: typing.Any) -> None: + changed_flags = list(data["flags"].keys()) + + self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags)) + def resolve_boolean_details( self, key: str, @@ -84,19 +165,19 @@ def _resolve( # noqa: PLR0915 call_args = {"timeout": self.config.timeout} try: if flag_type == FlagType.BOOLEAN: - request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined] + request = evaluation_pb2.ResolveBooleanRequest( # type:ignore[attr-defined] flag_key=flag_key, context=context ) response = self.stub.ResolveBoolean(request, **call_args) value = response.value elif flag_type == FlagType.STRING: - request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined] + request = evaluation_pb2.ResolveStringRequest( # type:ignore[attr-defined] flag_key=flag_key, context=context ) response = self.stub.ResolveString(request, **call_args) value = response.value elif flag_type == FlagType.OBJECT: - request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined] + request = evaluation_pb2.ResolveObjectRequest( # type:ignore[attr-defined] flag_key=flag_key, context=context ) response = self.stub.ResolveObject(request, **call_args) @@ -104,13 +185,13 @@ def _resolve( # noqa: PLR0915 "value" ] elif flag_type == FlagType.FLOAT: - request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined] + request = evaluation_pb2.ResolveFloatRequest( # type:ignore[attr-defined] flag_key=flag_key, context=context ) response = self.stub.ResolveFloat(request, **call_args) value = response.value elif flag_type == FlagType.INTEGER: - request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined] + request = evaluation_pb2.ResolveIntRequest( # type:ignore[attr-defined] flag_key=flag_key, context=context ) response = self.stub.ResolveInt(request, **call_args) diff --git a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py index 69b4989a..ff207b1e 100644 --- a/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py +++ b/providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/in_process.py @@ -3,7 +3,7 @@ from openfeature.evaluation_context import EvaluationContext from openfeature.exception import FlagNotFoundError, ParseError from openfeature.flag_evaluation import FlagResolutionDetails, Reason -from openfeature.provider.provider import AbstractProvider +from openfeature.provider import AbstractProvider from ..config import Config from .process.file_watcher import FileWatcherFlagStore @@ -26,6 +26,9 @@ def __init__(self, config: Config, provider: AbstractProvider): self.config.offline_poll_interval_seconds, ) + def initialize(self, evaluation_context: EvaluationContext) -> None: + pass + def shutdown(self) -> None: self.flag_store.shutdown() diff --git a/providers/openfeature-provider-flagd/tests/e2e/conftest.py b/providers/openfeature-provider-flagd/tests/e2e/conftest.py index 2aae58d9..25501b41 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/conftest.py +++ b/providers/openfeature-provider-flagd/tests/e2e/conftest.py @@ -5,29 +5,22 @@ from tests.e2e.flagd_container import FlagdContainer from tests.e2e.steps import * # noqa: F403 -from openfeature import api -from openfeature.contrib.provider.flagd import FlagdProvider - JsonPrimitive = typing.Union[str, bool, float, int] -@pytest.fixture(autouse=True, scope="package") -def setup(request, port, image, resolver_type): +@pytest.fixture(autouse=True, scope="module") +def setup(request, port, image): container: DockerContainer = FlagdContainer( image=image, port=port, ) # Setup code c = container.start() - api.set_provider( - FlagdProvider( - resolver_type=resolver_type, - port=int(container.get_exposed_port(port)), - ) - ) def fin(): c.stop() # Teardown code request.addfinalizer(fin) + + return c.get_exposed_port(port) diff --git a/providers/openfeature-provider-flagd/tests/e2e/steps.py b/providers/openfeature-provider-flagd/tests/e2e/steps.py index fe490c5f..21dc3ef4 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/steps.py +++ b/providers/openfeature-provider-flagd/tests/e2e/steps.py @@ -1,3 +1,4 @@ +import logging import time import typing @@ -8,8 +9,9 @@ from openfeature import api from openfeature.client import OpenFeatureClient +from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.evaluation_context import EvaluationContext -from openfeature.event import EventDetails, ProviderEvent +from openfeature.event import ProviderEvent from openfeature.flag_evaluation import ErrorCode, FlagEvaluationDetails, Reason from openfeature.provider import ProviderStatus @@ -24,8 +26,15 @@ def evaluation_context() -> EvaluationContext: @given("a flagd provider is set", target_fixture="client") @given("a provider is registered", target_fixture="client") -def setup_provider() -> OpenFeatureClient: - client = api.get_client() +def setup_provider(setup, resolver_type, client_name) -> OpenFeatureClient: + api.set_provider( + FlagdProvider( + resolver_type=resolver_type, + port=setup, + ), + client_name, + ) + client = api.get_client(client_name) wait_for(lambda: client.get_provider_status() == ProviderStatus.READY) return client @@ -491,27 +500,35 @@ def assert_reason( assert_equal(evaluation_result.reason, reason) -@when(parsers.cfparse("a PROVIDER_READY handler is added")) -def provider_ready_add(client: OpenFeatureClient, context): - def provider_ready_handler(event_details: EventDetails): - context["provider_ready_ran"] = True +@pytest.fixture() +def event_handles() -> list: + return [] - client.add_handler(ProviderEvent.PROVIDER_READY, provider_ready_handler) +@pytest.fixture() +def error_handles() -> list: + return [] -@then(parsers.cfparse("the PROVIDER_READY handler must run")) -def provider_ready_was_executed(client: OpenFeatureClient, context): - assert_true(context["provider_ready_ran"]) +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler is added", + extra_types={"ProviderEvent": ProviderEvent}, + ), +) +def add_event_handler( + client: OpenFeatureClient, event_type: ProviderEvent, event_handles: list +): + def handler(event): + logging.debug((event_type, event)) + event_handles.append( + { + "type": event_type, + "event": event, + } + ) -@when(parsers.cfparse("a PROVIDER_CONFIGURATION_CHANGED handler is added")) -def provider_changed_add(client: OpenFeatureClient, context): - def provider_changed_handler(event_details: EventDetails): - context["provider_changed_ran"] = True - - client.add_handler( - ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, provider_changed_handler - ) + client.add_handler(event_type, handler) @pytest.fixture(scope="function") @@ -519,30 +536,100 @@ def context(): return {} -@when(parsers.cfparse('a flag with key "{flag_key}" is modified')) -def assert_reason2( +@when( + parsers.cfparse( + "a {event_type:ProviderEvent} handler and a {event_type2:ProviderEvent} handler are added", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def add_event_handlers( client: OpenFeatureClient, - context, - flag_key: str, + event_type: ProviderEvent, + event_type2: ProviderEvent, + event_handles, + error_handles, ): - context["flag_key"] = flag_key + add_event_handler(client, event_type, event_handles) + add_event_handler(client, event_type2, error_handles) +def assert_handlers( + handles, event_type: ProviderEvent, max_wait: int = 2, num_events: int = 1 +): + poll_interval = 1 + while max_wait > 0: + if sum([h["type"] == event_type for h in handles]) < num_events: + max_wait -= poll_interval + time.sleep(poll_interval) + continue + break + + logging.info(f"asserting num({event_type}) >= {num_events}: {handles}") + actual_num_events = sum([h["type"] == event_type for h in handles]) + assert ( + num_events <= actual_num_events + ), f"Expected {num_events} but got {actual_num_events}: {handles}" + + +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) @then( - parsers.cfparse("the PROVIDER_CONFIGURATION_CHANGED handler must run"), + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider connects", + extra_types={"ProviderEvent": ProviderEvent}, + ) ) -def provider_changed_was_executed(client: OpenFeatureClient, context): - wait_for(lambda: context.get("provider_changed_ran")) - assert_equal(context["provider_changed_ran"], True) +def assert_handler_run(event_type: ProviderEvent, event_handles): + assert_handlers(event_handles, event_type, max_wait=6) -@then(parsers.cfparse('the event details must indicate "{flag_name}" was altered')) -def flag_was_changed( - flag_name: str, - context, +@then( + parsers.cfparse( + "the {event_type:ProviderEvent} handler must run when the provider's connection is lost", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_handler(error_handles, event_type: ProviderEvent): + # docker sync upstream restarts every 5s, waiting 2 cycles reduces test noise + assert_handlers(error_handles, event_type, max_wait=30) + + +@when( + parsers.cfparse('a flag with key "{flag_key}" is modified'), + target_fixture="changed_flag", +) +def changed_flag( + flag_key: str, +): + return flag_key + + +@then( + parsers.cfparse( + "when the connection is reestablished the {event_type:ProviderEvent} handler must run again", + extra_types={"ProviderEvent": ProviderEvent}, + ) +) +def assert_disconnect_error( + client: OpenFeatureClient, event_type: ProviderEvent, event_handles: list ): - wait_for(lambda: flag_name in context.get("changed_flags")) - assert_in(flag_name, context.get("changed_flags")) + assert_handlers(event_handles, event_type, max_wait=30, num_events=2) + + +@then(parsers.cfparse('the event details must indicate "{key}" was altered')) +def assert_flag_changed(event_handles, key): + handle = None + for h in event_handles: + if h["type"] == ProviderEvent.PROVIDER_CONFIGURATION_CHANGED: + handle = h + break + + assert handle is not None + assert key in handle["event"].flags_changed def wait_for(pred, poll_sec=2, timeout_sec=10): @@ -551,3 +638,26 @@ def wait_for(pred, poll_sec=2, timeout_sec=10): time.sleep(poll_sec) assert_true(pred()) return ok + + +@given("flagd is unavailable", target_fixture="client") +def flagd_unavailable(resolver_type): + api.set_provider( + FlagdProvider( + resolver_type=resolver_type, + port=99999, + ), + "unavailable", + ) + return api.get_client("unavailable") + + +@when("a flagd provider is set and initialization is awaited") +def flagd_init(client: OpenFeatureClient, event_handles, error_handles): + add_event_handler(client, ProviderEvent.PROVIDER_ERROR, error_handles) + add_event_handler(client, ProviderEvent.PROVIDER_READY, event_handles) + + +@then("an error should be indicated within the configured deadline") +def flagd_error(error_handles): + assert_handlers(error_handles, ProviderEvent.PROVIDER_ERROR) diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py b/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py index 2d09ca11..60f708bd 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_in-process-file.py @@ -5,11 +5,14 @@ import pytest import yaml -from pytest_bdd import scenario, scenarios +from pytest_bdd import given, scenario, scenarios +from tests.e2e.steps import wait_for from openfeature import api +from openfeature.client import OpenFeatureClient from openfeature.contrib.provider.flagd import FlagdProvider from openfeature.contrib.provider.flagd.config import ResolverType +from openfeature.provider import ProviderStatus KEY_EVALUATORS = "$evaluators" @@ -18,7 +21,7 @@ MERGED_FILE = "merged_file" -@pytest.fixture(params=["json", "yaml"], scope="package") +@pytest.fixture(params=["json", "yaml"], scope="module") def file_name(request): extension = request.param result = {KEY_FLAGS: {}, KEY_EVALUATORS: {}} @@ -48,17 +51,37 @@ def file_name(request): return outfile -@pytest.fixture(autouse=True, scope="package") -def setup(request, file_name): - """`file_name` tests""" +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "in-process" + + +@pytest.fixture(autouse=True, scope="module") +def resolver_type() -> ResolverType: + return ResolverType.IN_PROCESS + + +@pytest.fixture(autouse=True, scope="module") +def setup(request, client_name, file_name, resolver_type): + """nothing to boot""" api.set_provider( FlagdProvider( - resolver_type=ResolverType.IN_PROCESS, + resolver_type=resolver_type, offline_flag_source_path=file_name.name, - ) + timeout=0.5, + ), + client_name, ) +@given("a flagd provider is set", target_fixture="client") +@given("a provider is registered", target_fixture="client") +def setup_provider(client_name) -> OpenFeatureClient: + client = api.get_client(client_name) + wait_for(lambda: client.get_provider_status() == ProviderStatus.READY) + return client + + @pytest.mark.skip(reason="Eventing not implemented") @scenario("../../test-harness/gherkin/flagd.feature", "Flag change event") def test_flag_change_event(): diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py index d2fe57e9..d39d90fc 100644 --- a/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc.py @@ -1,30 +1,29 @@ import pytest -from pytest_bdd import scenario, scenarios +from pytest_bdd import scenarios from openfeature.contrib.provider.flagd.config import ResolverType -@pytest.fixture(autouse=True, scope="package") +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "rpc" + + +@pytest.fixture(autouse=True, scope="module") def resolver_type() -> ResolverType: return ResolverType.GRPC -@pytest.fixture(autouse=True, scope="package") +@pytest.fixture(autouse=True, scope="module") def port(): return 8013 -@pytest.fixture(autouse=True, scope="package") +@pytest.fixture(autouse=True, scope="module") def image(): return "ghcr.io/open-feature/flagd-testbed:v0.5.13" -@pytest.mark.skip(reason="Eventing not implemented") -@scenario("../../test-harness/gherkin/flagd.feature", "Flag change event") -def test_flag_change_event(): - """not implemented""" - - scenarios( "../../test-harness/gherkin/flagd.feature", "../../test-harness/gherkin/flagd-json-evaluator.feature", diff --git a/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py new file mode 100644 index 00000000..8a1dec6d --- /dev/null +++ b/providers/openfeature-provider-flagd/tests/e2e/test_rpc_reconnect.py @@ -0,0 +1,35 @@ +import pytest +from pytest_bdd import scenarios + +from openfeature.contrib.provider.flagd.config import ResolverType + + +@pytest.fixture(autouse=True, scope="module") +def client_name() -> str: + return "rpc-reconnect" + + +@pytest.fixture(autouse=True, scope="module") +def resolver_type() -> ResolverType: + return ResolverType.GRPC + + +@pytest.fixture(autouse=True, scope="module") +def port(): + return 8013 + + +@pytest.fixture(autouse=True, scope="module") +def image(): + return "ghcr.io/open-feature/flagd-testbed-unstable:v0.5.13" + + +# @pytest.mark.skip(reason="Reconnect seems to be flacky") +# @scenario("../../test-harness/gherkin/flagd-reconnect.feature", "Provider reconnection") +# def test_flag_change_event(): +# """not implemented""" + + +scenarios( + "../../test-harness/gherkin/flagd-reconnect.feature", +)