Skip to content

Commit

Permalink
Added EvictAll method for batching eviction requests
Browse files Browse the repository at this point in the history
  • Loading branch information
MysteriousPotato committed Nov 6, 2023
1 parent ee90b08 commit dd95be8
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 75 deletions.
12 changes: 6 additions & 6 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func TestMultiNodeCacheTable(t *testing.T) {
tests := []struct {
op string
key string
keys []string
value string
}{
{op: "get", key: "1"},
Expand All @@ -217,8 +218,7 @@ func TestMultiNodeCacheTable(t *testing.T) {
{op: "evict", key: "2"},
{op: "get", key: "2"},
{op: "put", key: "2", value: "2"},
{op: "evict", key: "1"},
{op: "evict", key: "2"},
{op: "evictAll", keys: []string{"1", "2"}},
}

for _, table := range tables {
Expand All @@ -233,24 +233,24 @@ func TestMultiNodeCacheTable(t *testing.T) {
t.Fatal(err)
}
got = append(got, v)
break
case "put":
if err := table.Put(ctx, tt.key, tt.value, time.Hour); err != nil {
t.Fatal(err)
}
break
case "evict":
if err := table.Evict(ctx, tt.key); err != nil {
t.Fatal(err)
}
break
case "evictAll":
if err := table.EvictAll(ctx, tt.keys); err != nil {
t.Fatal(err)
}
case "call":
v, err := table.Call(ctx, tt.key, "procedure", []byte{})
if err != nil {
t.Fatal(err)
}
got = append(got, v)
break
}
}
if !reflect.DeepEqual(got, expected) {
Expand Down
4 changes: 2 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func incPut(ms ...*metrics) {
}
}

func incEvict(ms ...*metrics) {
func incEvict(delta int64, ms ...*metrics) {
for _, m := range ms {
m.Evict.Add(1)
m.Evict.Add(delta)
}
}

Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ nitecache is an embedded and distributed cache library for golang that supports:
- requires go version >= 1.21

```sh
go get github.com/MysteriousPotato/nitecache@v0.4.1
go get github.com/MysteriousPotato/nitecache@v0.4.2
```

### Usage
Expand Down
11 changes: 11 additions & 0 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,17 @@ func (s service) Evict(_ context.Context, r *servicepb.EvictRequest) (*servicepb
return &servicepb.Empty{}, t.evictLocally(r.Key)
}

func (s service) EvictAll(_ context.Context, r *servicepb.EvictAllRequest) (*servicepb.Empty, error) {
t, err := s.cache.getTable(r.Table)
if err != nil {
return nil, err
}

t.evictAllLocally(r.Keys)

return &servicepb.Empty{}, nil
}

func (s service) Call(ctx context.Context, r *servicepb.CallRequest) (*servicepb.CallResponse, error) {
t, err := s.cache.getTable(r.Table)
if err != nil {
Expand Down
200 changes: 139 additions & 61 deletions servicepb/service.pb.go

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions servicepb/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ service Service{
rpc Get(GetRequest) returns (GetResponse) {}
rpc Put(PutRequest) returns (Empty) {}
rpc Evict(EvictRequest) returns (Empty) {}
rpc EvictAll(EvictAllRequest) returns (Empty) {}
rpc Call(CallRequest) returns (CallResponse) {}
rpc HealthCheck(Empty) returns (Empty) {}
}
Expand Down Expand Up @@ -38,6 +39,11 @@ message EvictRequest{
string key = 2;
}

message EvictAllRequest{
string table = 1;
repeated string keys = 2;
}

message CallRequest{
string table = 1;
string key = 2;
Expand Down
37 changes: 37 additions & 0 deletions servicepb/service_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 120 additions & 5 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/MysteriousPotato/nitecache/inmem"
"strings"
"time"

"github.com/MysteriousPotato/nitecache/servicepb"
Expand All @@ -19,6 +20,14 @@ var (
// Procedure defines the type used for registering RPCs through [TableBuilder.WithProcedure].
type Procedure[T any] func(ctx context.Context, v T, args []byte) (T, time.Duration, error)

type (
BatchEvictionErrs []batchEvictionErr
batchEvictionErr struct {
keys []string
err error
}
)

type Table[T any] struct {
name string
store *inmem.Store[string, []byte]
Expand Down Expand Up @@ -51,7 +60,7 @@ func (t *Table[T]) Get(ctx context.Context, key string) (T, error) {
var item inmem.Item[[]byte]
var hit bool
if ownerID == t.cache.self.ID {
item, hit, err = t.getLocally(key)
item, hit, err = t.getLocally(ctx, key)
} else {
client, err := t.cache.getClient(ownerID)
if err != nil {
Expand Down Expand Up @@ -139,6 +148,74 @@ func (t *Table[T]) Evict(ctx context.Context, key string) error {
return nil
}

// EvictAll attempts to remove all entries from the Table for the given keys.
//
// Keys owned by the same client are batched together for efficiency.
//
// After the operation, a BatchEvictionErrs detailing which keys (if any) failed to be evicted can be retrieved when checking the returned error.
// Example:
//
// if errs, ok := err.(nitecache.BatchEvictionErrs); ok {
// // Note that keys that AffectedKeys may return keys that were actually evicted successfully.
// keysThatFailed := errs.AffectedKeys()
// }
func (t *Table[T]) EvictAll(ctx context.Context, keys []string) error {
if t.isZero() {
return ErrCacheDestroyed
}

type clientKeys struct {
client *client
keys []string
}

var selfKeys []string
clientKeysMap := map[string]*clientKeys{}
for _, key := range keys {
ownerID, err := t.cache.ring.GetOwner(key)
if err != nil {
return err
}

if ownerID == t.cache.self.ID {
selfKeys = append(selfKeys, key)
continue
}

if _, ok := clientKeysMap[ownerID]; !ok {
c, err := t.cache.getClient(ownerID)
if err != nil {
return err
}

clientKeysMap[ownerID] = &clientKeys{
client: c,
keys: []string{key},
}
continue
}

clientKeysMap[ownerID].keys = append(clientKeysMap[ownerID].keys, key)
}

t.evictAllLocally(selfKeys)

var errs BatchEvictionErrs
for _, c := range clientKeysMap {
if err := t.evictAllFromPeer(ctx, c.keys, c.client); err != nil {
errs = append(errs, batchEvictionErr{
keys: c.keys,
err: err,
})
}
}

if errs != nil {
return errs
}
return nil
}

// Call calls an RPC previously registered through [TableBuilder.WithProcedure] on the owner node to update the value for the given key.
//
// Call acquires a lock exclusive to the given key until the RPC has finished executing.
Expand Down Expand Up @@ -231,10 +308,10 @@ func (t *Table[T]) GetMetrics() (Metrics, error) {
return t.metrics.getCopy(), nil
}

func (t *Table[T]) getLocally(key string) (inmem.Item[[]byte], bool, error) {
func (t *Table[T]) getLocally(ctx context.Context, key string) (inmem.Item[[]byte], bool, error) {
incGet(t.metrics, t.cache.metrics)
sfRes, err, _ := t.getSF.Do(key, func() (any, error) {
item, hit, err := t.store.Get(key)
item, hit, err := t.store.Get(ctx, key)
if !hit {
incMiss(t.metrics, t.cache.metrics)
}
Expand All @@ -255,14 +332,19 @@ func (t *Table[T]) putLocally(key string, item inmem.Item[[]byte]) error {
}

func (t *Table[T]) evictLocally(key string) error {
incEvict(t.metrics, t.cache.metrics)
incEvict(1, t.metrics, t.cache.metrics)
_, _, _ = t.evictSF.Do(key, func() (any, error) {
t.store.Evict(key)
return nil, nil
})
return nil
}

func (t *Table[T]) evictAllLocally(keys []string) {
incEvict(int64(len(keys)), t.metrics, t.cache.metrics)
t.store.EvictAll(keys)
}

func (t *Table[T]) callLocally(ctx context.Context, key, procedure string, args []byte) (inmem.Item[[]byte], error) {
incCalls(procedure, t.metrics, t.cache.metrics)

Expand Down Expand Up @@ -362,6 +444,20 @@ func (t *Table[T]) evictFromPeer(ctx context.Context, key string, owner *client)
return err
}

func (t *Table[T]) evictAllFromPeer(ctx context.Context, keys []string, owner *client) error {
if _, err := owner.EvictAll(ctx, &servicepb.EvictAllRequest{
Table: t.name,
Keys: keys,
}); err != nil {
return err
}

if t.hotStore != nil {
t.hotStore.EvictAll(keys)
}
return nil
}

func (t *Table[T]) callFromPeer(
ctx context.Context,
key, procedure string,
Expand Down Expand Up @@ -394,7 +490,7 @@ func (t *Table[T]) getFromHotCache(key string) (inmem.Item[[]byte], bool, error)
if t.hotStore == nil {
return inmem.Item[[]byte]{}, false, fmt.Errorf("hot cache not enabled")
}
return t.hotStore.Get(key)
return t.hotStore.Get(context.Background(), key)
}

func (t *Table[T]) tearDown() {
Expand All @@ -411,3 +507,22 @@ func (t *Table[T]) getEmptyValue() T {
var v T
return v
}

func (b BatchEvictionErrs) Error() string {
var errs []string
for _, err := range b {
errs = append(errs, fmt.Sprintf("failed to evict keys %v: %v", err.keys, err.err))
}
return strings.Join(errs, ",")
}

// AffectedKeys returns a list of keys owned by clients who returned an error.
//
// As a result, the list may contain keys that were successfully evicted.
func (b BatchEvictionErrs) AffectedKeys() []string {
var keys []string
for _, err := range b {
keys = append(keys, err.keys...)
}
return keys
}

0 comments on commit dd95be8

Please sign in to comment.