diff --git a/internal/datasource/streaming_data_source_events.go b/internal/datasource/streaming_data_source_events.go index 4c03ac62..20408dce 100644 --- a/internal/datasource/streaming_data_source_events.go +++ b/internal/datasource/streaming_data_source_events.go @@ -16,7 +16,7 @@ var ( deleteDataRequiredProperties = []string{"path", "version"} //nolint:gochecknoglobals ) -// PutData is the logical representation of the data in the "put" event. In the JSON representation, +// putData is the logical representation of the data in the "put" event. In the JSON representation, // the "data" property is actually a map of maps, but the schema we use internally is a list of // lists instead. // @@ -37,12 +37,12 @@ var ( // } // } // } -type PutData struct { +type putData struct { Path string // we don't currently do anything with this Data []ldstoretypes.Collection } -// PatchData is the logical representation of the data in the "patch" event. In the JSON representation, +// patchData is the logical representation of the data in the "patch" event. In the JSON representation, // there is a "path" property in the format "/flags/key" or "/segments/key", which we convert into // Kind and Key when we parse it. The "data" property is the JSON representation of the flag or // segment, which we deserialize into an ItemDescriptor. @@ -56,13 +56,13 @@ type PutData struct { // "version": 2, ...etc. // } // } -type PatchData struct { +type patchData struct { Kind ldstoretypes.DataKind Key string Data ldstoretypes.ItemDescriptor } -// DeleteData is the logical representation of the data in the "delete" event. In the JSON representation, +// deleteData is the logical representation of the data in the "delete" event. In the JSON representation, // there is a "path" property in the format "/flags/key" or "/segments/key", which we convert into // Kind and Key when we parse it. // @@ -72,14 +72,14 @@ type PatchData struct { // "path": "/flags/flagkey", // "version": 3 // } -type DeleteData struct { +type deleteData struct { Kind ldstoretypes.DataKind Key string Version int } -func parsePutData(data []byte) (PutData, error) { - var ret PutData +func parsePutData(data []byte) (putData, error) { + var ret putData r := jreader.NewReader(data) for obj := r.Object().WithRequiredProperties(putDataRequiredProperties); obj.Next(); { switch string(obj.Name()) { @@ -92,15 +92,15 @@ func parsePutData(data []byte) (PutData, error) { return ret, r.Error() } -func parsePatchData(data []byte) (PatchData, error) { - var ret PatchData +func parsePatchData(data []byte) (patchData, error) { + var ret patchData r := jreader.NewReader(data) var kind datakinds.DataKindInternal var key string - parseItem := func() (PatchData, error) { + parseItem := func() (patchData, error) { item, err := kind.DeserializeFromJSONReader(&r) if err != nil { - return PatchData{}, err + return patchData{}, err } ret.Data = item return ret, nil @@ -126,7 +126,7 @@ func parsePatchData(data []byte) (PatchData, error) { } } if err := r.Error(); err != nil { - return PatchData{}, err + return patchData{}, err } // If we got here, it means we couldn't parse the data model object yet because we saw the // "data" property first. 
But we definitely saw both properties (otherwise we would've got @@ -138,13 +138,13 @@ func parsePatchData(data []byte) (PatchData, error) { } } if r.Error() != nil { - return PatchData{}, r.Error() + return patchData{}, r.Error() } - return PatchData{}, errors.New("patch event had no data property") + return patchData{}, errors.New("patch event had no data property") } -func parseDeleteData(data []byte) (DeleteData, error) { - var ret DeleteData +func parseDeleteData(data []byte) (deleteData, error) { + var ret deleteData r := jreader.NewReader(data) for obj := r.Object().WithRequiredProperties(deleteDataRequiredProperties); obj.Next(); { switch string(obj.Name()) { @@ -161,7 +161,7 @@ func parseDeleteData(data []byte) (DeleteData, error) { } } if r.Error() != nil { - return DeleteData{}, r.Error() + return deleteData{}, r.Error() } return ret, nil } diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 131f2f4f..2311fd54 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -4,12 +4,12 @@ import ( "sync" "time" - "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-server-sdk/v7/interfaces" "github.com/launchdarkly/go-server-sdk/v7/internal" + "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/subsystems" ) @@ -18,10 +18,10 @@ const ( pollingWillRetryMessage = "will retry at next scheduled poll interval" ) -// Requester allows PollingProcessor to delegate fetching data to another component. +// PollingRequester allows PollingProcessor to delegate fetching data to another component. // This is useful for testing the PollingProcessor without needing to set up a test HTTP server. 
-type Requester interface { - Request() (data []ldstoretypes.Collection, cached bool, err error) +type PollingRequester interface { + Request() (*PollingResponse, error) BaseURI() string FilterKey() string } @@ -34,7 +34,7 @@ type Requester interface { type PollingProcessor struct { dataDestination subsystems.DataDestination statusReporter subsystems.DataSourceStatusReporter - requester Requester + requester PollingRequester pollInterval time.Duration loggers ldlog.Loggers setInitializedOnce sync.Once @@ -58,7 +58,7 @@ func newPollingProcessor( context subsystems.ClientContext, dataDestination subsystems.DataDestination, statusReporter subsystems.DataSourceStatusReporter, - requester Requester, + requester PollingRequester, pollInterval time.Duration, ) *PollingProcessor { pp := &PollingProcessor{ @@ -142,16 +142,23 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) { } func (pp *PollingProcessor) poll() error { - allData, cached, err := pp.requester.Request() + response, err := pp.requester.Request() if err != nil { return err } - // We initialize the store only if the request wasn't cached - if !cached { - pp.dataDestination.Init(allData, nil) + if response.Cached() { + return nil } + + switch response.Intent() { + case fdv2proto.IntentTransferFull: + pp.dataDestination.SetBasis(response.Events(), response.Selector(), true) + case fdv2proto.IntentTransferChanges: + pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true) + } + return nil } diff --git a/internal/datasourcev2/polling_http_request.go b/internal/datasourcev2/polling_http_request.go index 447a03de..0cd81ddf 100644 --- a/internal/datasourcev2/polling_http_request.go +++ b/internal/datasourcev2/polling_http_request.go @@ -8,9 +8,10 @@ import ( "net/http" "net/url" - es "github.com/launchdarkly/eventsource" + "github.com/launchdarkly/go-jsonstream/v3/jreader" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/launchdarkly/go-sdk-common/v3/ldlog" - "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" @@ -68,42 +69,121 @@ func (r *pollingRequester) BaseURI() string { func (r *pollingRequester) FilterKey() string { return r.filterKey } -func (r *pollingRequester) Request() ([]ldstoretypes.Collection, bool, error) { + +func (r *pollingRequester) Request() (*PollingResponse, error) { if r.loggers.IsDebugEnabled() { r.loggers.Debug("Polling LaunchDarkly for feature flag updates") } body, cached, err := r.makeRequest(endpoints.PollingRequestPath) if err != nil { - return nil, false, err + return nil, err } if cached { - return nil, true, nil + return NewCachedPollingResponse(), nil } - var payload pollingPayload + var payload fdv2proto.PollingPayload if err = json.Unmarshal(body, &payload); err != nil { - return nil, false, malformedJSONError{err} + return nil, malformedJSONError{err} } - esEvents := make([]es.Event, 0, len(payload.Events)) - for _, event := range payload.Events { - esEvents = append(esEvents, event) + parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { + dataKind, err := kind.ToFDV1() + if err != nil { + return ldstoretypes.ItemDescriptor{}, err + } + item, err := dataKind.DeserializeFromJSONReader(&r) + return item, err } - data, err := convertChangesetEventsToPutData(esEvents) - if err != nil { - return nil, false, 
malformedJSONError{err} - } else if len(data) != 1 { - return nil, false, malformedJSONError{errors.New("missing expected put event")} - } + updates := make([]fdv2proto.Event, 0, len(payload.Events)) - putData, ok := data[0].(datasource.PutData) - if !ok { - return nil, false, malformedJSONError{errors.New("payload is not a PutData")} - } + var intentCode fdv2proto.IntentCode - return putData.Data, cached, nil + for _, event := range payload.Events { + switch event.Name { + case fdv2proto.EventServerIntent: + { + var serverIntent fdv2proto.ServerIntent + err := json.Unmarshal(event.Data, &serverIntent) + if err != nil { + return nil, err + } else if len(serverIntent.Payloads) == 0 { + return nil, errors.New("server-intent event has no payloads") + } + + intentCode = serverIntent.Payloads[0].Code + if intentCode == fdv2proto.IntentNone { + return NewCachedPollingResponse(), nil + } + } + case fdv2proto.EventPutObject: + { + r := jreader.NewReader(event.Data) + + var ( + key string + kind fdv2proto.ObjectKind + item ldstoretypes.ItemDescriptor + err error + version int + ) + + for obj := r.Object().WithRequiredProperties([]string{ + versionField, kindField, keyField, objectField}); obj.Next(); { + switch string(obj.Name()) { + case versionField: + version = r.Int() + case kindField: + kind = fdv2proto.ObjectKind(r.String()) + case keyField: + key = r.String() + case objectField: + item, err = parseItem(r, kind) + if err != nil { + return nil, err + } + } + } + updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version}) + } + case fdv2proto.EventDeleteObject: + { + r := jreader.NewReader(event.Data) + + var ( + version int + kind fdv2proto.ObjectKind + key string + ) + + for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { + switch string(obj.Name()) { + case versionField: + version = r.Int() + case kindField: + kind = fdv2proto.ObjectKind(r.String()) + //nolint:godox + // TODO: An unrecognized kind should be ignored for forwards compat; the question is, + // do we throw out the DeleteObject here, or let the SDK's store handle it? + case keyField: + key = r.String() + } + } + updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) + } + case fdv2proto.EventPayloadTransferred: + //nolint:godox + // TODO: deserialize the state and create a fdv2proto.Selector. + } + } + + if intentCode == "" { + return nil, errors.New("no server-intent event found in polling response") + } + + return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil } func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) { diff --git a/internal/datasourcev2/polling_response.go b/internal/datasourcev2/polling_response.go new file mode 100644 index 00000000..80b2c36f --- /dev/null +++ b/internal/datasourcev2/polling_response.go @@ -0,0 +1,48 @@ +package datasourcev2 + +import "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + +// PollingResponse represents the result of a polling request. +type PollingResponse struct { + events []fdv2proto.Event + cached bool + intent fdv2proto.IntentCode + selector *fdv2proto.Selector +} + +// NewCachedPollingResponse indicates that the response has not changed. +func NewCachedPollingResponse() *PollingResponse { + return &PollingResponse{ + cached: true, + } +} + +// NewPollingResponse indicates that data was received. 
+func NewPollingResponse(intent fdv2proto.IntentCode, events []fdv2proto.Event, + selector *fdv2proto.Selector) *PollingResponse { + return &PollingResponse{ + events: events, + intent: intent, + selector: selector, + } +} + +// Events returns the events in the response. +func (p *PollingResponse) Events() []fdv2proto.Event { + return p.events +} + +// Cached returns true if the response was cached, meaning data has not changed. +func (p *PollingResponse) Cached() bool { + return p.cached +} + +// Intent returns the server intent code of the response. +func (p *PollingResponse) Intent() fdv2proto.IntentCode { + return p.intent +} + +// Selector returns the Selector of the response. +func (p *PollingResponse) Selector() *fdv2proto.Selector { + return p.selector +} diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 00b46598..3c02812d 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -5,17 +5,17 @@ import ( "errors" "net/http" "net/url" - "strings" "sync" "time" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/launchdarkly/go-jsonstream/v3/jreader" "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-sdk-common/v3/ldtime" ldevents "github.com/launchdarkly/go-sdk-events/v3" "github.com/launchdarkly/go-server-sdk/v7/interfaces" "github.com/launchdarkly/go-server-sdk/v7/internal" - "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" @@ -30,9 +30,7 @@ const ( keyField = "key" kindField = "kind" versionField = "version" - - putEventName = "put-object" - deleteEventName = "delete-object" + objectField = "object" streamReadTimeout = 5 * time.Minute // the LaunchDarkly stream should send a heartbeat comment every 3 minutes streamMaxRetryDelay = 30 * time.Second @@ -44,6 +42,11 @@ const ( streamingWillRetryMessage = "will retry" ) +type changeSet struct { + intent *fdv2proto.ServerIntent + events []es.Event +} + // Implementation of the streaming data source, not including the lower-level SSE implementation which is in // the eventsource package. // @@ -86,6 +89,7 @@ type StreamProcessor struct { connectionAttemptLock sync.Mutex readyOnce sync.Once closeOnce sync.Once + persist bool } // NewStreamProcessor creates the internal implementation of the streaming data source. @@ -102,6 +106,7 @@ func NewStreamProcessor( loggers: context.GetLogging().Loggers, halt: make(chan struct{}), cfg: cfg, + persist: true, } if cci, ok := context.(*internal.ClientContextImpl); ok { sp.diagnosticsManager = cci.DiagnosticsManager @@ -128,9 +133,6 @@ func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) { go sp.subscribe(closeWhenReady) } -// TODO: Remove this nolint once we have a better implementation. -// -//nolint:gocyclo,godox // this function is a stepping stone. It will get better over time. func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<- struct{}) { // Consume remaining Events and Errors so we can garbage collect defer func() { @@ -160,6 +162,9 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< sp.logConnectionResult(true) + //nolint:godox + // TODO(cwaldren/mkeeler): Should this actually be true by default? 
It means if we receive an event + // we don't understand then we go to the Valid state. processedEvent := true shouldRestart := false @@ -188,19 +193,13 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< processedEvent = false } - storeUpdateFailed := func(updateDesc string) { - sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc) - shouldRestart = true // scenario 2b - processedEvent = false - } - - switch event.Event() { - case "heart-beat": + switch fdv2proto.EventName(event.Event()) { + case fdv2proto.EventHeartbeat: // Swallow the event and move on. - case "server-intent": + case fdv2proto.EventServerIntent: //nolint: godox // TODO: Replace all this json unmarshalling with a nicer jreader implementation. - var serverIntent serverIntent + var serverIntent fdv2proto.ServerIntent err := json.Unmarshal([]byte(event.Data()), &serverIntent) if err != nil { gotMalformedEvent(event, err) @@ -217,12 +216,12 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< currentChangeSet = changeSet{events: make([]es.Event, 0), intent: &serverIntent} - case putEventName: + case fdv2proto.EventPutObject: currentChangeSet.events = append(currentChangeSet.events, event) - case deleteEventName: + case fdv2proto.EventDeleteObject: currentChangeSet.events = append(currentChangeSet.events, event) - case "goodbye": - var goodbye goodbye + case fdv2proto.EventGoodbye: + var goodbye fdv2proto.Goodbye err := json.Unmarshal([]byte(event.Data()), &goodbye) if err != nil { gotMalformedEvent(event, err) @@ -230,10 +229,10 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } if !goodbye.Silent { - sp.loggers.Errorf("SSE server received error: %s (%s)", goodbye.Reason, goodbye.Catastrophe) + sp.loggers.Errorf("SSE server received error: %s (%v)", goodbye.Reason, goodbye.Catastrophe) } - case "error": - var errorData errorEvent + case fdv2proto.EventError: + var errorData fdv2proto.Error err := json.Unmarshal([]byte(event.Data()), &errorData) if err != nil { //nolint: godox @@ -249,36 +248,34 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< currentChangeSet = changeSet{events: make([]es.Event, 0)} //nolint: godox // TODO: Do we need to restart here? 
- case "payload-transferred": + case fdv2proto.EventPayloadTransferred: + + selector := &fdv2proto.Selector{} + + err := json.Unmarshal([]byte(event.Data()), selector) + if err != nil { + gotMalformedEvent(event, err) + break + } + currentChangeSet.events = append(currentChangeSet.events, event) - updates, err := processChangeset(currentChangeSet) + updates, err := deserializeEvents(currentChangeSet.events) if err != nil { sp.loggers.Errorf("Error processing changeset: %s", err) gotMalformedEvent(nil, err) break } - for _, update := range updates { - switch u := update.(type) { - case datasource.PatchData: - if !sp.dataDestination.Upsert(u.Kind, u.Key, u.Data) { - storeUpdateFailed("streaming update of " + u.Key) - } - case datasource.PutData: - if sp.dataDestination.Init(u.Data, nil) { - sp.setInitializedAndNotifyClient(true, closeWhenReady) - } else { - storeUpdateFailed("initial streaming data") - } - case datasource.DeleteData: - deletedItem := ldstoretypes.ItemDescriptor{Version: u.Version, Item: nil} - if !sp.dataDestination.Upsert(u.Kind, u.Key, deletedItem) { - storeUpdateFailed("streaming deletion of " + u.Key) - } - - default: - sp.loggers.Infof("Unexpected update found in changeset: %s", update) + + switch currentChangeSet.intent.Payloads[0].Code { + case fdv2proto.IntentTransferFull: + { + sp.dataDestination.SetBasis(updates, selector, true) + sp.setInitializedAndNotifyClient(true, closeWhenReady) } + case fdv2proto.IntentTransferChanges: + sp.dataDestination.ApplyDelta(updates, selector, true) } + currentChangeSet = changeSet{events: make([]es.Event, 0)} default: sp.loggers.Infof("Unexpected event found in stream: %s", event.Event()) @@ -449,154 +446,77 @@ func (sp *StreamProcessor) GetFilterKey() string { return sp.cfg.FilterKey } -func processChangeset(changeSet changeSet) ([]any, error) { - if changeSet.intent == nil || changeSet.intent.Payloads[0].Code != "xfer-full" { - return convertChangesetEventsToPatchData(changeSet.events) - } - - return convertChangesetEventsToPutData(changeSet.events) -} - -func convertChangesetEventsToPatchData(events []es.Event) ([]any, error) { - updates := make([]interface{}, 0, len(events)) +func deserializeEvents(events []es.Event) ([]fdv2proto.Event, error) { + updates := make([]fdv2proto.Event, 0, len(events)) - parseItem := func(r jreader.Reader, kind datakinds.DataKindInternal) (ldstoretypes.ItemDescriptor, error) { - item, err := kind.DeserializeFromJSONReader(&r) + parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { + dataKind, err := kind.ToFDV1() + if err != nil { + return ldstoretypes.ItemDescriptor{}, err + } + item, err := dataKind.DeserializeFromJSONReader(&r) return item, err } for _, event := range events { - switch event.Event() { - case putEventName: + switch fdv2proto.EventName(event.Event()) { + case fdv2proto.EventPutObject: r := jreader.NewReader([]byte(event.Data())) - // var version int - var dataKind datakinds.DataKindInternal - var key string - var item ldstoretypes.ItemDescriptor - var err error - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField, "object"}); obj.Next(); { + var ( + kind fdv2proto.ObjectKind + key string + version int + item ldstoretypes.ItemDescriptor + err error + ) + + for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField, objectField}); obj.Next(); { switch string(obj.Name()) { case versionField: - // version = r.Int() + version = r.Int() case kindField: - kind := r.String() - 
dataKind = dataKindFromKind(kind) - if dataKind == nil { - //nolint: godox - // TODO: We are skipping here without showing a warning. Need to address that later. - continue - } + kind = fdv2proto.ObjectKind(r.String()) + //nolint:godox + // TODO: An unrecognized kind should be ignored for forwards compat; the question is, + // do we throw out the DeleteObject here, or let the SDK's store handle it? case keyField: key = r.String() - case "object": - item, err = parseItem(r, dataKind) + case objectField: + item, err = parseItem(r, kind) if err != nil { return updates, err } } } - - patchData := datasource.PatchData{Kind: dataKind, Key: key, Data: item} - updates = append(updates, patchData) - case deleteEventName: + updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item.Item, Version: version}) + case fdv2proto.EventDeleteObject: r := jreader.NewReader([]byte(event.Data())) - var version int - var dataKind datakinds.DataKindInternal - var kind, key string + + var ( + version int + kind fdv2proto.ObjectKind + key string + ) for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { switch string(obj.Name()) { case versionField: version = r.Int() case kindField: - kind = strings.TrimRight(r.String(), "s") - dataKind = dataKindFromKind(kind) - if dataKind == nil { - //nolint: godox - // TODO: We are skipping here without showing a warning. Need to address that later. - continue - } + kind = fdv2proto.ObjectKind(r.String()) + //nolint:godox + // TODO: An unrecognized kind should be ignored for forwards compat; the question is, + // do we throw out the DeleteObject here, or let the SDK's store handle it? case keyField: key = r.String() } } - patchData := datasource.DeleteData{Kind: dataKind, Key: key, Version: version} - updates = append(updates, patchData) + updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) } } return updates, nil } -func convertChangesetEventsToPutData(events []es.Event) ([]any, error) { - segmentCollection := ldstoretypes.Collection{ - Kind: datakinds.Segments, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0)} - flagCollection := ldstoretypes.Collection{ - Kind: datakinds.Features, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0)} - - parseItem := func(r jreader.Reader, kind datakinds.DataKindInternal) (ldstoretypes.ItemDescriptor, error) { - item, err := kind.DeserializeFromJSONReader(&r) - return item, err - } - - for _, event := range events { - switch event.Event() { - case putEventName: - r := jreader.NewReader([]byte(event.Data())) - // var version int - var kind, key string - var item ldstoretypes.ItemDescriptor - var err error - var dataKind datakinds.DataKindInternal - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, "key", "object"}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - // version = r.Int() - case kindField: - kind = strings.TrimRight(r.String(), "s") - dataKind = dataKindFromKind(kind) - case "key": - key = r.String() - case "object": - item, err = parseItem(r, dataKind) - if err != nil { - return []any{}, err - } - } - } - - //nolint: godox - // TODO: What is the actual name we should use here? 
- if kind == "flag" { - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{Key: key, Item: item}) - } else if kind == "segment" { - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{Key: key, Item: item}) - } - case deleteEventName: - // NOTE: We can skip this. We are replacing everything in the - // store so who cares if something was deleted. This shouldn't - // even occur really. - } - } - - putData := datasource.PutData{Path: "/", Data: []ldstoretypes.Collection{flagCollection, segmentCollection}} - - return []any{putData}, nil -} - -func dataKindFromKind(kind string) datakinds.DataKindInternal { - switch kind { - case "flag": - return datakinds.Features - case "segment": - return datakinds.Segments - default: - return nil - } -} - // vim: foldmethod=marker foldlevel=0 diff --git a/internal/datasourcev2/types.go b/internal/datasourcev2/types.go deleted file mode 100644 index a3e912e5..00000000 --- a/internal/datasourcev2/types.go +++ /dev/null @@ -1,87 +0,0 @@ -package datasourcev2 - -import ( - "encoding/json" - - es "github.com/launchdarkly/eventsource" -) - -type pollingPayload struct { - Events []event `json:"events"` -} - -type event struct { - Name string `json:"name"` - EventData json.RawMessage `json:"data"` -} - -// Begin es.Event interface implementation - -// Id returns the id of the event. -func (e event) Id() string { //nolint:stylecheck // The interface requires this method. - return "" -} - -// Event returns the name of the event. -func (e event) Event() string { - return e.Name -} - -// Data returns the raw data of the event. -func (e event) Data() string { - return string(e.EventData) -} - -// En es.Event interface implementation - -type changeSet struct { - intent *serverIntent - events []es.Event -} - -type serverIntent struct { - Payloads []payload `json:"payloads"` -} - -type payload struct { - // The id here doesn't seem to match the state that is included in the - // payload transferred object. - - // It would be nice if we had the same value available in both so we could - // use that as the key consistently throughout the the process. - ID string `json:"id"` - Target int `json:"target"` - Code string `json:"code"` - Reason string `json:"reason"` -} - -// This is the general shape of a put-object event. The delete-object is the same, with the object field being nil. -// type baseObject struct { -// Version int `json:"version"` -// Kind string `json:"kind"` -// Key string `json:"key"` -// Object json.RawMessage `json:"object"` -// } - -// type payloadTransferred struct { -// State string `json:"state"` -// Version int `json:"version"` -// } - -// TODO: Todd doesn't have this in his spec. What are we going to do here? 
-// -//nolint:godox -type errorEvent struct { - PayloadID string `json:"payloadId"` - Reason string `json:"reason"` -} - -// type heartBeat struct{} - -type goodbye struct { - Reason string `json:"reason"` - Silent bool `json:"silent"` - Catastrophe bool `json:"catastrophe"` - //nolint:godox - // TODO: Might later include some advice or backoff information -} diff --git a/internal/datasystem/store.go b/internal/datasystem/store.go new file mode 100644 index 00000000..2cb88318 --- /dev/null +++ b/internal/datasystem/store.go @@ -0,0 +1,252 @@ +package datasystem + +import ( + "sync" + + "github.com/launchdarkly/go-server-sdk/v7/internal/toposort" + + "github.com/launchdarkly/go-server-sdk/v7/internal/memorystorev2" + + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/interfaces" + "github.com/launchdarkly/go-server-sdk/v7/subsystems" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// Store is a dual-mode persistent/in-memory store that serves requests for data from the evaluation +// algorithm. +// +// At any given moment one of two stores is active: in-memory, or persistent. This doesn't preclude a caller +// from holding on to a reference to the persistent store even when we swap to the in-memory store. +// +// Once the in-memory store has data (either from initializers or a synchronizer), the persistent +// store is no longer read from. From that point forward, calls to Get will serve data from the memory +// store. +// +// One motivation behind using persistent stores is to offer a way to immediately start evaluating +// flags before a connection is made to LD (or even in the moment before an initializer has run). +// +// The persistent store has TTL caching logic which can result in inconsistent/stale data being returned. Therefore, +// once we have fresh data from LD, we don't want to use the persistent store for reads any longer. +// +// One complication is that persistent stores have historically operated in multiple regimes. The first, "daemon mode", +// is when the SDK is effectively using the store in read-only mode, with the store being populated by Relay +// or another SDK. +// +// The second is normal persistent store mode, where it is both read and written to. In the FDv2 system, we explicitly +// differentiate these cases using a read/read-write mode switch. In all cases, the in-memory store is used once it +// has data available, although in read-write mode the persistent store may still be written to when data updates +// arrive, even though the memory store is serving reads. +// +// This contrasts with FDv1, where even if data from LD is available, that data may fall out of memory due to the +// persistent store's caching logic ("sparse mode", when the TTL is non-infinite). This was because the SDK's main Store +// implementation was a wrapper around the persistent store, rather than entirely independent. +// +// We have found the previous behavior to almost always be undesirable for users. By keeping the persistent and memory +// stores distinct, it should be much clearer where exactly data is coming from and the behavior should be more +// predictable at runtime. +type Store struct { + // Source of truth for flag evaluations (before initialization), or permanently if there are + // no initializers/synchronizers configured. Optional; if not defined, only memoryStore is used.
+ persistentStore *persistentStore + + // Source of truth for flag evaluations (once initialized). Before initialization, + // the persistentStore may be used if configured. + memoryStore *memorystorev2.Store + + // True if the data in the memory store may be persisted to the persistent store. + // + // This may be false if an initializer/synchronizer has received data that shouldn't be propagated from memory to the + // persistent store, such as data from another database or an untrusted file. + // + // Generally only LD data sources should request data persistence. + persist bool + + // Points to the active store. Swapped upon initialization. + active subsystems.ReadOnlyStore + + // Identifies the current data. + selector *fdv2proto.Selector + + mu sync.RWMutex + + loggers ldlog.Loggers +} + +type persistentStore struct { + // Contains the actual store implementation. + impl subsystems.DataStore + // The persistentStore is either read-only or read-write. In read-only mode, the store + // is *never* written to, and only read before the in-memory store is initialized. + // This is equivalent to the concept of "daemon mode". + // + // In read-write mode, data from initializers/synchronizers is written to the store + // as it is received. This is equivalent to the normal "persistent store" configuration + // that an SDK can use to collaborate with zero or more other SDKs with a (possibly shared) database. + mode subsystems.DataStoreMode + // This exists as a quirk of the DataSourceUpdateSink interface, which store implements. The DataSourceUpdateSink + // has a method to return a DataStoreStatusProvider so that a DataSource can monitor the state of the store. This + // was originally used in fdv1 to know when the store went offline/online, so that data could be committed back + // to the store when it came back online. In the FDv2 system, this is handled by the FDv2 struct itself, so the + // data source doesn't need any knowledge of it. We can delete this piece of infrastructure when we no longer + // need to support fdv1 (or we could refactor the fdv2 data sources to use a different set of interfaces that don't + // require this.) + statusProvider interfaces.DataStoreStatusProvider +} + +func (p *persistentStore) writable() bool { + return p != nil && p.mode == subsystems.DataStoreModeReadWrite +} + +// NewStore creates a new store. If a persistent store needs to be configured, call WithPersistence before any other +// method is called. +func NewStore(loggers ldlog.Loggers) *Store { + s := &Store{ + persistentStore: nil, + memoryStore: memorystorev2.New(loggers), + loggers: loggers, + selector: fdv2proto.NoSelector(), + persist: false, + } + s.active = s.memoryStore + return s +} + +// WithPersistence exists to accommodate the SDK's configuration builders. We need a ClientContext +// before we can call Build to actually get the persistent store. That ClientContext requires the +// DataDestination, which is what this store struct implements. Therefore, the calls to NewStore and +// WithPersistence need to be separate. +func (s *Store) WithPersistence(persistent subsystems.DataStore, mode subsystems.DataStoreMode, + statusProvider interfaces.DataStoreStatusProvider) *Store { + s.mu.Lock() + defer s.mu.Unlock() + + s.persistentStore = &persistentStore{ + impl: persistent, + mode: mode, + statusProvider: statusProvider, + } + + s.active = s.persistentStore.impl + return s +} + +// Selector returns the current selector.
+func (s *Store) Selector() *fdv2proto.Selector { + s.mu.RLock() + defer s.mu.RUnlock() + return s.selector +} + +// Close closes the store. If there is a persistent store configured, it will be closed. +func (s *Store) Close() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.persistentStore != nil { + return s.persistentStore.impl.Close() + } + return nil +} + +// SetBasis sets the basis of the store. Any existing data is discarded. To request data persistence, +// set persist to true. +func (s *Store) SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { + collections := fdv2proto.ToStorableItems(events) + s.mu.Lock() + defer s.mu.Unlock() + + s.memoryStore.SetBasis(collections) + + s.persist = persist + s.selector = selector + + s.active = s.memoryStore + + if s.shouldPersist() { + //nolint:godox + // TODO: figure out where to handle/report the error. + _ = s.persistentStore.impl.Init(toposort.Sort(collections)) + } +} + +func (s *Store) shouldPersist() bool { + return s.persist && s.persistentStore.writable() +} + +// ApplyDelta applies a delta update to the store. ApplyDelta should not be called until SetBasis has been called. +// To request data persistence, set persist to true. +func (s *Store) ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { + collections := fdv2proto.ToStorableItems(events) + + s.mu.Lock() + defer s.mu.Unlock() + + s.memoryStore.ApplyDelta(collections) + + s.persist = persist + s.selector = selector + + // The process for applying the delta to the memory store is different than the persistent store + // because persistent stores are not yet transactional in regards to payload version. This means + // we still need to apply a series of upserts, so the state of the store may be inconsistent when that + // is happening. In practice, we often don't receive more than one event at a time, but this may change + // in the future. + if s.shouldPersist() { + //nolint:godox + // TODO: figure out where to handle/report the error. + for _, coll := range toposort.Sort(collections) { + for _, item := range coll.Items { + _, err := s.persistentStore.impl.Upsert(coll.Kind, item.Key, item.Item) + if err != nil { + s.loggers.Errorf("Failed to apply delta to persistent store: %s", err) + } + } + } + } +} + +// GetDataStoreStatusProvider returns the status provider for the persistent store, if one is configured, otherwise +// nil. +func (s *Store) GetDataStoreStatusProvider() interfaces.DataStoreStatusProvider { + s.mu.RLock() + defer s.mu.RUnlock() + if s.persistentStore == nil { + return nil + } + return s.persistentStore.statusProvider +} + +// Commit persists the data in the memory store to the persistent store, if configured. The persistent store +// must also be in write mode, and the last call to SetBasis or ApplyDelta must have had persist set to true. +func (s *Store) Commit() error { + s.mu.RLock() + defer s.mu.RUnlock() + + if s.shouldPersist() { + return s.persistentStore.impl.Init(s.memoryStore.GetAllKinds()) + } + return nil +} + +func (s *Store) getActive() subsystems.ReadOnlyStore { + s.mu.RLock() + defer s.mu.RUnlock() + return s.active +} + +//nolint:revive // Implementation for ReadOnlyStore. +func (s *Store) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) { + return s.getActive().GetAll(kind) +} + +//nolint:revive // Implementation for ReadOnlyStore. 
+func (s *Store) Get(kind ldstoretypes.DataKind, key string) (ldstoretypes.ItemDescriptor, error) { + return s.getActive().Get(kind, key) +} + +//nolint:revive // Implementation for ReadOnlyStore. +func (s *Store) IsInitialized() bool { + return s.getActive().IsInitialized() +} diff --git a/internal/datasystem/store_test.go b/internal/datasystem/store_test.go new file mode 100644 index 00000000..f6701527 --- /dev/null +++ b/internal/datasystem/store_test.go @@ -0,0 +1,343 @@ +package datasystem + +import ( + "errors" + "math/rand" + "sync" + "testing" + "time" + + "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel" + + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + + "github.com/launchdarkly/go-server-sdk/v7/subsystems" + "github.com/stretchr/testify/require" + + "github.com/launchdarkly/go-sdk-common/v3/ldlogtest" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + "github.com/stretchr/testify/assert" +) + +func TestStore_New(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + assert.NoError(t, store.Close()) +} + +func TestStore_NoSelector(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + assert.Equal(t, fdv2proto.NoSelector(), store.Selector()) +} + +func TestStore_NoPersistence_NewStore_IsNotInitialized(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + assert.False(t, store.IsInitialized()) +} + +func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { + v1 := fdv2proto.NewSelector("foo", 1) + none := fdv2proto.NoSelector() + tests := []struct { + name string + selector *fdv2proto.Selector + persist bool + }{ + {"with selector, persist", v1, true}, + {"with selector, do not persist", v1, false}, + {"no selector, persist", none, true}, + {"no selector, do not persist", none, false}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + store.SetBasis([]fdv2proto.Event{}, tt.selector, tt.persist) + assert.True(t, store.IsInitialized()) + }) + } +} + +func TestStore_Commit(t *testing.T) { + t.Run("absence of persistent store doesn't cause error when committing", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + assert.NoError(t, store.Commit()) + }) + + t.Run("persist-marked memory items are copied to persistent store in r/w mode", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + + // isDown causes the fake to reject updates (until flipped to false). + spy := &fakeStore{isDown: true} + + store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) + defer store.Close() + + // The store receives data as a list of events, but the persistent store receives them as an + // []ldstoretypes.Collection. 
+ input := []fdv2proto.Event{ + fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldmodel.FeatureFlag{}}, + fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: ldmodel.Segment{}}, + } + + output := []ldstoretypes.Collection{ + { + Kind: ldstoreimpl.Features(), + Items: []ldstoretypes.KeyedItemDescriptor{ + {Key: "foo", Item: ldstoretypes.ItemDescriptor{Version: 1, Item: ldmodel.FeatureFlag{}}}, + }, + }, + { + Kind: ldstoreimpl.Segments(), + Items: []ldstoretypes.KeyedItemDescriptor{ + {Key: "bar", Item: ldstoretypes.ItemDescriptor{Version: 2, Item: ldmodel.Segment{}}}, + }, + }} + + // There should be an error since writing to the store will fail. + store.SetBasis(input, fdv2proto.NoSelector(), true) + + // Since writing should have failed, there should be no data in the persistent store. + require.Empty(t, spy.initPayload) + + spy.isDown = false + + // This time, the data should be stored properly. + require.NoError(t, store.Commit()) + + requireCollectionsMatch(t, output, spy.initPayload) + }) + + t.Run("non-persist memory items are not copied to persistent store in r/w mode", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + + // The fake should accept updates. + spy := &fakeStore{isDown: false} + store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) + defer store.Close() + + input := []fdv2proto.Event{ + fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, + fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + } + + store.SetBasis(input, fdv2proto.NoSelector(), false) + + // Since SetBasis will immediately mirror the data if persist == true, we can check this is empty now. + require.Empty(t, spy.initPayload) + + require.NoError(t, store.Commit()) + + // Commit should be a no-op. This tests that the persist status was saved. + assert.Empty(t, spy.initPayload) + }) + + t.Run("persist-marked memory items are not copied to persistent store in r-only mode", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + + // The fake should accept updates. + spy := &fakeStore{isDown: false} + store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeRead, nil) + defer store.Close() + + input := []fdv2proto.Event{ + fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, + fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + } + + // Even though persist is true, the store was marked as read-only, so it shouldn't be written to. + store.SetBasis(input, fdv2proto.NoSelector(), true) + + require.Empty(t, spy.initPayload) + + require.NoError(t, store.Commit()) + + // Same with commit. 
+ assert.Empty(t, spy.initPayload) + }) +} + +func TestStore_GetActive(t *testing.T) { + t.Run("memory store is active if no persistent store configured", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + foo, err := store.Get(ldstoreimpl.Features(), "foo") + assert.NoError(t, err) + assert.Equal(t, foo, ldstoretypes.ItemDescriptor{}.NotFound()) + + input := []fdv2proto.Event{ + fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + } + + store.SetBasis(input, fdv2proto.NoSelector(), false) + + foo, err = store.Get(ldstoreimpl.Features(), "foo") + assert.NoError(t, err) + assert.Equal(t, 1, foo.Version) + }) + + t.Run("persistent store is active if configured", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + + store := NewStore(logCapture.Loggers).WithPersistence(&fakeStore{}, subsystems.DataStoreModeReadWrite, nil) + defer store.Close() + + _, err := store.Get(ldstoreimpl.Features(), "foo") + + // The fakeStore should return a specific error when Get is called. + assert.Equal(t, errImAPersistentStore, err) + }) + + t.Run("active store swaps from persistent to memory", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers).WithPersistence(&fakeStore{}, subsystems.DataStoreModeReadWrite, nil) + defer store.Close() + + // Before there's any data, if we call Get the persistent store should be accessed. + _, err := store.Get(ldstoreimpl.Features(), "foo") + assert.Equal(t, errImAPersistentStore, err) + + input := []fdv2proto.Event{ + fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + } + + store.SetBasis(input, fdv2proto.NoSelector(), false) + + // Now that there's memory data, the persistent store should no longer be accessed. 
+ foo, err := store.Get(ldstoreimpl.Features(), "foo") + assert.NoError(t, err) + assert.Equal(t, 1, foo.Version) + }) +} + +func TestStore_SelectorIsRemembered(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + + selector1 := fdv2proto.NewSelector("foo", 1) + selector2 := fdv2proto.NewSelector("bar", 2) + selector3 := fdv2proto.NewSelector("baz", 3) + selector4 := fdv2proto.NewSelector("qux", 4) + selector5 := fdv2proto.NewSelector("this better be the last one", 5) + + store.SetBasis([]fdv2proto.Event{}, selector1, false) + assert.Equal(t, selector1, store.Selector()) + + store.SetBasis([]fdv2proto.Event{}, selector2, false) + assert.Equal(t, selector2, store.Selector()) + + store.ApplyDelta([]fdv2proto.Event{}, selector3, false) + assert.Equal(t, selector3, store.Selector()) + + store.ApplyDelta([]fdv2proto.Event{}, selector4, false) + assert.Equal(t, selector4, store.Selector()) + + assert.NoError(t, store.Commit()) + assert.Equal(t, selector4, store.Selector()) + + store.SetBasis([]fdv2proto.Event{}, selector5, false) +} + +func TestStore_Concurrency(t *testing.T) { + t.Run("methods using the active store", func(t *testing.T) { + logCapture := ldlogtest.NewMockLog() + store := NewStore(logCapture.Loggers) + defer store.Close() + + var wg sync.WaitGroup + + run := func(f func()) { + wg.Add(1) + defer wg.Done() + for i := 0; i < 100; i++ { + f() + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + } + } + + go run(func() { + _, _ = store.Get(ldstoreimpl.Features(), "foo") + }) + go run(func() { + _, _ = store.GetAll(ldstoreimpl.Features()) + }) + go run(func() { + _ = store.GetDataStoreStatusProvider() + }) + go run(func() { + _ = store.IsInitialized() + }) + go run(func() { + store.SetBasis([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + }) + go run(func() { + store.ApplyDelta([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + }) + go run(func() { + _ = store.Selector() + }) + }) +} + +type fakeStore struct { + initPayload []ldstoretypes.Collection + isDown bool +} + +var errImAPersistentStore = errors.New("i'm a persistent store") + +func (f *fakeStore) GetAll(kind ldstoretypes.DataKind) ([]ldstoretypes.KeyedItemDescriptor, error) { + return nil, nil +} + +func (f *fakeStore) Get(kind ldstoretypes.DataKind, key string) (ldstoretypes.ItemDescriptor, error) { + return ldstoretypes.ItemDescriptor{}, errImAPersistentStore +} + +func (f *fakeStore) IsInitialized() bool { + return false +} + +func (f *fakeStore) Init(allData []ldstoretypes.Collection) error { + if f.isDown { + return errors.New("store is down") + } + f.initPayload = allData + return nil +} + +func (f *fakeStore) Upsert(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) (bool, error) { + return false, nil +} + +func (f *fakeStore) IsStatusMonitoringEnabled() bool { + return false +} + +func (f *fakeStore) Close() error { + return nil +} + +// This matcher is required instead of calling ElementsMatch directly on two slices of collections because +// the order of the collections, or the order within each collection, is not defined. 
+func requireCollectionsMatch(t *testing.T, expected []ldstoretypes.Collection, actual []ldstoretypes.Collection) { + require.Equal(t, len(expected), len(actual)) + for _, expectedCollection := range expected { + for _, actualCollection := range actual { + if expectedCollection.Kind == actualCollection.Kind { + require.ElementsMatch(t, expectedCollection.Items, actualCollection.Items) + break + } + } + } +} diff --git a/internal/fdv2proto/event_to_storable_item.go b/internal/fdv2proto/event_to_storable_item.go new file mode 100644 index 00000000..379ff12d --- /dev/null +++ b/internal/fdv2proto/event_to_storable_item.go @@ -0,0 +1,59 @@ +package fdv2proto + +import ( + "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion +// into a data store. +func ToStorableItems(events []Event) []ldstoretypes.Collection { + flagCollection := ldstoretypes.Collection{ + Kind: datakinds.Features, + Items: make([]ldstoretypes.KeyedItemDescriptor, 0), + } + + segmentCollection := ldstoretypes.Collection{ + Kind: datakinds.Segments, + Items: make([]ldstoretypes.KeyedItemDescriptor, 0), + } + + for _, event := range events { + switch e := event.(type) { + case PutObject: + switch e.Kind { + case FlagKind: + flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ + Key: e.Key, + Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, + }) + case SegmentKind: + segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ + Key: e.Key, + Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, + }) + } + case DeleteObject: + switch e.Kind { + case FlagKind: + flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ + Key: e.Key, + Item: ldstoretypes.ItemDescriptor{ + Version: e.Version, + Item: nil, + }, + }) + case SegmentKind: + segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ + Key: e.Key, + Item: ldstoretypes.ItemDescriptor{ + Version: e.Version, + Item: nil, + }, + }) + } + } + } + + return []ldstoretypes.Collection{flagCollection, segmentCollection} +} diff --git a/internal/fdv2proto/events.go b/internal/fdv2proto/events.go new file mode 100644 index 00000000..a28fc986 --- /dev/null +++ b/internal/fdv2proto/events.go @@ -0,0 +1,164 @@ +package fdv2proto + +import ( + "errors" + "fmt" + + "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" +) + +// IntentCode represents the various intents that can be sent by the server. +type IntentCode string + +const ( + // IntentTransferFull means the server intends to send a full data set. + IntentTransferFull = IntentCode("xfer-full") + // IntentTransferChanges means the server intends to send only the necessary changes to bring + // an existing data set up-to-date. + IntentTransferChanges = IntentCode("xfer-changes") + // IntentNone means the server intends to send no data (payload is up to date). + IntentNone = IntentCode("none") +) + +// Event represents an event that can be sent by the server. +type Event interface { + // Name returns the name of the event. + Name() EventName +} + +// EventName is the name of the event. +type EventName string + +const ( + // EventPutObject specifies that an object should be added to the data set with upsert semantics. 
+ EventPutObject = EventName("put-object") + + // EventDeleteObject specifies that an object should be removed from the data set. + EventDeleteObject = EventName("delete-object") + + // EventServerIntent specifies the server's intent. + EventServerIntent = EventName("server-intent") + + // EventPayloadTransferred specifies that all data required to bring the existing data set to + // a new version has been transferred. + EventPayloadTransferred = EventName("payload-transferred") + + // EventHeartbeat keeps the connection alive. + EventHeartbeat = EventName("heart-beat") + + // EventGoodbye specifies that the server is about to close the connection. + EventGoodbye = EventName("goodbye") + + // EventError specifies that an error occurred while serving the connection. + EventError = EventName("error") +) + +// ObjectKind represents the kind of object. +type ObjectKind string + +const ( + // FlagKind is a flag. + FlagKind = ObjectKind("flag") + // SegmentKind is a segment. + SegmentKind = ObjectKind("segment") +) + +// ErrUnknownKind represents that a given ObjectKind has no FDv1 equivalent +// DataKind. +type ErrUnknownKind struct { + kind ObjectKind +} + +// Is returns true if the error is an ErrUnknownKind. +func (e *ErrUnknownKind) Is(err error) bool { + var errUnknownKind *ErrUnknownKind + ok := errors.As(err, &errUnknownKind) + return ok +} + +func (e *ErrUnknownKind) Error() string { + return fmt.Sprintf("unknown object kind: %s", e.kind) +} + +// ToFDV1 converts the object kind to an FDv1 data kind. If there is no equivalent, it returns +// an ErrUnknownKind. +func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, error) { + switch o { + case FlagKind: + return datakinds.Features, nil + case SegmentKind: + return datakinds.Segments, nil + default: + return nil, &ErrUnknownKind{o} + } +} + +// ServerIntent represents the server's intent. +type ServerIntent struct { + // Payloads is a list of payloads, defined to be at least length 1. + Payloads []Payload `json:"payloads"` +} + +//nolint:revive // Event method. +func (ServerIntent) Name() EventName { + return EventServerIntent +} + +// PayloadTransferred represents the fact that all payload objects have been sent. +type PayloadTransferred struct { + State string `json:"state"` + Version int `json:"version"` +} + +//nolint:revive // Event method. +func (p PayloadTransferred) Name() EventName { + return EventPayloadTransferred +} + +// DeleteObject specifies the deletion of a particular object. +type DeleteObject struct { + Version int `json:"version"` + Kind ObjectKind `json:"kind"` + Key string `json:"key"` +} + +//nolint:revive // Event method. +func (d DeleteObject) Name() EventName { + return EventDeleteObject +} + +// PutObject specifies the addition of a particular object with upsert semantics. +type PutObject struct { + Version int `json:"version"` + Kind ObjectKind `json:"kind"` + Key string `json:"key"` + Object any `json:"object"` +} + +//nolint:revive // Event method. +func (p PutObject) Name() EventName { + return EventPutObject +} + +// Error represents an error event. +type Error struct { + PayloadID string `json:"payloadId"` + Reason string `json:"reason"` +} + +//nolint:revive // Event method. +func (e Error) Name() EventName { + return EventError +} + +// Goodbye represents a goodbye event. +type Goodbye struct { + Reason string `json:"reason"` + Silent bool `json:"silent"` + Catastrophe bool `json:"catastrophe"` +} + +//nolint:revive // Event method.
+func (g Goodbye) Name() EventName { + return EventGoodbye +} diff --git a/internal/fdv2proto/package.go b/internal/fdv2proto/package.go new file mode 100644 index 00000000..c8116173 --- /dev/null +++ b/internal/fdv2proto/package.go @@ -0,0 +1,2 @@ +// Package fdv2proto defines protocol-level data types for Flag Delivery V2. +package fdv2proto diff --git a/internal/fdv2proto/payloads.go b/internal/fdv2proto/payloads.go new file mode 100644 index 00000000..d635e0a2 --- /dev/null +++ b/internal/fdv2proto/payloads.go @@ -0,0 +1,20 @@ +package fdv2proto + +// Payload represents a payload delivered in a streaming response. +type Payload struct { + // The id here doesn't seem to match the state that is included in the + // Payload transferred object. + + // It would be nice if we had the same value available in both so we could + // use that as the key consistently throughout the process. + ID string `json:"id"` + Target int `json:"target"` + Code IntentCode `json:"code"` + Reason string `json:"reason"` +} + +// PollingPayload represents a payload that is delivered in a polling response. +type PollingPayload struct { + // Note: the first event in a PollingPayload should be a Payload. + Events []RawEvent `json:"events"` +} diff --git a/internal/fdv2proto/raw_event.go b/internal/fdv2proto/raw_event.go new file mode 100644 index 00000000..c5d2ba5b --- /dev/null +++ b/internal/fdv2proto/raw_event.go @@ -0,0 +1,12 @@ +package fdv2proto + +import ( + "encoding/json" +) + +// RawEvent is a partially deserialized event that allows the event name to be extracted before +// the rest of the event is deserialized. +type RawEvent struct { + Name EventName `json:"name"` + Data json.RawMessage `json:"data"` +} diff --git a/internal/fdv2proto/selector.go b/internal/fdv2proto/selector.go new file mode 100644 index 00000000..9e7878fe --- /dev/null +++ b/internal/fdv2proto/selector.go @@ -0,0 +1,58 @@ +package fdv2proto + +import ( + "encoding/json" + "errors" +) + +// Selector represents a particular snapshot of data. +type Selector struct { + state string + version int +} + +// NoSelector returns a nil Selector, representing the lack of one. It is +// here only for readability at call sites. +func NoSelector() *Selector { + return nil +} + +// NewSelector creates a new Selector from a state string and version. +func NewSelector(state string, version int) *Selector { + return &Selector{state: state, version: version} +} + +// IsSet returns true if the Selector is not nil. +func (s *Selector) IsSet() bool { + return s != nil +} + +// State returns the state string of the Selector. This cannot be called if the Selector is nil. +func (s *Selector) State() string { + return s.state +} + +// Version returns the version of the Selector. This cannot be called if the Selector is nil. +func (s *Selector) Version() int { + return s.version +} + +// UnmarshalJSON unmarshals a Selector from JSON.
+func (s *Selector) UnmarshalJSON(data []byte) error { + var raw map[string]interface{} + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + if state, ok := raw["state"].(string); ok { + s.state = state + } else { + return errors.New("unmarshal selector: missing state field") + } + if version, ok := raw["version"].(float64); ok { + s.version = int(version) + } else { + return errors.New("unmarshal selector: missing version field") + } + return nil +} diff --git a/internal/sharedtest/mocks/mock_data_destination.go b/internal/sharedtest/mocks/mock_data_destination.go index 4ecc78eb..faa01cb6 100644 --- a/internal/sharedtest/mocks/mock_data_destination.go +++ b/internal/sharedtest/mocks/mock_data_destination.go @@ -5,10 +5,12 @@ import ( "testing" "time" + "github.com/launchdarkly/go-server-sdk/v7/internal/toposort" + + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/launchdarkly/go-server-sdk/v7/interfaces" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - th "github.com/launchdarkly/go-test-helpers/v3" "github.com/stretchr/testify/assert" @@ -41,26 +43,37 @@ func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination } } -// Init in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) Init(allData []ldstoretypes.Collection, _ *int) bool { - // For now, the payloadVersion is ignored. When the data sources start making use of it, it should be +// SetBasis in this test implementation, delegates to d.DataStore.CapturedUpdates. +func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { + // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. - for _, coll := range allData { + + collections := fdv2proto.ToStorableItems(events) + + for _, coll := range collections { AssertNotNil(coll.Kind) } - err := d.DataStore.Init(allData) - return err == nil + _ = d.DataStore.Init(toposort.Sort(collections)) } -// Upsert in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) Upsert( - kind ldstoretypes.DataKind, - key string, - newItem ldstoretypes.ItemDescriptor, -) bool { - AssertNotNil(kind) - _, err := d.DataStore.Upsert(kind, key, newItem) - return err == nil +// ApplyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates. +func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { + // For now, the selector is ignored. When the data sources start making use of it, it should be + // stored so that assertions can be made. + + collections := fdv2proto.ToStorableItems(events) + + for _, coll := range collections { + AssertNotNil(coll.Kind) + } + + for _, coll := range toposort.Sort(collections) { + for _, item := range coll.Items { + if _, err := d.DataStore.Upsert(coll.Kind, item.Key, item.Item); err != nil { + return + } + } + } } // UpdateStatus in this test implementation, pushes a value onto the Statuses channel. 
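
A note on how these protocol pieces fit together: a PollingPayload arrives as a list of RawEvent values, and each RawEvent's Name determines which concrete struct its Data should be decoded into. The following is a minimal sketch of that dispatch, not part of this change: decodeEvent is a hypothetical helper (the SDK's actual parsing code is not shown in this diff), and the Event return type assumes the interface exposed by fdv2proto, whose definition also does not appear here.

package fdv2proto

import (
	"encoding/json"
	"fmt"
)

// decodeEvent (hypothetical) converts a RawEvent into one of the typed event
// structs defined above, based on the event name.
func decodeEvent(raw RawEvent) (Event, error) {
	switch raw.Name {
	case EventPutObject:
		var put PutObject
		if err := json.Unmarshal(raw.Data, &put); err != nil {
			return nil, err
		}
		return put, nil
	case EventDeleteObject:
		var del DeleteObject
		if err := json.Unmarshal(raw.Data, &del); err != nil {
			return nil, err
		}
		return del, nil
	case EventPayloadTransferred:
		var done PayloadTransferred
		if err := json.Unmarshal(raw.Data, &done); err != nil {
			return nil, err
		}
		return done, nil
	default:
		// Unhandled names (heart-beat, goodbye, error, ...) are reported to the
		// caller rather than silently dropped.
		return nil, fmt.Errorf("unhandled event name: %q", raw.Name)
	}
}

The decoded events could then be handed to a DataDestination, which is what the mock above does after converting them with fdv2proto.ToStorableItems.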
diff --git a/internal/toposort/toposort.go b/internal/toposort/toposort.go index 7b5fc70e..f021d3cf 100644 --- a/internal/toposort/toposort.go +++ b/internal/toposort/toposort.go @@ -7,7 +7,6 @@ import ( "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel" "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" st "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" ) @@ -122,12 +121,12 @@ func GetNeighbors(kind st.DataKind, fromItem st.ItemDescriptor) Neighbors { } } switch kind { - case ldstoreimpl.Features(): + case datakinds.Features: if flag, ok := fromItem.Item.(*ldmodel.FeatureFlag); ok { if len(flag.Prerequisites) > 0 { ret = make(Neighbors, len(flag.Prerequisites)) for _, p := range flag.Prerequisites { - ret.Add(Vertex{ldstoreimpl.Features(), p.Key}) + ret.Add(Vertex{datakinds.Features, p.Key}) } } for _, r := range flag.Rules { @@ -136,7 +135,7 @@ func GetNeighbors(kind st.DataKind, fromItem st.ItemDescriptor) Neighbors { return ret } - case ldstoreimpl.Segments(): + case datakinds.Segments: if segment, ok := fromItem.Item.(*ldmodel.Segment); ok { for _, r := range segment.Rules { checkClauses(r.Clauses) @@ -149,18 +148,18 @@ func GetNeighbors(kind st.DataKind, fromItem st.ItemDescriptor) Neighbors { // Sort performs a topological sort on the given data collections, so that the items can be inserted into a // persistent store to minimize the risk of evaluating a flag before its prerequisites/segments have been stored. func Sort(allData []st.Collection) []st.Collection { - colls := make([]st.Collection, 0, len(allData)) + collections := make([]st.Collection, 0, len(allData)) for _, coll := range allData { if doesDataKindSupportDependencies(coll.Kind) { itemsOut := make([]st.KeyedItemDescriptor, 0, len(coll.Items)) addItemsInDependencyOrder(coll.Kind, coll.Items, &itemsOut) - colls = append(colls, st.Collection{Kind: coll.Kind, Items: itemsOut}) + collections = append(collections, st.Collection{Kind: coll.Kind, Items: itemsOut}) } else { - colls = append(colls, coll) + collections = append(collections, coll) } } - sort.Slice(colls, func(i, j int) bool { - return dataKindPriority(colls[i].Kind) < dataKindPriority(colls[j].Kind) + sort.Slice(collections, func(i, j int) bool { + return dataKindPriority(collections[i].Kind) < dataKindPriority(collections[j].Kind) }) - return colls + return collections } diff --git a/subsystems/data_destination.go b/subsystems/data_destination.go index 7f7dbc0a..b74e3305 100644 --- a/subsystems/data_destination.go +++ b/subsystems/data_destination.go @@ -1,7 +1,7 @@ package subsystems import ( - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" ) // DataDestination represents a sink for data obtained from a data source. @@ -11,24 +11,22 @@ import ( // Do not use it. // You have been warned. type DataDestination interface { - // Init overwrites the current contents of the data store with a set of items for each collection. + // SetBasis defines a new basis for the data store. This means the store must + // be emptied of any existing data before applying the events. This operation should be + // atomic with respect to any other operations that modify the store. 
 	//
-	// If the underlying data store returns an error during this operation, the SDK will log it,
-	// and set the data source state to DataSourceStateInterrupted with an error of
-	// DataSourceErrorKindStoreError. It will not return the error to the data source, but will
-	// return false to indicate that the operation failed.
-	Init(allData []ldstoretypes.Collection, payloadVersion *int) bool
+	// The selector defines the version of the basis.
+	//
+	// If persist is true, it indicates that the data should be propagated to any connected persistent
+	// store.
+	SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool)
 
-	// Upsert updates or inserts an item in the specified collection. For updates, the object will only be
-	// updated if the existing version is less than the new version.
+	// ApplyDelta applies a set of changes to an existing basis. This operation should be atomic with
+	// respect to any other operations that modify the store.
 	//
-	// To mark an item as deleted, pass an ItemDescriptor with a nil Item and a nonzero version
-	// number. Deletions must be versioned so that they do not overwrite a later update in case updates
-	// are received out of order.
+	// The selector defines the new version of the basis.
 	//
-	// If the underlying data store returns an error during this operation, the SDK will log it,
-	// and set the data source state to DataSourceStateInterrupted with an error of
-	// DataSourceErrorKindStoreError. It will not return the error to the data source, but will
-	// return false to indicate that the operation failed.
-	Upsert(kind ldstoretypes.DataKind, key string, item ldstoretypes.ItemDescriptor) bool
+	// If persist is true, it indicates that the changes should be propagated to any connected persistent
+	// store.
+	ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool)
 }
diff --git a/subsystems/data_store_mode.go b/subsystems/data_store_mode.go
new file mode 100644
index 00000000..056750e9
--- /dev/null
+++ b/subsystems/data_store_mode.go
@@ -0,0 +1,19 @@
+package subsystems
+
+// DataStoreMode represents the mode of operation of a Data Store when the SDK is running in FDv2 mode.
+//
+// This enum is not stable, and not subject to any backwards
+// compatibility guarantees or semantic versioning. It is not suitable for production usage.
+//
+// Do not use it.
+// You have been warned.
+type DataStoreMode int
+
+const (
+	// DataStoreModeRead indicates that the data store is read-only. Data will never be written back to the store by
+	// the SDK.
+	DataStoreModeRead DataStoreMode = 0
+	// DataStoreModeReadWrite indicates that the data store is read-write. Data from initializers/synchronizers may be
+	// written to the store as necessary.
+	DataStoreModeReadWrite DataStoreMode = 1
+)
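
To make the reworked interface concrete, here is a condensed sketch, not part of this change, of how an implementation might satisfy SetBasis and ApplyDelta by following the same pattern as the MockDataDestination above: convert the FDv2 events into storable collections, then write them in dependency order. The storeDestination type and the datasystem package name are illustrative only; because fdv2proto and toposort are internal packages, this would only compile inside the go-server-sdk module, and error handling plus the selector/persist parameters are omitted for brevity.

package datasystem // illustrative package name, not part of this change

import (
	"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"
	"github.com/launchdarkly/go-server-sdk/v7/internal/toposort"
	"github.com/launchdarkly/go-server-sdk/v7/subsystems"
)

// storeDestination forwards FDv2 events into a DataStore.
type storeDestination struct {
	store subsystems.DataStore
}

// SetBasis replaces the store contents with the collections derived from the events.
// Sorting before Init minimizes the window in which a flag could be evaluated ahead of
// its prerequisites or segments.
func (s *storeDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) {
	collections := fdv2proto.ToStorableItems(events)
	_ = s.store.Init(toposort.Sort(collections))
}

// ApplyDelta upserts each changed item, again in dependency order.
func (s *storeDestination) ApplyDelta(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) {
	collections := fdv2proto.ToStorableItems(events)
	for _, coll := range toposort.Sort(collections) {
		for _, item := range coll.Items {
			_, _ = s.store.Upsert(coll.Kind, item.Key, item.Item)
		}
	}
}

The topological sort mirrors the rationale in the Sort doc comment above: segments and prerequisite flags reach the store before the flags that depend on them.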