Skip to content

Commit

Permalink
chore: add fdv2 store & update sources to use fdv2 protocol definitio…
Browse files Browse the repository at this point in the history
…ns (#192)

This adds the FDv2 dual-mode store, which supports serving data requests
from an in-memory store or from a persistent store (with cache).
  • Loading branch information
cwaldren-ld authored Oct 8, 2024
1 parent 1c400a1 commit ece9cab
Show file tree
Hide file tree
Showing 18 changed files with 1,251 additions and 344 deletions.
36 changes: 18 additions & 18 deletions internal/datasource/streaming_data_source_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var (
deleteDataRequiredProperties = []string{"path", "version"} //nolint:gochecknoglobals
)

// PutData is the logical representation of the data in the "put" event. In the JSON representation,
// putData is the logical representation of the data in the "put" event. In the JSON representation,
// the "data" property is actually a map of maps, but the schema we use internally is a list of
// lists instead.
//
Expand All @@ -37,12 +37,12 @@ var (
// }
// }
// }
type PutData struct {
type putData struct {
Path string // we don't currently do anything with this
Data []ldstoretypes.Collection
}

// PatchData is the logical representation of the data in the "patch" event. In the JSON representation,
// patchData is the logical representation of the data in the "patch" event. In the JSON representation,
// there is a "path" property in the format "/flags/key" or "/segments/key", which we convert into
// Kind and Key when we parse it. The "data" property is the JSON representation of the flag or
// segment, which we deserialize into an ItemDescriptor.
Expand All @@ -56,13 +56,13 @@ type PutData struct {
// "version": 2, ...etc.
// }
// }
type PatchData struct {
type patchData struct {
Kind ldstoretypes.DataKind
Key string
Data ldstoretypes.ItemDescriptor
}

// DeleteData is the logical representation of the data in the "delete" event. In the JSON representation,
// deleteData is the logical representation of the data in the "delete" event. In the JSON representation,
// there is a "path" property in the format "/flags/key" or "/segments/key", which we convert into
// Kind and Key when we parse it.
//
Expand All @@ -72,14 +72,14 @@ type PatchData struct {
// "path": "/flags/flagkey",
// "version": 3
// }
type DeleteData struct {
type deleteData struct {
Kind ldstoretypes.DataKind
Key string
Version int
}

func parsePutData(data []byte) (PutData, error) {
var ret PutData
func parsePutData(data []byte) (putData, error) {
var ret putData
r := jreader.NewReader(data)
for obj := r.Object().WithRequiredProperties(putDataRequiredProperties); obj.Next(); {
switch string(obj.Name()) {
Expand All @@ -92,15 +92,15 @@ func parsePutData(data []byte) (PutData, error) {
return ret, r.Error()
}

func parsePatchData(data []byte) (PatchData, error) {
var ret PatchData
func parsePatchData(data []byte) (patchData, error) {
var ret patchData
r := jreader.NewReader(data)
var kind datakinds.DataKindInternal
var key string
parseItem := func() (PatchData, error) {
parseItem := func() (patchData, error) {
item, err := kind.DeserializeFromJSONReader(&r)
if err != nil {
return PatchData{}, err
return patchData{}, err
}
ret.Data = item
return ret, nil
Expand All @@ -126,7 +126,7 @@ func parsePatchData(data []byte) (PatchData, error) {
}
}
if err := r.Error(); err != nil {
return PatchData{}, err
return patchData{}, err
}
// If we got here, it means we couldn't parse the data model object yet because we saw the
// "data" property first. But we definitely saw both properties (otherwise we would've got
Expand All @@ -138,13 +138,13 @@ func parsePatchData(data []byte) (PatchData, error) {
}
}
if r.Error() != nil {
return PatchData{}, r.Error()
return patchData{}, r.Error()
}
return PatchData{}, errors.New("patch event had no data property")
return patchData{}, errors.New("patch event had no data property")
}

func parseDeleteData(data []byte) (DeleteData, error) {
var ret DeleteData
func parseDeleteData(data []byte) (deleteData, error) {
var ret deleteData
r := jreader.NewReader(data)
for obj := r.Object().WithRequiredProperties(deleteDataRequiredProperties); obj.Next(); {
switch string(obj.Name()) {
Expand All @@ -161,7 +161,7 @@ func parseDeleteData(data []byte) (DeleteData, error) {
}
}
if r.Error() != nil {
return DeleteData{}, r.Error()
return deleteData{}, r.Error()
}
return ret, nil
}
Expand Down
29 changes: 18 additions & 11 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"sync"
"time"

"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/internal"
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
)

Expand All @@ -18,10 +18,10 @@ const (
pollingWillRetryMessage = "will retry at next scheduled poll interval"
)

// Requester allows PollingProcessor to delegate fetching data to another component.
// PollingRequester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type Requester interface {
Request() (data []ldstoretypes.Collection, cached bool, err error)
type PollingRequester interface {
Request() (*PollingResponse, error)
BaseURI() string
FilterKey() string
}
Expand All @@ -34,7 +34,7 @@ type Requester interface {
type PollingProcessor struct {
dataDestination subsystems.DataDestination
statusReporter subsystems.DataSourceStatusReporter
requester Requester
requester PollingRequester
pollInterval time.Duration
loggers ldlog.Loggers
setInitializedOnce sync.Once
Expand All @@ -58,7 +58,7 @@ func newPollingProcessor(
context subsystems.ClientContext,
dataDestination subsystems.DataDestination,
statusReporter subsystems.DataSourceStatusReporter,
requester Requester,
requester PollingRequester,
pollInterval time.Duration,
) *PollingProcessor {
pp := &PollingProcessor{
Expand Down Expand Up @@ -142,16 +142,23 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
}

func (pp *PollingProcessor) poll() error {
allData, cached, err := pp.requester.Request()
response, err := pp.requester.Request()

if err != nil {
return err
}

// We initialize the store only if the request wasn't cached
if !cached {
pp.dataDestination.Init(allData, nil)
if response.Cached() {
return nil
}

switch response.Intent() {
case fdv2proto.IntentTransferFull:
pp.dataDestination.SetBasis(response.Events(), response.Selector(), true)
case fdv2proto.IntentTransferChanges:
pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true)
}

return nil
}

Expand Down
122 changes: 101 additions & 21 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"net/http"
"net/url"

es "github.com/launchdarkly/eventsource"
"github.com/launchdarkly/go-jsonstream/v3/jreader"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/internal/datasource"
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"
Expand Down Expand Up @@ -68,42 +69,121 @@ func (r *pollingRequester) BaseURI() string {
func (r *pollingRequester) FilterKey() string {
return r.filterKey
}
func (r *pollingRequester) Request() ([]ldstoretypes.Collection, bool, error) {

func (r *pollingRequester) Request() (*PollingResponse, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}

body, cached, err := r.makeRequest(endpoints.PollingRequestPath)
if err != nil {
return nil, false, err
return nil, err
}
if cached {
return nil, true, nil
return NewCachedPollingResponse(), nil
}

var payload pollingPayload
var payload fdv2proto.PollingPayload
if err = json.Unmarshal(body, &payload); err != nil {
return nil, false, malformedJSONError{err}
return nil, malformedJSONError{err}
}

esEvents := make([]es.Event, 0, len(payload.Events))
for _, event := range payload.Events {
esEvents = append(esEvents, event)
parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) {
dataKind, err := kind.ToFDV1()
if err != nil {
return ldstoretypes.ItemDescriptor{}, err
}
item, err := dataKind.DeserializeFromJSONReader(&r)
return item, err
}

data, err := convertChangesetEventsToPutData(esEvents)
if err != nil {
return nil, false, malformedJSONError{err}
} else if len(data) != 1 {
return nil, false, malformedJSONError{errors.New("missing expected put event")}
}
updates := make([]fdv2proto.Event, 0, len(payload.Events))

putData, ok := data[0].(datasource.PutData)
if !ok {
return nil, false, malformedJSONError{errors.New("payload is not a PutData")}
}
var intentCode fdv2proto.IntentCode

return putData.Data, cached, nil
for _, event := range payload.Events {
switch event.Name {
case fdv2proto.EventServerIntent:
{
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
} else if len(serverIntent.Payloads) == 0 {
return nil, errors.New("server-intent event has no payloads")
}

intentCode = serverIntent.Payloads[0].Code
if intentCode == fdv2proto.IntentNone {
return NewCachedPollingResponse(), nil
}
}
case fdv2proto.EventPutObject:
{
r := jreader.NewReader(event.Data)

var (
key string
kind fdv2proto.ObjectKind
item ldstoretypes.ItemDescriptor
err error
version int
)

for obj := r.Object().WithRequiredProperties([]string{
versionField, kindField, keyField, objectField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
case keyField:
key = r.String()
case objectField:
item, err = parseItem(r, kind)
if err != nil {
return nil, err
}
}
}
updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version})
}
case fdv2proto.EventDeleteObject:
{
r := jreader.NewReader(event.Data)

var (
version int
kind fdv2proto.ObjectKind
key string
)

for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
//nolint:godox
// TODO: An unrecognized kind should be ignored for forwards compat; the question is,
// do we throw out the DeleteObject here, or let the SDK's store handle it?
case keyField:
key = r.String()
}
}
updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version})
}
case fdv2proto.EventPayloadTransferred:
//nolint:godox
// TODO: deserialize the state and create a fdv2proto.Selector.
}
}

if intentCode == "" {
return nil, errors.New("no server-intent event found in polling response")
}

return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
Expand Down
48 changes: 48 additions & 0 deletions internal/datasourcev2/polling_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package datasourcev2

import "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

// PollingResponse represents the result of a polling request.
type PollingResponse struct {
events []fdv2proto.Event
cached bool
intent fdv2proto.IntentCode
selector *fdv2proto.Selector
}

// NewCachedPollingResponse indicates that the response has not changed.
func NewCachedPollingResponse() *PollingResponse {
return &PollingResponse{
cached: true,
}
}

// NewPollingResponse indicates that data was received.
func NewPollingResponse(intent fdv2proto.IntentCode, events []fdv2proto.Event,
selector *fdv2proto.Selector) *PollingResponse {
return &PollingResponse{
events: events,
intent: intent,
selector: selector,
}
}

// Events returns the events in the response.
func (p *PollingResponse) Events() []fdv2proto.Event {
return p.events
}

// Cached returns true if the response was cached, meaning data has not changed.
func (p *PollingResponse) Cached() bool {
return p.cached
}

// Intent returns the server intent code of the response.
func (p *PollingResponse) Intent() fdv2proto.IntentCode {
return p.intent
}

// Selector returns the Selector of the response.
func (p *PollingResponse) Selector() *fdv2proto.Selector {
return p.selector
}
Loading

0 comments on commit ece9cab

Please sign in to comment.