diff --git a/pkg/state/conformance/state.go b/pkg/state/conformance/state.go index 279f9353..a15db970 100644 --- a/pkg/state/conformance/state.go +++ b/pkg/state/conformance/state.go @@ -180,6 +180,47 @@ func (suite *StateSuite) TestWatchKind() { case <-time.After(time.Second): suite.FailNow("timed out waiting for event") } + + chWithBootstrap := make(chan state.Event) + + suite.Require().NoError(suite.State.WatchKind(ctx, path1.Metadata(), chWithBootstrap, state.WithBootstrapContents(true))) + + resources, err := suite.State.List(ctx, path1.Metadata()) + suite.Require().NoError(err) + + for _, res := range resources.Items { + select { + case event := <-chWithBootstrap: + suite.Assert().Equal(state.Created, event.Type) + suite.Assert().Equal(res.String(), event.Resource.String()) + suite.Assert().Equal(res.Metadata().Version(), event.Resource.Metadata().Version()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } + } + + oldVersion = path2.Metadata().Version() + path2.Metadata().BumpVersion() + + suite.Require().NoError(suite.State.Update(ctx, oldVersion, path2)) + + select { + case event := <-ch: + suite.Assert().Equal(state.Updated, event.Type) + suite.Assert().Equal(path2.String(), event.Resource.String()) + suite.Assert().Equal(path2.Metadata().Version(), event.Resource.Metadata().Version()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } + + select { + case event := <-chWithBootstrap: + suite.Assert().Equal(state.Updated, event.Type) + suite.Assert().Equal(path2.String(), event.Resource.String()) + suite.Assert().Equal(path2.Metadata().Version(), event.Resource.Metadata().Version()) + case <-time.After(time.Second): + suite.FailNow("timed out waiting for event") + } } // TestConcurrentFinalizers perform concurrent finalizer updates. diff --git a/pkg/state/impl/inmem/collection.go b/pkg/state/impl/inmem/collection.go index dd1a4d06..6b247784 100644 --- a/pkg/state/impl/inmem/collection.go +++ b/pkg/state/impl/inmem/collection.go @@ -254,13 +254,47 @@ func (collection *ResourceCollection) Watch(ctx context.Context, id resource.ID, } // WatchAll for any resource change stored in this collection. -func (collection *ResourceCollection) WatchAll(ctx context.Context, ch chan<- state.Event) error { +func (collection *ResourceCollection) WatchAll(ctx context.Context, ch chan<- state.Event, opts ...state.WatchKindOption) error { + var options state.WatchKindOptions + + for _, opt := range opts { + opt(&options) + } + collection.mu.Lock() defer collection.mu.Unlock() pos := collection.writePos + var bootstrapList []resource.Resource + + if options.BootstrapContents { + bootstrapList = make([]resource.Resource, 0, len(collection.storage)) + + for _, res := range collection.storage { + bootstrapList = append(bootstrapList, res.DeepCopy()) + } + + sort.Slice(bootstrapList, func(i, j int) bool { + return bootstrapList[i].Metadata().ID() < bootstrapList[j].Metadata().ID() + }) + } + go func() { + // send initial contents if they were captured + for _, res := range bootstrapList { + select { + case ch <- state.Event{ + Type: state.Created, + Resource: res, + }: + case <-ctx.Done(): + return + } + } + + bootstrapList = nil + for { collection.mu.Lock() // while there's no data to consume (pos == e.writePos), wait for Condition variable signal, diff --git a/pkg/state/impl/inmem/inmem.go b/pkg/state/impl/inmem/inmem.go index f5bbd114..7d8f2429 100644 --- a/pkg/state/impl/inmem/inmem.go +++ b/pkg/state/impl/inmem/inmem.go @@ -69,6 +69,6 @@ func (state *State) Watch(ctx context.Context, resourcePointer resource.Pointer, } // WatchKind all resources by type. -func (state *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch chan<- state.Event) error { - return state.getCollection(resourceKind.Type()).WatchAll(ctx, ch) +func (state *State) WatchKind(ctx context.Context, resourceKind resource.Kind, ch chan<- state.Event, opts ...state.WatchKindOption) error { + return state.getCollection(resourceKind.Type()).WatchAll(ctx, ch, opts...) } diff --git a/pkg/state/impl/namespaced/namespaced.go b/pkg/state/impl/namespaced/namespaced.go index 95a11275..857fc30f 100644 --- a/pkg/state/impl/namespaced/namespaced.go +++ b/pkg/state/impl/namespaced/namespaced.go @@ -86,6 +86,6 @@ func (st *State) Watch(ctx context.Context, ptr resource.Pointer, ch chan<- stat } // WatchKind watches resources of specific kind (namespace and type). -func (st *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- state.Event) error { - return st.getNamespace(kind.Namespace()).WatchKind(ctx, kind, ch) +func (st *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- state.Event, opts ...state.WatchKindOption) error { + return st.getNamespace(kind.Namespace()).WatchKind(ctx, kind, ch, opts...) } diff --git a/pkg/state/options.go b/pkg/state/options.go index 872c8f9d..9ff61436 100644 --- a/pkg/state/options.go +++ b/pkg/state/options.go @@ -45,3 +45,18 @@ type WatchOptions struct{} // WatchOption builds WatchOptions. type WatchOption func(*WatchOptions) + +// WatchKindOptions for the CoreState.WatchKind function. +type WatchKindOptions struct { + BootstrapContents bool +} + +// WatchKindOption builds WatchOptions. +type WatchKindOption func(*WatchKindOptions) + +// WithBootstrapContents enables loading initial list of resources as 'created' events for WatchKind API. +func WithBootstrapContents(enable bool) WatchKindOption { + return func(opts *WatchKindOptions) { + opts.BootstrapContents = enable + } +} diff --git a/pkg/state/state.go b/pkg/state/state.go index 5d799696..ed783389 100644 --- a/pkg/state/state.go +++ b/pkg/state/state.go @@ -70,7 +70,7 @@ type CoreState interface { Watch(context.Context, resource.Pointer, chan<- Event, ...WatchOption) error // WatchKind watches resources of specific kind (namespace and type). - WatchKind(context.Context, resource.Kind, chan<- Event) error + WatchKind(context.Context, resource.Kind, chan<- Event, ...WatchKindOption) error } // UpdaterFunc is called on resource to update it to the desired state.