diff --git a/cache_test.go b/cache_test.go index 3837654..5485c52 100644 --- a/cache_test.go +++ b/cache_test.go @@ -201,6 +201,7 @@ func TestMultiNodeCacheTable(t *testing.T) { tests := []struct { op string key string + keys []string value string }{ {op: "get", key: "1"}, @@ -217,8 +218,7 @@ func TestMultiNodeCacheTable(t *testing.T) { {op: "evict", key: "2"}, {op: "get", key: "2"}, {op: "put", key: "2", value: "2"}, - {op: "evict", key: "1"}, - {op: "evict", key: "2"}, + {op: "evictAll", keys: []string{"1", "2"}}, } for _, table := range tables { @@ -233,24 +233,24 @@ func TestMultiNodeCacheTable(t *testing.T) { t.Fatal(err) } got = append(got, v) - break case "put": if err := table.Put(ctx, tt.key, tt.value, time.Hour); err != nil { t.Fatal(err) } - break case "evict": if err := table.Evict(ctx, tt.key); err != nil { t.Fatal(err) } - break + case "evictAll": + if err := table.EvictAll(ctx, tt.keys); err != nil { + t.Fatal(err) + } case "call": v, err := table.Call(ctx, tt.key, "procedure", []byte{}) if err != nil { t.Fatal(err) } got = append(got, v) - break } } if !reflect.DeepEqual(got, expected) { diff --git a/metrics.go b/metrics.go index 6724fdc..aba8e48 100644 --- a/metrics.go +++ b/metrics.go @@ -65,9 +65,9 @@ func incPut(ms ...*metrics) { } } -func incEvict(ms ...*metrics) { +func incEvict(delta int64, ms ...*metrics) { for _, m := range ms { - m.Evict.Add(1) + m.Evict.Add(delta) } } diff --git a/readme.md b/readme.md index dd5f05b..af5c3eb 100644 --- a/readme.md +++ b/readme.md @@ -42,7 +42,7 @@ nitecache is an embedded and distributed cache library for golang that supports: - requires go version >= 1.21 ```sh -go get github.com/MysteriousPotato/nitecache@v0.4.1 +go get github.com/MysteriousPotato/nitecache@v0.4.2 ``` ### Usage diff --git a/service.go b/service.go index f84e623..e42af31 100644 --- a/service.go +++ b/service.go @@ -132,6 +132,17 @@ func (s service) Evict(_ context.Context, r *servicepb.EvictRequest) (*servicepb return &servicepb.Empty{}, t.evictLocally(r.Key) } +func (s service) EvictAll(_ context.Context, r *servicepb.EvictAllRequest) (*servicepb.Empty, error) { + t, err := s.cache.getTable(r.Table) + if err != nil { + return nil, err + } + + t.evictAllLocally(r.Keys) + + return &servicepb.Empty{}, nil +} + func (s service) Call(ctx context.Context, r *servicepb.CallRequest) (*servicepb.CallResponse, error) { t, err := s.cache.getTable(r.Table) if err != nil { diff --git a/servicepb/service.pb.go b/servicepb/service.pb.go index 43d3daa..b8d5ec0 100644 --- a/servicepb/service.pb.go +++ b/servicepb/service.pb.go @@ -303,6 +303,61 @@ func (x *EvictRequest) GetKey() string { return "" } +type EvictAllRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Table string `protobuf:"bytes,1,opt,name=table,proto3" json:"table,omitempty"` + Keys []string `protobuf:"bytes,2,rep,name=keys,proto3" json:"keys,omitempty"` +} + +func (x *EvictAllRequest) Reset() { + *x = EvictAllRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_servicepb_service_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *EvictAllRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*EvictAllRequest) ProtoMessage() {} + +func (x *EvictAllRequest) ProtoReflect() protoreflect.Message { + mi := &file_servicepb_service_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use EvictAllRequest.ProtoReflect.Descriptor instead. +func (*EvictAllRequest) Descriptor() ([]byte, []int) { + return file_servicepb_service_proto_rawDescGZIP(), []int{5} +} + +func (x *EvictAllRequest) GetTable() string { + if x != nil { + return x.Table + } + return "" +} + +func (x *EvictAllRequest) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + type CallRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -317,7 +372,7 @@ type CallRequest struct { func (x *CallRequest) Reset() { *x = CallRequest{} if protoimpl.UnsafeEnabled { - mi := &file_servicepb_service_proto_msgTypes[5] + mi := &file_servicepb_service_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -330,7 +385,7 @@ func (x *CallRequest) String() string { func (*CallRequest) ProtoMessage() {} func (x *CallRequest) ProtoReflect() protoreflect.Message { - mi := &file_servicepb_service_proto_msgTypes[5] + mi := &file_servicepb_service_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -343,7 +398,7 @@ func (x *CallRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use CallRequest.ProtoReflect.Descriptor instead. func (*CallRequest) Descriptor() ([]byte, []int) { - return file_servicepb_service_proto_rawDescGZIP(), []int{5} + return file_servicepb_service_proto_rawDescGZIP(), []int{6} } func (x *CallRequest) GetTable() string { @@ -385,7 +440,7 @@ type CallResponse struct { func (x *CallResponse) Reset() { *x = CallResponse{} if protoimpl.UnsafeEnabled { - mi := &file_servicepb_service_proto_msgTypes[6] + mi := &file_servicepb_service_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -398,7 +453,7 @@ func (x *CallResponse) String() string { func (*CallResponse) ProtoMessage() {} func (x *CallResponse) ProtoReflect() protoreflect.Message { - mi := &file_servicepb_service_proto_msgTypes[6] + mi := &file_servicepb_service_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -411,7 +466,7 @@ func (x *CallResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use CallResponse.ProtoReflect.Descriptor instead. func (*CallResponse) Descriptor() ([]byte, []int) { - return file_servicepb_service_proto_rawDescGZIP(), []int{6} + return file_servicepb_service_proto_rawDescGZIP(), []int{7} } func (x *CallResponse) GetItem() *Item { @@ -430,7 +485,7 @@ type Empty struct { func (x *Empty) Reset() { *x = Empty{} if protoimpl.UnsafeEnabled { - mi := &file_servicepb_service_proto_msgTypes[7] + mi := &file_servicepb_service_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -443,7 +498,7 @@ func (x *Empty) String() string { func (*Empty) ProtoMessage() {} func (x *Empty) ProtoReflect() protoreflect.Message { - mi := &file_servicepb_service_proto_msgTypes[7] + mi := &file_servicepb_service_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -456,7 +511,7 @@ func (x *Empty) ProtoReflect() protoreflect.Message { // Deprecated: Use Empty.ProtoReflect.Descriptor instead. func (*Empty) Descriptor() ([]byte, []int) { - return file_servicepb_service_proto_rawDescGZIP(), []int{7} + return file_servicepb_service_proto_rawDescGZIP(), []int{8} } var File_servicepb_service_proto protoreflect.FileDescriptor @@ -484,37 +539,45 @@ var file_servicepb_service_proto_rawDesc = []byte{ 0x6d, 0x22, 0x36, 0x0a, 0x0c, 0x45, 0x76, 0x69, 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x67, 0x0a, 0x0b, 0x43, 0x61, 0x6c, - 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x10, - 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, - 0x12, 0x1c, 0x0a, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x64, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x64, 0x75, 0x72, 0x65, 0x12, 0x12, - 0x0a, 0x04, 0x61, 0x72, 0x67, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x72, - 0x67, 0x73, 0x22, 0x33, 0x0a, 0x0c, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x23, 0x0a, 0x04, 0x69, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0f, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x49, 0x74, 0x65, - 0x6d, 0x52, 0x04, 0x69, 0x74, 0x65, 0x6d, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, - 0x32, 0x99, 0x02, 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x36, 0x0a, 0x03, - 0x47, 0x65, 0x74, 0x12, 0x15, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, - 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x12, 0x30, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x15, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x05, 0x45, 0x76, 0x69, 0x63, 0x74, 0x12, - 0x17, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x76, 0x69, 0x63, - 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x04, - 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x16, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, - 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x70, 0x62, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x17, 0x5a, 0x15, - 0x2e, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2f, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x3b, 0x0a, 0x0f, 0x45, 0x76, 0x69, + 0x63, 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, + 0x74, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x61, 0x62, + 0x6c, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, + 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x22, 0x67, 0x0a, 0x0b, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, + 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, + 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x64, 0x75, 0x72, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x70, 0x72, 0x6f, 0x63, 0x65, 0x64, 0x75, 0x72, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x61, + 0x72, 0x67, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x22, + 0x33, 0x0a, 0x0c, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x23, 0x0a, 0x04, 0x69, 0x74, 0x65, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x49, 0x74, 0x65, 0x6d, 0x52, 0x04, + 0x69, 0x74, 0x65, 0x6d, 0x22, 0x07, 0x0a, 0x05, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x32, 0xd5, 0x02, + 0x0a, 0x07, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x36, 0x0a, 0x03, 0x47, 0x65, 0x74, + 0x12, 0x15, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x00, 0x12, 0x30, 0x0a, 0x03, 0x50, 0x75, 0x74, 0x12, 0x15, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x70, 0x62, 0x2e, 0x50, 0x75, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x00, 0x12, 0x34, 0x0a, 0x05, 0x45, 0x76, 0x69, 0x63, 0x74, 0x12, 0x17, 0x2e, 0x73, + 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x76, 0x69, 0x63, 0x74, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, + 0x62, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x3a, 0x0a, 0x08, 0x45, 0x76, 0x69, + 0x63, 0x74, 0x41, 0x6c, 0x6c, 0x12, 0x1a, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, + 0x62, 0x2e, 0x45, 0x76, 0x69, 0x63, 0x74, 0x41, 0x6c, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x39, 0x0a, 0x04, 0x43, 0x61, 0x6c, 0x6c, 0x12, 0x16, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, + 0x62, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x12, 0x33, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, + 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x1a, 0x10, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x17, 0x5a, 0x15, 0x2e, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, + 0x63, 0x65, 0x70, 0x62, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x70, 0x62, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -529,16 +592,17 @@ func file_servicepb_service_proto_rawDescGZIP() []byte { return file_servicepb_service_proto_rawDescData } -var file_servicepb_service_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_servicepb_service_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_servicepb_service_proto_goTypes = []interface{}{ - (*Item)(nil), // 0: servicepb.Item - (*GetRequest)(nil), // 1: servicepb.GetRequest - (*GetResponse)(nil), // 2: servicepb.GetResponse - (*PutRequest)(nil), // 3: servicepb.PutRequest - (*EvictRequest)(nil), // 4: servicepb.EvictRequest - (*CallRequest)(nil), // 5: servicepb.CallRequest - (*CallResponse)(nil), // 6: servicepb.CallResponse - (*Empty)(nil), // 7: servicepb.Empty + (*Item)(nil), // 0: servicepb.Item + (*GetRequest)(nil), // 1: servicepb.GetRequest + (*GetResponse)(nil), // 2: servicepb.GetResponse + (*PutRequest)(nil), // 3: servicepb.PutRequest + (*EvictRequest)(nil), // 4: servicepb.EvictRequest + (*EvictAllRequest)(nil), // 5: servicepb.EvictAllRequest + (*CallRequest)(nil), // 6: servicepb.CallRequest + (*CallResponse)(nil), // 7: servicepb.CallResponse + (*Empty)(nil), // 8: servicepb.Empty } var file_servicepb_service_proto_depIdxs = []int32{ 0, // 0: servicepb.GetResponse.item:type_name -> servicepb.Item @@ -547,15 +611,17 @@ var file_servicepb_service_proto_depIdxs = []int32{ 1, // 3: servicepb.Service.Get:input_type -> servicepb.GetRequest 3, // 4: servicepb.Service.Put:input_type -> servicepb.PutRequest 4, // 5: servicepb.Service.Evict:input_type -> servicepb.EvictRequest - 5, // 6: servicepb.Service.Call:input_type -> servicepb.CallRequest - 7, // 7: servicepb.Service.HealthCheck:input_type -> servicepb.Empty - 2, // 8: servicepb.Service.Get:output_type -> servicepb.GetResponse - 7, // 9: servicepb.Service.Put:output_type -> servicepb.Empty - 7, // 10: servicepb.Service.Evict:output_type -> servicepb.Empty - 6, // 11: servicepb.Service.Call:output_type -> servicepb.CallResponse - 7, // 12: servicepb.Service.HealthCheck:output_type -> servicepb.Empty - 8, // [8:13] is the sub-list for method output_type - 3, // [3:8] is the sub-list for method input_type + 5, // 6: servicepb.Service.EvictAll:input_type -> servicepb.EvictAllRequest + 6, // 7: servicepb.Service.Call:input_type -> servicepb.CallRequest + 8, // 8: servicepb.Service.HealthCheck:input_type -> servicepb.Empty + 2, // 9: servicepb.Service.Get:output_type -> servicepb.GetResponse + 8, // 10: servicepb.Service.Put:output_type -> servicepb.Empty + 8, // 11: servicepb.Service.Evict:output_type -> servicepb.Empty + 8, // 12: servicepb.Service.EvictAll:output_type -> servicepb.Empty + 7, // 13: servicepb.Service.Call:output_type -> servicepb.CallResponse + 8, // 14: servicepb.Service.HealthCheck:output_type -> servicepb.Empty + 9, // [9:15] is the sub-list for method output_type + 3, // [3:9] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name 3, // [3:3] is the sub-list for extension extendee 0, // [0:3] is the sub-list for field type_name @@ -628,7 +694,7 @@ func file_servicepb_service_proto_init() { } } file_servicepb_service_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CallRequest); i { + switch v := v.(*EvictAllRequest); i { case 0: return &v.state case 1: @@ -640,7 +706,7 @@ func file_servicepb_service_proto_init() { } } file_servicepb_service_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*CallResponse); i { + switch v := v.(*CallRequest); i { case 0: return &v.state case 1: @@ -652,6 +718,18 @@ func file_servicepb_service_proto_init() { } } file_servicepb_service_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CallResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_servicepb_service_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Empty); i { case 0: return &v.state @@ -670,7 +748,7 @@ func file_servicepb_service_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_servicepb_service_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/servicepb/service.proto b/servicepb/service.proto index b439f41..eddffa3 100644 --- a/servicepb/service.proto +++ b/servicepb/service.proto @@ -8,6 +8,7 @@ service Service{ rpc Get(GetRequest) returns (GetResponse) {} rpc Put(PutRequest) returns (Empty) {} rpc Evict(EvictRequest) returns (Empty) {} + rpc EvictAll(EvictAllRequest) returns (Empty) {} rpc Call(CallRequest) returns (CallResponse) {} rpc HealthCheck(Empty) returns (Empty) {} } @@ -38,6 +39,11 @@ message EvictRequest{ string key = 2; } +message EvictAllRequest{ + string table = 1; + repeated string keys = 2; +} + message CallRequest{ string table = 1; string key = 2; diff --git a/servicepb/service_grpc.pb.go b/servicepb/service_grpc.pb.go index 70af797..1fa1b49 100644 --- a/servicepb/service_grpc.pb.go +++ b/servicepb/service_grpc.pb.go @@ -22,6 +22,7 @@ const ( Service_Get_FullMethodName = "/servicepb.Service/Get" Service_Put_FullMethodName = "/servicepb.Service/Put" Service_Evict_FullMethodName = "/servicepb.Service/Evict" + Service_EvictAll_FullMethodName = "/servicepb.Service/EvictAll" Service_Call_FullMethodName = "/servicepb.Service/Call" Service_HealthCheck_FullMethodName = "/servicepb.Service/HealthCheck" ) @@ -33,6 +34,7 @@ type ServiceClient interface { Get(ctx context.Context, in *GetRequest, opts ...grpc.CallOption) (*GetResponse, error) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*Empty, error) Evict(ctx context.Context, in *EvictRequest, opts ...grpc.CallOption) (*Empty, error) + EvictAll(ctx context.Context, in *EvictAllRequest, opts ...grpc.CallOption) (*Empty, error) Call(ctx context.Context, in *CallRequest, opts ...grpc.CallOption) (*CallResponse, error) HealthCheck(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) } @@ -72,6 +74,15 @@ func (c *serviceClient) Evict(ctx context.Context, in *EvictRequest, opts ...grp return out, nil } +func (c *serviceClient) EvictAll(ctx context.Context, in *EvictAllRequest, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, Service_EvictAll_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *serviceClient) Call(ctx context.Context, in *CallRequest, opts ...grpc.CallOption) (*CallResponse, error) { out := new(CallResponse) err := c.cc.Invoke(ctx, Service_Call_FullMethodName, in, out, opts...) @@ -97,6 +108,7 @@ type ServiceServer interface { Get(context.Context, *GetRequest) (*GetResponse, error) Put(context.Context, *PutRequest) (*Empty, error) Evict(context.Context, *EvictRequest) (*Empty, error) + EvictAll(context.Context, *EvictAllRequest) (*Empty, error) Call(context.Context, *CallRequest) (*CallResponse, error) HealthCheck(context.Context, *Empty) (*Empty, error) mustEmbedUnimplementedServiceServer() @@ -115,6 +127,9 @@ func (UnimplementedServiceServer) Put(context.Context, *PutRequest) (*Empty, err func (UnimplementedServiceServer) Evict(context.Context, *EvictRequest) (*Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Evict not implemented") } +func (UnimplementedServiceServer) EvictAll(context.Context, *EvictAllRequest) (*Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method EvictAll not implemented") +} func (UnimplementedServiceServer) Call(context.Context, *CallRequest) (*CallResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Call not implemented") } @@ -188,6 +203,24 @@ func _Service_Evict_Handler(srv interface{}, ctx context.Context, dec func(inter return interceptor(ctx, in, info, handler) } +func _Service_EvictAll_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(EvictAllRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ServiceServer).EvictAll(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Service_EvictAll_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ServiceServer).EvictAll(ctx, req.(*EvictAllRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Service_Call_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(CallRequest) if err := dec(in); err != nil { @@ -243,6 +276,10 @@ var Service_ServiceDesc = grpc.ServiceDesc{ MethodName: "Evict", Handler: _Service_Evict_Handler, }, + { + MethodName: "EvictAll", + Handler: _Service_EvictAll_Handler, + }, { MethodName: "Call", Handler: _Service_Call_Handler, diff --git a/table.go b/table.go index 621466a..0145789 100644 --- a/table.go +++ b/table.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/MysteriousPotato/nitecache/inmem" + "strings" "time" "github.com/MysteriousPotato/nitecache/servicepb" @@ -19,6 +20,14 @@ var ( // Procedure defines the type used for registering RPCs through [TableBuilder.WithProcedure]. type Procedure[T any] func(ctx context.Context, v T, args []byte) (T, time.Duration, error) +type ( + BatchEvictionErrs []batchEvictionErr + batchEvictionErr struct { + keys []string + err error + } +) + type Table[T any] struct { name string store *inmem.Store[string, []byte] @@ -51,7 +60,7 @@ func (t *Table[T]) Get(ctx context.Context, key string) (T, error) { var item inmem.Item[[]byte] var hit bool if ownerID == t.cache.self.ID { - item, hit, err = t.getLocally(key) + item, hit, err = t.getLocally(ctx, key) } else { client, err := t.cache.getClient(ownerID) if err != nil { @@ -139,6 +148,74 @@ func (t *Table[T]) Evict(ctx context.Context, key string) error { return nil } +// EvictAll attempts to remove all entries from the Table for the given keys. +// +// Keys owned by the same client are batched together for efficiency. +// +// After the operation, a BatchEvictionErrs detailing which keys (if any) failed to be evicted can be retrieved when checking the returned error. +// Example: +// +// if errs, ok := err.(nitecache.BatchEvictionErrs); ok { +// // Note that keys that AffectedKeys may return keys that were actually evicted successfully. +// keysThatFailed := errs.AffectedKeys() +// } +func (t *Table[T]) EvictAll(ctx context.Context, keys []string) error { + if t.isZero() { + return ErrCacheDestroyed + } + + type clientKeys struct { + client *client + keys []string + } + + var selfKeys []string + clientKeysMap := map[string]*clientKeys{} + for _, key := range keys { + ownerID, err := t.cache.ring.GetOwner(key) + if err != nil { + return err + } + + if ownerID == t.cache.self.ID { + selfKeys = append(selfKeys, key) + continue + } + + if _, ok := clientKeysMap[ownerID]; !ok { + c, err := t.cache.getClient(ownerID) + if err != nil { + return err + } + + clientKeysMap[ownerID] = &clientKeys{ + client: c, + keys: []string{key}, + } + continue + } + + clientKeysMap[ownerID].keys = append(clientKeysMap[ownerID].keys, key) + } + + t.evictAllLocally(selfKeys) + + var errs BatchEvictionErrs + for _, c := range clientKeysMap { + if err := t.evictAllFromPeer(ctx, c.keys, c.client); err != nil { + errs = append(errs, batchEvictionErr{ + keys: c.keys, + err: err, + }) + } + } + + if errs != nil { + return errs + } + return nil +} + // Call calls an RPC previously registered through [TableBuilder.WithProcedure] on the owner node to update the value for the given key. // // Call acquires a lock exclusive to the given key until the RPC has finished executing. @@ -231,10 +308,10 @@ func (t *Table[T]) GetMetrics() (Metrics, error) { return t.metrics.getCopy(), nil } -func (t *Table[T]) getLocally(key string) (inmem.Item[[]byte], bool, error) { +func (t *Table[T]) getLocally(ctx context.Context, key string) (inmem.Item[[]byte], bool, error) { incGet(t.metrics, t.cache.metrics) sfRes, err, _ := t.getSF.Do(key, func() (any, error) { - item, hit, err := t.store.Get(key) + item, hit, err := t.store.Get(ctx, key) if !hit { incMiss(t.metrics, t.cache.metrics) } @@ -255,7 +332,7 @@ func (t *Table[T]) putLocally(key string, item inmem.Item[[]byte]) error { } func (t *Table[T]) evictLocally(key string) error { - incEvict(t.metrics, t.cache.metrics) + incEvict(1, t.metrics, t.cache.metrics) _, _, _ = t.evictSF.Do(key, func() (any, error) { t.store.Evict(key) return nil, nil @@ -263,6 +340,11 @@ func (t *Table[T]) evictLocally(key string) error { return nil } +func (t *Table[T]) evictAllLocally(keys []string) { + incEvict(int64(len(keys)), t.metrics, t.cache.metrics) + t.store.EvictAll(keys) +} + func (t *Table[T]) callLocally(ctx context.Context, key, procedure string, args []byte) (inmem.Item[[]byte], error) { incCalls(procedure, t.metrics, t.cache.metrics) @@ -362,6 +444,20 @@ func (t *Table[T]) evictFromPeer(ctx context.Context, key string, owner *client) return err } +func (t *Table[T]) evictAllFromPeer(ctx context.Context, keys []string, owner *client) error { + if _, err := owner.EvictAll(ctx, &servicepb.EvictAllRequest{ + Table: t.name, + Keys: keys, + }); err != nil { + return err + } + + if t.hotStore != nil { + t.hotStore.EvictAll(keys) + } + return nil +} + func (t *Table[T]) callFromPeer( ctx context.Context, key, procedure string, @@ -394,7 +490,7 @@ func (t *Table[T]) getFromHotCache(key string) (inmem.Item[[]byte], bool, error) if t.hotStore == nil { return inmem.Item[[]byte]{}, false, fmt.Errorf("hot cache not enabled") } - return t.hotStore.Get(key) + return t.hotStore.Get(context.Background(), key) } func (t *Table[T]) tearDown() { @@ -411,3 +507,22 @@ func (t *Table[T]) getEmptyValue() T { var v T return v } + +func (b BatchEvictionErrs) Error() string { + var errs []string + for _, err := range b { + errs = append(errs, fmt.Sprintf("failed to evict keys %v: %v", err.keys, err.err)) + } + return strings.Join(errs, ",") +} + +// AffectedKeys returns a list of keys owned by clients who returned an error. +// +// As a result, the list may contain keys that were successfully evicted. +func (b BatchEvictionErrs) AffectedKeys() []string { + var keys []string + for _, err := range b { + keys = append(keys, err.keys...) + } + return keys +}