From c87f5c82eee3d3f69d6cefb4345e6756cf9b3997 Mon Sep 17 00:00:00 2001 From: Vincent Boutour Date: Mon, 18 Sep 2023 13:58:41 +0200 Subject: [PATCH] refactor(cache): Handling simple of multi load in the same way Signed-off-by: Vincent Boutour --- pkg/cache/cache.go | 28 ++++++++++++++++ pkg/cache/list.go | 81 ++++------------------------------------------ 2 files changed, 34 insertions(+), 75 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 9067f8b9..2e1c505a 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -7,6 +7,7 @@ import ( "time" "github.com/ViBiOh/httputils/v4/pkg/cntxt" + "github.com/ViBiOh/httputils/v4/pkg/concurrent" "github.com/ViBiOh/httputils/v4/pkg/model" "github.com/ViBiOh/httputils/v4/pkg/telemetry" "github.com/redis/go-redis/v9" @@ -156,6 +157,33 @@ func (c *Cache[K, V]) fetch(ctx context.Context, id K) (V, error) { return value, err } +func (c *Cache[K, V]) fetchAll(ctx context.Context, items []K) (map[K]V, error) { + if c.onMissMany != nil { + return c.onMissMany(ctx, items) + } + + output := make(map[K]V, len(items)) + + wg := concurrent.NewLimiter(c.concurrency) + + for _, item := range items { + item := item + + wg.Go(func() { + value, err := c.fetch(ctx, item) + if err != nil { + slog.Error("fetch item", "err", err, "item", item) + } + + output[item] = value + }) + } + + wg.Wait() + + return output, nil +} + func (c *Cache[K, V]) decode(content []byte) (value V, ok bool, err error) { if len(content) == 0 { return diff --git a/pkg/cache/list.go b/pkg/cache/list.go index f7208e3c..1a3600ef 100644 --- a/pkg/cache/list.go +++ b/pkg/cache/list.go @@ -7,7 +7,6 @@ import ( "strconv" "github.com/ViBiOh/httputils/v4/pkg/cntxt" - "github.com/ViBiOh/httputils/v4/pkg/concurrent" "github.com/ViBiOh/httputils/v4/pkg/telemetry" "go.opentelemetry.io/otel/trace" ) @@ -26,18 +25,13 @@ func (ii IndexedItems[K]) Items() []K { return output } -// If onMissError returns false, List stops by returning an error -func (c *Cache[K, V]) List(ctx context.Context, onMissError func(K, error) bool, items ...K) (outputs []V, err error) { +func (c *Cache[K, V]) List(ctx context.Context, items ...K) (outputs []V, err error) { if len(items) == 0 { return nil, nil } if c.read == nil || IsBypassed(ctx) { - if c.onMissMany == nil { - return c.listRaw(ctx, onMissError, items...) - } - - return c.listRawMany(ctx, items) + return c.listRaw(ctx, items) } ctx, end := telemetry.StartSpan(ctx, c.tracer, "list", trace.WithSpanKind(trace.SpanKindInternal)) @@ -45,15 +39,11 @@ func (c *Cache[K, V]) List(ctx context.Context, onMissError func(K, error) bool, keys, values := c.getValues(ctx, items) - if c.onMissMany == nil { - return c.handleListSingle(ctx, onMissError, items, keys, values) - } - - return c.handleListMany(ctx, items, keys, values) + return c.handleList(ctx, items, keys, values) } -func (c *Cache[K, V]) listRawMany(ctx context.Context, items []K) ([]V, error) { - values, err := c.onMissMany(ctx, items) +func (c *Cache[K, V]) listRaw(ctx context.Context, items []K) ([]V, error) { + values, err := c.fetchAll(ctx, items) if err != nil { return nil, err } @@ -66,66 +56,7 @@ func (c *Cache[K, V]) listRawMany(ctx context.Context, items []K) ([]V, error) { return output, nil } -func (c *Cache[K, V]) listRaw(ctx context.Context, onMissError func(K, error) bool, items ...K) ([]V, error) { - output := make([]V, len(items)) - - for index, item := range items { - value, err := c.fetch(ctx, item) - if err != nil { - if !onMissError(item, err) { - return nil, err - } - - continue - } - - output[index] = value - } - - return output, nil -} - -func (c *Cache[K, V]) handleListSingle(ctx context.Context, onMissError func(K, error) bool, items []K, keys, values []string) ([]V, error) { - output := make([]V, len(items)) - wg := concurrent.NewFailFast(c.concurrency) - ctx = wg.WithContext(ctx) - - var extendKeys []string - - for index, item := range items { - index, item := index, item - - wg.Go(func() error { - value, ok, err := c.decode([]byte(values[index])) - if ok { - output[index] = value - - if c.ttl != 0 && c.extendOnHit { - extendKeys = append(extendKeys, keys[index]) - } - - return nil - } - - if err != nil { - logUnmarshalError(ctx, c.toKey(item), err) - } - - if output[index], err = c.fetch(ctx, item); err != nil && !onMissError(item, err) { - return err - } - - return nil - }) - } - - c.extendTTL(ctx, extendKeys...) - - return output, wg.Wait() -} - -// Param fetchMany has to return the same number of values as requested and in the same order -func (c *Cache[K, V]) handleListMany(ctx context.Context, items []K, keys, values []string) ([]V, error) { +func (c *Cache[K, V]) handleList(ctx context.Context, items []K, keys, values []string) ([]V, error) { var extendKeys []string missingKeys := make(IndexedItems[K])