Skip to content

Commit

Permalink
fix(kuma-cp): don't cache filtered data (#5574)
Browse files Browse the repository at this point in the history
Signed-off-by: Lukasz Dziedziak <lukidzi@gmail.com>
  • Loading branch information
lukidzi authored Jan 10, 2023
1 parent 562a162 commit f5b8d76
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
3 changes: 3 additions & 0 deletions pkg/core/resources/manager/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ func (c *cachedManager) Get(ctx context.Context, res model.Resource, fs ...store

func (c *cachedManager) List(ctx context.Context, list model.ResourceList, fs ...store.ListOptionsFunc) error {
opts := store.NewListOptions(fs...)
if !opts.IsCacheable() {
return fmt.Errorf("filter functions are not allowed for cached store")
}
cacheKey := fmt.Sprintf("LIST:%s:%s", list.GetItemType(), opts.HashCode())
obj, found := c.cache.Get(cacheKey)
if !found {
Expand Down
49 changes: 49 additions & 0 deletions pkg/core/resources/manager/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,53 @@ var _ = Describe("Cached Resource Manager", func() {
// then first request does not block request for other type
Expect(err).ToNot(HaveOccurred())
}))

It("should cache List() at different key when ordered", test.Within(5*time.Second, func() {
// when fetched resources multiple times
fetch := func(ordered bool) core_mesh.DataplaneResourceList {
fetched := core_mesh.DataplaneResourceList{}
var err error
if ordered {
err = cachedManager.List(context.Background(), &fetched, core_store.ListOrdered(), core_store.ListByMesh("default"))
} else {
err = cachedManager.List(context.Background(), &fetched, core_store.ListByMesh("default"))
}
Expect(err).ToNot(HaveOccurred())
return fetched
}

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
fetch(false)
wg.Done()
}()
}
wg.Wait()

// then real manager should be called only once
list := fetch(false)
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(countingManager.listQueries).To(Equal(1))

// when call for ordered data
list = fetch(true)

// then real manager should be called
Expect(list.Items).To(HaveLen(1))
Expect(list.Items[0].GetSpec()).To(MatchProto(res.Spec))
Expect(countingManager.listQueries).To(Equal(2))

// and metrics are published
Expect(test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "miss").Counter.GetValue()).To(Equal(2.0))
hits := test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "hit").Counter.GetValue()
hitWaits := 0.0
hitWaitMetric := test_metrics.FindMetric(metrics, "store_cache", "operation", "list", "result", "hit-wait")
if hitWaitMetric != nil {
hitWaits = hitWaitMetric.Counter.GetValue()
}
Expect(hits + hitWaits).To(Equal(100.0))
}))
})
6 changes: 5 additions & 1 deletion pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ func ListOrdered() ListOptionsFunc {
}
}

func (l *ListOptions) IsCacheable() bool {
return l.FilterFunc == nil
}

func (l *ListOptions) HashCode() string {
return l.Mesh
return fmt.Sprintf("%s:%t:%s:%d:%s", l.Mesh, l.Ordered, l.NamePrefix, l.PageSize, l.PageOffset)
}
10 changes: 6 additions & 4 deletions pkg/xds/sync/ingress_proxy_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/kumahq/kuma/pkg/core/resources/manager"
core_model "github.com/kumahq/kuma/pkg/core/resources/model"
"github.com/kumahq/kuma/pkg/core/resources/registry"
"github.com/kumahq/kuma/pkg/core/resources/store"
core_store "github.com/kumahq/kuma/pkg/core/resources/store"
core_xds "github.com/kumahq/kuma/pkg/core/xds"
xds_cache "github.com/kumahq/kuma/pkg/xds/cache/mesh"
Expand Down Expand Up @@ -151,15 +152,16 @@ func (p *IngressProxyBuilder) updateIngress(ctx context.Context, zoneIngress *co

func (p *IngressProxyBuilder) getIngressExternalServices(ctx context.Context) (*core_mesh.ExternalServiceResourceList, error) {
meshList := &core_mesh.MeshResourceList{}
if err := p.ReadOnlyResManager.List(ctx, meshList, core_store.ListOrdered(), core_store.ListByFilterFunc(func(rs core_model.Resource) bool {
return rs.(*core_mesh.MeshResource).ZoneEgressEnabled()
})); err != nil {
if err := p.ReadOnlyResManager.List(ctx, meshList, store.ListOrdered()); err != nil {
return nil, err
}

allMeshExternalServices := &core_mesh.ExternalServiceResourceList{}
var externalServices []*core_mesh.ExternalServiceResource
for _, mesh := range meshList.GetItems() {
for _, mesh := range meshList.Items {
if !mesh.ZoneEgressEnabled() {
continue
}
meshName := mesh.GetMeta().GetName()

meshCtx, err := p.meshCache.GetMeshContext(ctx, syncLog, meshName)
Expand Down

0 comments on commit f5b8d76

Please sign in to comment.