Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: blocking provider mutator #251

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 68 additions & 44 deletions openfeature/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package openfeature

import (
"errors"
"fmt"
"sync"

"github.com/go-logr/logr"
Expand All @@ -14,7 +15,7 @@ type evaluationAPI struct {
defaultProvider FeatureProvider
namedProviders map[string]FeatureProvider
hks []Hook
evalCtx EvaluationContext
apiCtx EvaluationContext
logger logr.Logger
mu sync.RWMutex
eventExecutor *eventExecutor
Expand All @@ -28,29 +29,32 @@ func newEvaluationAPI() evaluationAPI {
defaultProvider: NoopProvider{},
namedProviders: map[string]FeatureProvider{},
hks: []Hook{},
evalCtx: EvaluationContext{},
apiCtx: EvaluationContext{},
logger: logger,
mu: sync.RWMutex{},
eventExecutor: newEventExecutor(logger),
}
}

// setProvider sets the default FeatureProvider of the evaluationAPI. Returns an error if FeatureProvider is nil
func (api *evaluationAPI) setProvider(provider FeatureProvider) error {
// setProvider sets the default FeatureProvider of the evaluationAPI.
// Returns an error if provider registration cause an error
func (api *evaluationAPI) setProvider(provider FeatureProvider, async bool) error {
api.mu.Lock()
defer api.mu.Unlock()

if provider == nil {
return errors.New("default provider cannot be set to nil")
}

// Initialize new default provider and shutdown the old one
// Provider update must be non-blocking, hence initialization & shutdown happens concurrently
oldProvider := api.defaultProvider
api.defaultProvider = provider

api.initNewAndShutdownOld(provider, oldProvider)
err := api.eventExecutor.registerDefaultProvider(provider)
err := api.initNewAndShutdownOld(provider, oldProvider, async)
if err != nil {
return err
}

err = api.eventExecutor.registerDefaultProvider(provider)
if err != nil {
return err
}
Expand All @@ -67,7 +71,7 @@ func (api *evaluationAPI) getProvider() FeatureProvider {
}

// setProvider sets a provider with client name. Returns an error if FeatureProvider is nil
func (api *evaluationAPI) setNamedProvider(clientName string, provider FeatureProvider) error {
func (api *evaluationAPI) setNamedProvider(clientName string, provider FeatureProvider, async bool) error {
api.mu.Lock()
defer api.mu.Unlock()

Expand All @@ -80,8 +84,12 @@ func (api *evaluationAPI) setNamedProvider(clientName string, provider FeaturePr
oldProvider := api.namedProviders[clientName]
api.namedProviders[clientName] = provider

api.initNewAndShutdownOld(provider, oldProvider)
err := api.eventExecutor.registerNamedEventingProvider(clientName, provider)
err := api.initNewAndShutdownOld(provider, oldProvider, async)
if err != nil {
return err
}

err = api.eventExecutor.registerNamedEventingProvider(clientName, provider)
if err != nil {
return err
}
Expand All @@ -97,11 +105,11 @@ func (api *evaluationAPI) getNamedProviders() map[string]FeatureProvider {
return api.namedProviders
}

func (api *evaluationAPI) setEvaluationContext(evalCtx EvaluationContext) {
func (api *evaluationAPI) setEvaluationContext(apiCtx EvaluationContext) {
api.mu.Lock()
defer api.mu.Unlock()

api.evalCtx = evalCtx
api.apiCtx = apiCtx
}

func (api *evaluationAPI) setLogger(l logr.Logger) {
Expand Down Expand Up @@ -163,52 +171,68 @@ func (api *evaluationAPI) forTransaction(clientName string) (FeatureProvider, []
provider = api.defaultProvider
}

return provider, api.hks, api.evalCtx
return provider, api.hks, api.apiCtx
}

// initNewAndShutdownOld is a helper to initialise new FeatureProvider and shutdown the old FeatureProvider.
// Operations happen concurrently.
func (api *evaluationAPI) initNewAndShutdownOld(newProvider FeatureProvider, oldProvider FeatureProvider) {
v, ok := newProvider.(StateHandler)
if ok && v.Status() == NotReadyState {
go func(provider FeatureProvider, stateHandler StateHandler, evalCtx EvaluationContext, eventChan chan eventPayload) {
err := stateHandler.Init(evalCtx)
// emit ready/error event once initialization is complete
if err != nil {
eventChan <- eventPayload{
Event{
ProviderName: provider.Metadata().Name,
EventType: ProviderError,
ProviderEventDetails: ProviderEventDetails{},
}, provider,
}
} else {
eventChan <- eventPayload{
Event{
ProviderName: provider.Metadata().Name,
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{},
}, provider,
}
}
}(newProvider, v, api.evalCtx, api.eventExecutor.eventChan)
}

v, ok = oldProvider.(StateHandler)
func (api *evaluationAPI) initNewAndShutdownOld(newProvider FeatureProvider, oldProvider FeatureProvider, async bool) error {
if async {
go func(executor *eventExecutor, ctx EvaluationContext) {
// for async initialization, error is conveyed as an event
event, _ := initializer(newProvider, ctx)
executor.triggerEvent(event, newProvider)
}(api.eventExecutor, api.apiCtx)
} else {
event, err := initializer(newProvider, api.apiCtx)
api.eventExecutor.triggerEvent(event, newProvider)
if err != nil {
return err
}
}

v, ok := oldProvider.(StateHandler)

// oldProvider can be nil or without state handling capability
if oldProvider == nil || !ok {
return
return nil
}

// check for multiple bindings
if oldProvider == api.defaultProvider || contains(oldProvider, maps.Values(api.namedProviders)) {
return
return nil
}

go func(forShutdown StateHandler) {
forShutdown.Shutdown()
}(v)

return nil
}

// initializer is a helper to execute provider initialization and generate appropriate event for the initialization
// It also returns an error if the initialization resulted in an error
func initializer(provider FeatureProvider, apiCtx EvaluationContext) (Event, error) {
var event = Event{
ProviderName: provider.Metadata().Name,
EventType: ProviderReady,
ProviderEventDetails: ProviderEventDetails{
Message: "Provider initialization successful",
},
}

handler, ok := provider.(StateHandler)
if !ok {
// Note - a provider without state handling capability can be assumed to be ready immediately.
return event, nil
}

err := handler.Init(apiCtx)
if err != nil {
event.EventType = ProviderError
event.Message = fmt.Sprintf("Provider initialization error, %v", err)
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
}

return event, err
}

func contains(provider FeatureProvider, in []FeatureProvider) bool {
Expand Down
2 changes: 1 addition & 1 deletion openfeature/event_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (e *eventExecutor) triggerEvent(event Event, handler FeatureProvider) {
}
}

if e.defaultProviderReference.featureProvider != handler {
if !reflect.DeepEqual(e.defaultProviderReference.featureProvider, handler) {
return
}

Expand Down
29 changes: 16 additions & 13 deletions openfeature/event_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func TestEventHandler_Eventing(t *testing.T) {
rsp <- details
}

AddHandler(ProviderReady, &callBack)
eventType := ProviderConfigChange
AddHandler(eventType, &callBack)

fCh := []string{"flagA"}
meta := map[string]interface{}{
Expand All @@ -91,7 +92,7 @@ func TestEventHandler_Eventing(t *testing.T) {

// trigger event from provider implementation
eventingImpl.Invoke(Event{
EventType: ProviderReady,
EventType: eventType,
ProviderEventDetails: ProviderEventDetails{
Message: "ReadyMessage",
FlagChanges: fCh,
Expand Down Expand Up @@ -139,7 +140,7 @@ func TestEventHandler_Eventing(t *testing.T) {
// associated to client name
associatedName := "providerForClient"

err := SetNamedProvider(associatedName, eventingProvider)
err := SetNamedProviderAndWait(associatedName, eventingProvider)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -214,13 +215,13 @@ func TestEventHandler_clientAssociation(t *testing.T) {
}

// default provider
err := SetProvider(defaultProvider)
err := SetProviderAndWait(defaultProvider)
if err != nil {
t.Fatal(err)
}

// named provider(associated to name someClient)
err = SetNamedProvider("someClient", struct {
err = SetNamedProviderAndWait("someClient", struct {
FeatureProvider
EventHandler
}{
Expand Down Expand Up @@ -273,7 +274,7 @@ func TestEventHandler_ErrorHandling(t *testing.T) {
eventing,
}

errorCallback := func(e EventDetails) {
failingCallback := func(e EventDetails) {
panic("callback panic")
}

Expand All @@ -292,24 +293,26 @@ func TestEventHandler_ErrorHandling(t *testing.T) {
t.Fatal(err)
}

successEventType := ProviderStale

// api level handlers
AddHandler(ProviderReady, &errorCallback)
AddHandler(ProviderReady, &successAPICallback)
AddHandler(ProviderConfigChange, &failingCallback)
AddHandler(successEventType, &successAPICallback)

// provider association
providerName := "providerA"

client := NewClient(providerName)

// client level handlers
client.AddHandler(ProviderReady, &errorCallback)
client.AddHandler(ProviderReady, &successClientCallback)
client.AddHandler(ProviderConfigChange, &failingCallback)
client.AddHandler(successEventType, &successClientCallback)

// trigger events manually
go func() {
eventing.Invoke(Event{
ProviderName: providerName,
EventType: ProviderReady,
EventType: successEventType,
ProviderEventDetails: ProviderEventDetails{},
})
}()
Expand Down Expand Up @@ -967,7 +970,7 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
},
}

if err := SetProvider(provider); err != nil {
if err := SetProviderAndWait(provider); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -1001,7 +1004,7 @@ func TestEventHandler_HandlersRunImmediately(t *testing.T) {
},
}

if err := SetProvider(provider); err != nil {
if err := SetProviderAndWait(provider); err != nil {
t.Fatal(err)
}

Expand Down
16 changes: 14 additions & 2 deletions openfeature/openfeature.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,25 @@ func initSingleton() {
// SetProvider sets the default provider. Provider initialization is asynchronous and status can be checked from
// provider status
func SetProvider(provider FeatureProvider) error {
return api.setProvider(provider)
return api.setProvider(provider, true)
}

// SetProviderAndWait sets the default provider and waits for its initialization.
// Returns an error if initialization cause error
func SetProviderAndWait(provider FeatureProvider) error {
return api.setProvider(provider, false)
toddbaert marked this conversation as resolved.
Show resolved Hide resolved
}

// SetNamedProvider sets a provider mapped to the given Client name. Provider initialization is asynchronous and
// status can be checked from provider status
func SetNamedProvider(clientName string, provider FeatureProvider) error {
return api.setNamedProvider(clientName, provider)
return api.setNamedProvider(clientName, provider, true)
}

// SetNamedProviderAndWait sets a provider mapped to the given Client name and waits for its initialization.
// Returns an error if initialization cause error
func SetNamedProviderAndWait(clientName string, provider FeatureProvider) error {
return api.setNamedProvider(clientName, provider, false)
}

// SetEvaluationContext sets the global evaluation context.
Expand Down
Loading
Loading