diff --git a/go.mod b/go.mod index 6e67e01d..cc4f60f8 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.22.0 toolchain go1.22.7 -replace github.com/rancher/lasso => github.com/tomleb/rancher-lasso v0.0.0-20240906163555-e567f1ea2d53 +replace github.com/rancher/lasso => github.com/tomleb/rancher-lasso v0.0.0-20240916204456-7d2f79a45cda replace ( github.com/crewjam/saml => github.com/rancher/saml v0.2.0 diff --git a/go.sum b/go.sum index b7bee9e2..116232e3 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/tomleb/rancher-lasso v0.0.0-20240906163555-e567f1ea2d53 h1:2PYvKjEB664bZn/+1ztEIMpvyDtuj6bUvQvpVva54NU= -github.com/tomleb/rancher-lasso v0.0.0-20240906163555-e567f1ea2d53/go.mod h1:Efx/+BbH3ivmnTPLu5cA3Gc9wT5oyGS0LBcqEuYTx+A= +github.com/tomleb/rancher-lasso v0.0.0-20240916204456-7d2f79a45cda h1:kzbjaj+1ccTE5242HLlGdab7h8TIs0GqbH5jgX6Um94= +github.com/tomleb/rancher-lasso v0.0.0-20240916204456-7d2f79a45cda/go.mod h1:Efx/+BbH3ivmnTPLu5cA3Gc9wT5oyGS0LBcqEuYTx+A= github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk= github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA= github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8= diff --git a/pkg/stores/sqlpartition/partition_mocks_test.go b/pkg/stores/sqlpartition/partition_mocks_test.go index bf39c16d..af2f109a 100644 --- a/pkg/stores/sqlpartition/partition_mocks_test.go +++ b/pkg/stores/sqlpartition/partition_mocks_test.go @@ -11,7 +11,6 @@ import ( types "github.com/rancher/apiserver/pkg/types" partition "github.com/rancher/lasso/pkg/cache/sql/partition" unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - watch "k8s.io/apimachinery/pkg/watch" ) // MockPartitioner is a mock of Partitioner interface. @@ -138,10 +137,10 @@ func (mr *MockUnstructuredStoreMockRecorder) Delete(arg0, arg1, arg2 interface{} } // ListByPartitions mocks base method. -func (m *MockUnstructuredStore) ListByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 []partition.Partition) ([]unstructured.Unstructured, int, string, error) { +func (m *MockUnstructuredStore) ListByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 []partition.Partition) (*unstructured.UnstructuredList, int, string, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListByPartitions", arg0, arg1, arg2) - ret0, _ := ret[0].([]unstructured.Unstructured) + ret0, _ := ret[0].(*unstructured.UnstructuredList) ret1, _ := ret[1].(int) ret2, _ := ret[2].(string) ret3, _ := ret[3].(error) @@ -171,10 +170,10 @@ func (mr *MockUnstructuredStoreMockRecorder) Update(arg0, arg1, arg2, arg3 inter } // WatchByPartitions mocks base method. -func (m *MockUnstructuredStore) WatchByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 types.WatchRequest, arg3 []partition.Partition) (chan watch.Event, error) { +func (m *MockUnstructuredStore) WatchByPartitions(arg0 *types.APIRequest, arg1 *types.APISchema, arg2 types.WatchRequest, arg3 []partition.Partition) (chan struct{}, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "WatchByPartitions", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].(chan watch.Event) + ret0, _ := ret[0].(chan struct{}) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/stores/sqlpartition/partitioner.go b/pkg/stores/sqlpartition/partitioner.go index 5cd3331c..f9bb308b 100644 --- a/pkg/stores/sqlpartition/partitioner.go +++ b/pkg/stores/sqlpartition/partitioner.go @@ -27,7 +27,7 @@ type UnstructuredStore interface { Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, []types.Warning, error) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, []types.Warning, error) - ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) + ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan struct{}, error) } diff --git a/pkg/stores/sqlpartition/store.go b/pkg/stores/sqlpartition/store.go index 2be849bc..74aa3f81 100644 --- a/pkg/stores/sqlpartition/store.go +++ b/pkg/stores/sqlpartition/store.go @@ -92,13 +92,13 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP result.Count = total - for _, item := range list { + for _, item := range list.Items { item := item.DeepCopy() // the sql cache automatically adds the ID through a transformFunc. Because of this, we have a different set of reserved fields for the SQL cache result.Objects = append(result.Objects, partition.ToAPI(schema, item, nil, s.sqlReservedFields)) } - result.Revision = "" + result.Revision = list.GetResourceVersion() result.Continue = continueToken return result, nil } @@ -145,6 +145,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types for range c { response <- types.APIEvent{ + Name: "resource.change", ResourceType: schema.ID, } } diff --git a/pkg/stores/sqlproxy/proxy_store.go b/pkg/stores/sqlproxy/proxy_store.go index 7c0001ac..b790c6ed 100644 --- a/pkg/stores/sqlproxy/proxy_store.go +++ b/pkg/stores/sqlproxy/proxy_store.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "net/http" + "strconv" "strings" "sync" "time" @@ -507,7 +508,7 @@ func (s *Store) CacheFor(client dynamic.ResourceInterface, schema *types.APISche // - the total number of resources (returned list might be a subset depending on pagination options in apiOp) // - a continue token, if there are more pages after the returned one // - an error instead of all of the above if anything went wrong -func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) ([]unstructured.Unstructured, int, string, error) { +func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchema, partitions []partition.Partition) (*unstructured.UnstructuredList, int, string, error) { opts, err := listprocessor.ParseQuery(apiOp, s.namespaceCache) if err != nil { return nil, 0, "", err @@ -532,13 +533,22 @@ func (s *Store) ListByPartitions(apiOp *types.APIRequest, schema *types.APISchem return nil, 0, "", err } - return list.Items, total, continueToken, nil + return list, total, continueToken, nil } // WatchByPartitions returns a channel of events for a list or resource belonging to any of the specified partitions func (s *Store) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest, partitions []partition.Partition) (chan struct{}, error) { ctx := apiOp.Context() + revision := 0 + if wr.Revision != "" { + parsedRevision, err := strconv.ParseInt(wr.Revision, 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid revision %q: %w", wr.Revision, err) + } + revision = int(parsedRevision) + } + // XXX: Why was this needed at all?? apiOp = apiOp.Clone().WithContext(ctx) @@ -555,7 +565,10 @@ func (s *Store) WatchByPartitions(apiOp *types.APIRequest, schema *types.APISche } debounceListener := newDebounceListener(5 * time.Second) - inf.Watch(ctx, debounceListener) + latestRevision := inf.Watch(ctx, debounceListener) + if latestRevision > revision { + debounceListener.NotifyNow() + } go debounceListener.Run(ctx) return debounceListener.ch, nil diff --git a/pkg/stores/sqlproxy/watchers.go b/pkg/stores/sqlproxy/watchers.go index d33c7449..6eff37e6 100644 --- a/pkg/stores/sqlproxy/watchers.go +++ b/pkg/stores/sqlproxy/watchers.go @@ -2,7 +2,7 @@ package sqlproxy import ( "context" - "sync/atomic" + "sync" "time" "github.com/rancher/lasso/pkg/cache/sql/informer" @@ -11,8 +11,10 @@ import ( var _ informer.Listener = (*debounceListener)(nil) type debounceListener struct { + lock sync.Mutex + notified bool + debounceRate time.Duration - notified atomic.Bool ch chan struct{} } @@ -25,26 +27,32 @@ func newDebounceListener(debounceRate time.Duration) *debounceListener { } func (d *debounceListener) Run(ctx context.Context) { - d.ch <- struct{}{} ticker := time.NewTicker(d.debounceRate) defer ticker.Stop() for { select { case <-ctx.Done(): + close(d.ch) return case <-ticker.C: - if !d.notified.Swap(false) { - continue + d.lock.Lock() + if d.notified { + d.ch <- struct{}{} } - d.ch <- struct{}{} + d.notified = false + d.lock.Unlock() } } } -func (d *debounceListener) Notify() { - d.notified.Store(true) +func (d *debounceListener) NotifyNow() { + d.lock.Lock() + defer d.lock.Unlock() + d.ch <- struct{}{} } -func (d *debounceListener) Close() { - close(d.ch) +func (d *debounceListener) Notify() { + d.lock.Lock() + defer d.lock.Unlock() + d.notified = true }