Skip to content

Commit

Permalink
Merge pull request #5 from MysteriousPotato/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
MysteriousPotato authored Apr 22, 2023
2 parents adb1a95 + b56cc5b commit 1773ecc
Show file tree
Hide file tree
Showing 9 changed files with 246 additions and 177 deletions.
27 changes: 27 additions & 0 deletions .github/workflows/go.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
name: Pipeline

on:
push:
branches:
- main
pull_request:
branches:
- main

jobs:

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3

- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: "1.20"

- name: Test
run: |
go get ./...
go vet ./...
go test -v -race ./...
81 changes: 54 additions & 27 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,21 @@ var (
ErrMissingMembers = errors.New("peers must contain at least one member")
)

type Cache struct {
ring *hashring
selfID string
clients clients
mu *sync.RWMutex
tables map[string]itable
metrics *Metrics
opts CacheOpts
closeCh chan bool
}
type CacheOpt func(c *Cache)

type CacheOpts struct {
//Defaults to 32
VirtualNodes int
type Cache struct {
ring *hashring
selfID string
clients clients
mu *sync.RWMutex
tables map[string]itable
metrics *Metrics
closeCh chan bool
virtualNodes int
//Defaults to FNV-1
HashFunc HashFunc
hashFunc HashFunc
//Defaults to 2 seconds
Timeout time.Duration
timeout time.Duration
//opt to skip server start
testMode bool
}
Expand All @@ -50,15 +47,20 @@ type itable interface {
// NewCache Creates a new [Cache] instance
// This should only be called once for a same set of peers, so that gRPC connections can be reused
// Create a new [Table] instead if you need to store different values
func NewCache(selfID string, peers []Member, opts CacheOpts) (*Cache, error) {
func NewCache(selfID string, peers []Member, opts ...CacheOpt) (*Cache, error) {
c := &Cache{
selfID: selfID,
tables: make(map[string]itable),
mu: &sync.RWMutex{},
clients: clients{},
metrics: newMetrics(),
opts: opts,
closeCh: make(chan bool),
selfID: selfID,
tables: make(map[string]itable),
mu: &sync.RWMutex{},
clients: clients{},
metrics: newMetrics(),
closeCh: make(chan bool),
virtualNodes: 32,
hashFunc: defaultHashFunc,
}

for _, opt := range opts {
opt(c)
}

var self Member
Expand All @@ -72,7 +74,7 @@ func NewCache(selfID string, peers []Member, opts CacheOpts) (*Cache, error) {
return nil, ErrMissingSelfInPeers
}

if !opts.testMode {
if !c.testMode {
server, start, err := newServer(self.Addr, c)
if err != nil {
return nil, fmt.Errorf("unable to create cache server: %w", err)
Expand Down Expand Up @@ -100,6 +102,31 @@ func NewCache(selfID string, peers []Member, opts CacheOpts) (*Cache, error) {
return c, nil
}

// VirtualNodeOpt sets the number of points on the hashring per node
func VirtualNodeOpt(nodes int) func(c *Cache) {
return func(c *Cache) {
c.virtualNodes = nodes
}
}

// TimeoutOpt sets the timeout for grpc client timeout
func TimeoutOpt(timeout time.Duration) func(c *Cache) {
return func(c *Cache) {
c.timeout = timeout
}
}

// HashFuncOpt sets the hash function used to determine hashring keys
func HashFuncOpt(hashFunc HashFunc) func(c *Cache) {
return func(c *Cache) {
c.hashFunc = hashFunc
}
}

func testModeOpt(c *Cache) {
c.testMode = true
}

// GetMetrics Can safely be called from a goroutine, returns a copy of the current cache Metrics.
// For Metrics specific to a [Table], refer to [Table.GetMetrics]
func (c *Cache) GetMetrics() Metrics {
Expand Down Expand Up @@ -144,8 +171,8 @@ func (c *Cache) SetPeers(peers []Member) error {
c.ring, err = newRing(
ringCfg{
Members: members,
VirtualNodes: c.opts.VirtualNodes,
HashFunc: c.opts.HashFunc,
VirtualNodes: c.virtualNodes,
HashFunc: c.hashFunc,
},
)
if err != nil {
Expand All @@ -157,7 +184,7 @@ func (c *Cache) SetPeers(peers []Member) error {
}
}

if err := c.clients.set(peers, c.opts.Timeout); err != nil {
if err := c.clients.set(peers, c.timeout); err != nil {
return err
}

Expand Down
44 changes: 23 additions & 21 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestCache_SetPeers(t *testing.T) {
c, err := NewCache(
"potato",
[]Member{{ID: "potato", Addr: test.GetUniqueAddr()}},
CacheOpts{testMode: true},
testModeOpt,
)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestSingleNodeCacheTable(t *testing.T) {
Addr: test.GetUniqueAddr(),
}

c, err := NewCache(self.ID, []Member{self}, CacheOpts{})
c, err := NewCache(self.ID, []Member{self})
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -161,25 +161,27 @@ func TestMultiNodeCacheTable(t *testing.T) {
caches := make([]*Cache, len(members))
tables := make([]*Table[string], len(members))
for i, m := range members {
c, err := NewCache(m.ID, members, CacheOpts{})
if err != nil {
t.Error(err)
}
defer test.TearDown(c)

caches[i] = c
tables[i] = NewTable[string]("test").
WithGetter(
func(key string) (string, time.Duration, error) {
return "empty", time.Hour, nil
},
).
WithFunction(
"execute", func(s string, args []byte) (string, time.Duration, error) {
return "execute", 0, nil
},
).
Build(c)
func() {
c, err := NewCache(m.ID, members)
if err != nil {
t.Error(err)
}
defer test.TearDown(c)

caches[i] = c
tables[i] = NewTable[string]("test").
WithGetter(
func(key string) (string, time.Duration, error) {
return "empty", time.Hour, nil
},
).
WithFunction(
"execute", func(s string, args []byte) (string, time.Duration, error) {
return "execute", 0, nil
},
).
Build(c)
}()
}

ctx := context.Background()
Expand Down
99 changes: 0 additions & 99 deletions exemple/exemple.go

This file was deleted.

6 changes: 0 additions & 6 deletions hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ func newRing(cfg ringCfg) (*hashring, error) {
members: cfg.Members,
virtualNodes: cfg.VirtualNodes,
}
if r.hashFunc == nil {
r.hashFunc = defaultHashFunc
}
if r.virtualNodes <= 0 {
r.virtualNodes = 32
}

if err := r.populate(); err != nil {
return nil, fmt.Errorf("unable to populate hasring: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions hashring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestRing_NewOK(t *testing.T) {
cfg := ringCfg{
Members: mTest,
VirtualNodes: 10,
HashFunc: defaultHashFunc,
}

ring, err := newRing(cfg)
Expand All @@ -43,6 +44,7 @@ func TestRing_setMembers(t *testing.T) {
cfg := ringCfg{
Members: mTest,
VirtualNodes: 10,
HashFunc: defaultHashFunc,
}

ring, err := newRing(cfg)
Expand Down
Binary file added images/logo.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestMetrics(t *testing.T) {
c, err := NewCache("1", []Member{{ID: "1", Addr: test.GetUniqueAddr()}}, CacheOpts{})
c, err := NewCache("1", []Member{{ID: "1", Addr: test.GetUniqueAddr()}})
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit 1773ecc

Please sign in to comment.