Skip to content

Commit

Permalink
fix(flagd): adding events for rpc mode
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Schrottner <simon.schrottner@dynatrace.com>
  • Loading branch information
aepfli committed Nov 22, 2024
1 parent 038a343 commit 5055533
Show file tree
Hide file tree
Showing 10 changed files with 321 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Client and server classes corresponding to protobuf-defined services."""
import grpc

from flagd.evaluation.v1 import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2
from . import evaluation_pb2 as flagd_dot_evaluation_dot_v1_dot_evaluation__pb2


class ServiceStub(object):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,22 @@ def __init__( # noqa: PLR0913

def setup_resolver(self) -> AbstractResolver:
if self.config.resolver_type == ResolverType.GRPC:
return GrpcResolver(self.config)
return GrpcResolver(
self.config,
self.emit_provider_ready,
self.emit_provider_error,
self.emit_provider_configuration_changed,
)
elif self.config.resolver_type == ResolverType.IN_PROCESS:
return InProcessResolver(self.config, self)
else:
raise ValueError(
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.resolver.initialize(evaluation_context)

def shutdown(self) -> None:
if self.resolver:
self.resolver.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@


class AbstractResolver(typing.Protocol):
def initialize(self, evaluation_context: EvaluationContext) -> None: ...

def shutdown(self) -> None: ...

def resolve_boolean_details(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import logging
import threading
import time
import typing

import grpc
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
from openfeature.exception import (
ErrorCode,
FlagNotFoundError,
GeneralError,
InvalidContextError,
Expand All @@ -16,23 +21,99 @@

from ..config import Config
from ..flag_type import FlagType
from ..proto.schema.v1 import schema_pb2, schema_pb2_grpc
from ..proto.flagd.evaluation.v1 import evaluation_pb2, evaluation_pb2_grpc

T = typing.TypeVar("T")

logger = logging.getLogger("openfeature.contrib")


class GrpcResolver:
def __init__(self, config: Config):
MAX_BACK_OFF = 120

def __init__(
self,
config: Config,
emit_provider_ready: typing.Callable[[ProviderEventDetails], None],
emit_provider_error: typing.Callable[[ProviderEventDetails], None],
emit_provider_configuration_changed: typing.Callable[
[ProviderEventDetails], None
],
):
self.config = config
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
self.emit_provider_configuration_changed = emit_provider_configuration_changed
channel_factory = (
grpc.secure_channel if self.config.tls else grpc.insecure_channel
)
self.channel = channel_factory(f"{self.config.host}:{self.config.port}")
self.stub = schema_pb2_grpc.ServiceStub(self.channel)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)
self.retry_backoff_seconds = 0.1
self.connected = False

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.connect()

def shutdown(self) -> None:
self.active = False
self.channel.close()

def connect(self) -> None:
self.active = True
self.thread = threading.Thread(
target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
)
self.thread.start()

def listen(self) -> None:
retry_delay = self.retry_backoff_seconds
while self.active:
request = evaluation_pb2.EventStreamRequest() # type:ignore[attr-defined]
try:
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request):
if message.type == "provider_ready":
if not self.connected:
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
)
self.connected = True
# reset retry delay after successsful read
retry_delay = self.retry_backoff_seconds

elif message.type == "configuration_change":
data = MessageToDict(message)["data"]
self.handle_changed_flags(data)

if not self.active:
logger.info("Terminating gRPC sync thread")
return

Check warning on line 93 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

View check run for this annotation

Codecov / codecov/patch

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py#L92-L93

Added lines #L92 - L93 were not covered by tests
except grpc.RpcError as e:
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
except ParseError:
logger.exception(

Check warning on line 97 in providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

View check run for this annotation

Codecov / codecov/patch

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py#L96-L97

Added lines #L96 - L97 were not covered by tests
f"Could not parse flag data using flagd syntax: {message=}"
)

self.connected = False
self.emit_provider_error(
ProviderEventDetails(
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
error_code=ErrorCode.GENERAL,
)
)
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
time.sleep(retry_delay)
retry_delay = min(2 * retry_delay, self.MAX_BACK_OFF)

def handle_changed_flags(self, data: typing.Any) -> None:
changed_flags = list(data["flags"].keys())

self.emit_provider_configuration_changed(ProviderEventDetails(changed_flags))

def resolve_boolean_details(
self,
key: str,
Expand Down Expand Up @@ -84,33 +165,33 @@ def _resolve( # noqa: PLR0915
call_args = {"timeout": self.config.timeout}
try:
if flag_type == FlagType.BOOLEAN:
request = schema_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
request = evaluation_pb2.ResolveBooleanRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveBoolean(request, **call_args)
value = response.value
elif flag_type == FlagType.STRING:
request = schema_pb2.ResolveStringRequest( # type:ignore[attr-defined]
request = evaluation_pb2.ResolveStringRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveString(request, **call_args)
value = response.value
elif flag_type == FlagType.OBJECT:
request = schema_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
request = evaluation_pb2.ResolveObjectRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveObject(request, **call_args)
value = MessageToDict(response, preserving_proto_field_name=True)[
"value"
]
elif flag_type == FlagType.FLOAT:
request = schema_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
request = evaluation_pb2.ResolveFloatRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveFloat(request, **call_args)
value = response.value
elif flag_type == FlagType.INTEGER:
request = schema_pb2.ResolveIntRequest( # type:ignore[attr-defined]
request = evaluation_pb2.ResolveIntRequest( # type:ignore[attr-defined]
flag_key=flag_key, context=context
)
response = self.stub.ResolveInt(request, **call_args)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import FlagNotFoundError, ParseError
from openfeature.flag_evaluation import FlagResolutionDetails, Reason
from openfeature.provider.provider import AbstractProvider
from openfeature.provider import AbstractProvider

from ..config import Config
from .process.file_watcher import FileWatcherFlagStore
Expand All @@ -26,6 +26,9 @@ def __init__(self, config: Config, provider: AbstractProvider):
self.config.offline_poll_interval_seconds,
)

def initialize(self, evaluation_context: EvaluationContext) -> None:
pass

def shutdown(self) -> None:
self.flag_store.shutdown()

Expand Down
15 changes: 4 additions & 11 deletions providers/openfeature-provider-flagd/tests/e2e/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,22 @@
from tests.e2e.flagd_container import FlagdContainer
from tests.e2e.steps import * # noqa: F403

from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider

JsonPrimitive = typing.Union[str, bool, float, int]


@pytest.fixture(autouse=True, scope="package")
def setup(request, port, image, resolver_type):
@pytest.fixture(autouse=True, scope="module")
def setup(request, port, image):
container: DockerContainer = FlagdContainer(
image=image,
port=port,
)
# Setup code
c = container.start()
api.set_provider(
FlagdProvider(
resolver_type=resolver_type,
port=int(container.get_exposed_port(port)),
)
)

def fin():
c.stop()

# Teardown code
request.addfinalizer(fin)

return c.get_exposed_port(port)
Loading

0 comments on commit 5055533

Please sign in to comment.