Skip to content

Commit

Permalink
refactor(cache): Handling simple of multi load in the same way
Browse files Browse the repository at this point in the history
Signed-off-by: Vincent Boutour <bob@vibioh.fr>
  • Loading branch information
ViBiOh committed Sep 18, 2023
1 parent eed674b commit c87f5c8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 75 deletions.
28 changes: 28 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/ViBiOh/httputils/v4/pkg/cntxt"
"github.com/ViBiOh/httputils/v4/pkg/concurrent"
"github.com/ViBiOh/httputils/v4/pkg/model"
"github.com/ViBiOh/httputils/v4/pkg/telemetry"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -156,6 +157,33 @@ func (c *Cache[K, V]) fetch(ctx context.Context, id K) (V, error) {
return value, err
}

func (c *Cache[K, V]) fetchAll(ctx context.Context, items []K) (map[K]V, error) {
if c.onMissMany != nil {
return c.onMissMany(ctx, items)
}

output := make(map[K]V, len(items))

wg := concurrent.NewLimiter(c.concurrency)

for _, item := range items {
item := item

wg.Go(func() {
value, err := c.fetch(ctx, item)
if err != nil {
slog.Error("fetch item", "err", err, "item", item)
}

output[item] = value
})
}

wg.Wait()

return output, nil
}

func (c *Cache[K, V]) decode(content []byte) (value V, ok bool, err error) {
if len(content) == 0 {
return
Expand Down
81 changes: 6 additions & 75 deletions pkg/cache/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"

"github.com/ViBiOh/httputils/v4/pkg/cntxt"
"github.com/ViBiOh/httputils/v4/pkg/concurrent"
"github.com/ViBiOh/httputils/v4/pkg/telemetry"
"go.opentelemetry.io/otel/trace"
)
Expand All @@ -26,34 +25,25 @@ func (ii IndexedItems[K]) Items() []K {
return output
}

// If onMissError returns false, List stops by returning an error
func (c *Cache[K, V]) List(ctx context.Context, onMissError func(K, error) bool, items ...K) (outputs []V, err error) {
func (c *Cache[K, V]) List(ctx context.Context, items ...K) (outputs []V, err error) {
if len(items) == 0 {
return nil, nil
}

if c.read == nil || IsBypassed(ctx) {
if c.onMissMany == nil {
return c.listRaw(ctx, onMissError, items...)
}

return c.listRawMany(ctx, items)
return c.listRaw(ctx, items)
}

ctx, end := telemetry.StartSpan(ctx, c.tracer, "list", trace.WithSpanKind(trace.SpanKindInternal))
defer end(&err)

keys, values := c.getValues(ctx, items)

if c.onMissMany == nil {
return c.handleListSingle(ctx, onMissError, items, keys, values)
}

return c.handleListMany(ctx, items, keys, values)
return c.handleList(ctx, items, keys, values)
}

func (c *Cache[K, V]) listRawMany(ctx context.Context, items []K) ([]V, error) {
values, err := c.onMissMany(ctx, items)
func (c *Cache[K, V]) listRaw(ctx context.Context, items []K) ([]V, error) {
values, err := c.fetchAll(ctx, items)
if err != nil {
return nil, err
}
Expand All @@ -66,66 +56,7 @@ func (c *Cache[K, V]) listRawMany(ctx context.Context, items []K) ([]V, error) {
return output, nil
}

func (c *Cache[K, V]) listRaw(ctx context.Context, onMissError func(K, error) bool, items ...K) ([]V, error) {
output := make([]V, len(items))

for index, item := range items {
value, err := c.fetch(ctx, item)
if err != nil {
if !onMissError(item, err) {
return nil, err
}

continue
}

output[index] = value
}

return output, nil
}

func (c *Cache[K, V]) handleListSingle(ctx context.Context, onMissError func(K, error) bool, items []K, keys, values []string) ([]V, error) {
output := make([]V, len(items))
wg := concurrent.NewFailFast(c.concurrency)
ctx = wg.WithContext(ctx)

var extendKeys []string

for index, item := range items {
index, item := index, item

wg.Go(func() error {
value, ok, err := c.decode([]byte(values[index]))
if ok {
output[index] = value

if c.ttl != 0 && c.extendOnHit {
extendKeys = append(extendKeys, keys[index])
}

return nil
}

if err != nil {
logUnmarshalError(ctx, c.toKey(item), err)
}

if output[index], err = c.fetch(ctx, item); err != nil && !onMissError(item, err) {
return err
}

return nil
})
}

c.extendTTL(ctx, extendKeys...)

return output, wg.Wait()
}

// Param fetchMany has to return the same number of values as requested and in the same order
func (c *Cache[K, V]) handleListMany(ctx context.Context, items []K, keys, values []string) ([]V, error) {
func (c *Cache[K, V]) handleList(ctx context.Context, items []K, keys, values []string) ([]V, error) {
var extendKeys []string

missingKeys := make(IndexedItems[K])
Expand Down

0 comments on commit c87f5c8

Please sign in to comment.