Skip to content

Commit

Permalink
feat: use sync.Map
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed May 12, 2023
1 parent 2434ab1 commit 1be7a5a
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 47 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/Masterminds/semver/v3 v3.2.0
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/alitto/pond v1.8.3
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10
github.com/bluele/gcache v0.0.2
github.com/formancehq/stack/libs/go-libs v0.0.0-20230222164357-55840b21a337
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alitto/pond v1.8.3 h1:ydIqygCLVPqIX/USe5EaV/aSRXTRXDEI9JwuDdu+/xs=
github.com/alitto/pond v1.8.3/go.mod h1:CmvIIGd5jKLasGI3D87qDkQxjzChdKMmnXMg3fG6M6Q=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 h1:yL7+Jz0jTC6yykIK/Wh74gnTJnrGr5AyrNMXuA0gves=
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY=
Expand Down
126 changes: 80 additions & 46 deletions pkg/ledger/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,22 @@ import (

"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/stack/libs/go-libs/metadata"
"golang.org/x/sync/singleflight"
"github.com/pkg/errors"
)

type Store interface {
GetAccountWithVolumes(ctx context.Context, addr string) (*core.AccountWithVolumes, error)
}
type AccountComputerFn func(ctx context.Context, address string) (*core.AccountWithVolumes, error)

func (fn AccountComputerFn) ComputeAccount(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
return fn(ctx, address)
}

type cacheEntry struct {
sync.Mutex
account *core.AccountWithVolumes
lastUsed time.Time
inUse atomic.Int64
ready chan struct{}
evicted chan struct{}
}

type Release func()
Expand All @@ -36,50 +34,58 @@ type Release func()
// and update metrics:
// c.metricsRegistry.CacheNumberEntries.Add(ctx, -1) for all evicted entries
type Cache struct {
mu sync.RWMutex
cache map[string]*cacheEntry
cache sync.Map
store Store
sg singleflight.Group
metricsRegistry metrics.PerLedgerMetricsRegistry
counter atomic.Int64
}

func (c *Cache) getEntry(ctx context.Context, address string) (*cacheEntry, error) {
item, err, _ := c.sg.Do(address, func() (interface{}, error) {
c.mu.RLock()
entry, ok := c.cache[address]
c.mu.RUnlock()
if !ok {
// cache miss
c.metricsRegistry.CacheMisses().Add(ctx, 1)

account, err := c.store.GetAccountWithVolumes(ctx, address)
if err != nil {
return nil, err
}

entry = &cacheEntry{
account: account,
lastUsed: time.Now(),
}
c.mu.Lock()
c.cache[address] = entry
c.mu.Unlock()

if c.metricsRegistry != nil {
c.metricsRegistry.CacheNumberEntries().Add(ctx, +1)
}

return entry, nil
func (c *Cache) loadEntry(ctx context.Context, address string, inUse bool) (*cacheEntry, error) {

ce := &cacheEntry{
ready: make(chan struct{}),
evicted: make(chan struct{}),
}
entry, loaded := c.cache.LoadOrStore(address, ce)
if !loaded {
// cache miss
c.metricsRegistry.CacheMisses().Add(ctx, 1)

account, err := c.store.GetAccountWithVolumes(ctx, address)
if err != nil && !errors.Is(err, storage.ErrNotFound) {
panic(err)
}
return entry, nil
})
return item.(*cacheEntry), err
if errors.Is(err, storage.ErrNotFound) {
account = core.NewAccountWithVolumes(address)
}
ce.account = account

close(ce.ready)
c.metricsRegistry.CacheNumberEntries().Add(ctx, 1)
}

ce = entry.(*cacheEntry)

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ce.ready:
case <-ce.evicted:
return c.loadEntry(ctx, address, inUse)
}

ce.lastUsed = time.Now()
if inUse {
ce.inUse.Add(1)
}

return ce, nil
}

func (c *Cache) LockAccounts(ctx context.Context, address ...string) (Release, error) {
entries := make([]*cacheEntry, 0)
for _, address := range address {
entry, err := c.getEntry(ctx, address)
entry, err := c.loadEntry(ctx, address, true)
if err != nil {
return nil, err
}
Expand All @@ -102,7 +108,7 @@ func (c *Cache) LockAccounts(ctx context.Context, address ...string) (Release, e
func (c *Cache) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
address = strings.TrimPrefix(address, "@")

entry, err := c.getEntry(ctx, address)
entry, err := c.loadEntry(ctx, address, false)
if err != nil {
return nil, err
}
Expand All @@ -114,12 +120,11 @@ func (c *Cache) GetAccountWithVolumes(ctx context.Context, address string) (*cor
}

func (c *Cache) withLockOnAccount(address string, callback func(account *core.AccountWithVolumes)) {
c.mu.Lock()
entry, ok := c.cache[address]
c.mu.Unlock()
e, ok := c.cache.Load(address)
if !ok {
panic("cache empty for address: " + address)
}
entry := e.(*cacheEntry)
entry.Lock()
defer entry.Unlock()

Expand Down Expand Up @@ -157,10 +162,39 @@ func (c *Cache) UpdateAccountMetadata(address string, m metadata.Metadata) error
return nil
}

//func (c *Cache) runEviction() {
// for {
// select {
// case <-time.After(time.Minute):
// c.cache.Range(func(key, value any) bool {
// cacheEntry := value.(*cacheEntry)
// if cacheEntry.inUse.Load() == 0 {
//
// }
// return true
// })
// }
// }
//}

func New(store Store, metricsRegistry metrics.PerLedgerMetricsRegistry) *Cache {
return &Cache{
if metricsRegistry == nil {
metricsRegistry = metrics.NewNoOpMetricsRegistry()
}
c := &Cache{
store: store,
cache: map[string]*cacheEntry{},
metricsRegistry: metricsRegistry,
}
go func() {
for {
c.cache.Range(func(key, value any) bool {
entry := value.(*cacheEntry)
if entry.inUse.Load() == 0 && entry.lastUsed.Before(time.Now().Add(-time.Minute)) {
entry.
}
})
}
}()

return c
}
40 changes: 39 additions & 1 deletion pkg/ledger/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package cache

import (
"context"
"fmt"
"math/big"
"testing"
"time"

"github.com/alitto/pond"
"github.com/formancehq/ledger/pkg/core"
"github.com/formancehq/ledger/pkg/opentelemetry/metrics"
"github.com/formancehq/ledger/pkg/storage"
"github.com/formancehq/stack/libs/go-libs/metadata"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
Expand All @@ -22,6 +26,8 @@ type mockAccountComputer struct {
}

func (c *mockAccountComputer) GetAccountWithVolumes(ctx context.Context, address string) (*core.AccountWithVolumes, error) {
// Simulate a real process for benchs
<-time.After(5 * time.Millisecond)
c.calls = append(c.calls, mockCall{
address: address,
})
Expand All @@ -30,7 +36,7 @@ func (c *mockAccountComputer) GetAccountWithVolumes(ctx context.Context, address
return accountWithVolumes, nil
}
}
return nil, nil
return nil, storage.ErrNotFound
}

func TestParallelRead(t *testing.T) {
Expand Down Expand Up @@ -161,3 +167,35 @@ func TestUpdateVolumes(t *testing.T) {
},
}, *worldAccount)
}

func BenchmarkCache(b *testing.B) {
accounts := []*core.AccountWithVolumes{
{
Account: core.Account{
Address: "world",
},
Volumes: map[string]core.Volumes{},
},
}
accountsNumber := 100
for i := 0; i < accountsNumber; i++ {
accounts = append(accounts, &core.AccountWithVolumes{
Account: core.NewAccount(fmt.Sprintf("account:%d", i)),
Volumes: map[string]core.Volumes{},
})
}
mock := &mockAccountComputer{
accounts: accounts,
}
cache := New(mock, metrics.NewNoOpMetricsRegistry())

pool := pond.New(accountsNumber, accountsNumber)
b.ResetTimer()
for i := 0; i < b.N; i++ {
pool.Submit(func() {
_, err := cache.GetAccountWithVolumes(context.Background(), fmt.Sprintf("account:%d", i%accountsNumber))
require.NoError(b, err)
})
}
pool.StopAndWait()
}

0 comments on commit 1be7a5a

Please sign in to comment.