Skip to content

Commit

Permalink
fix: Use localStorage for memqueue tests instead of Redis for #162
Browse files Browse the repository at this point in the history
  • Loading branch information
shaunco committed Nov 21, 2021
1 parent 303ad56 commit b2ec9f5
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 89 deletions.
7 changes: 4 additions & 3 deletions memqueue/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ func BenchmarkCallAsync(b *testing.B) {
ctx := context.Background()

q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
defer q.Close()

Expand All @@ -36,8 +37,8 @@ func BenchmarkNamedMessage(b *testing.B) {
ctx := context.Background()

q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Redis: redisRing(),
Name: "test",
Storage: taskq.NewLocalStorage(),
})
defer q.Close()

Expand Down
49 changes: 21 additions & 28 deletions memqueue/memqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"testing"
"time"

"github.com/go-redis/redis/v8"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -39,7 +38,8 @@ var _ = Describe("message with args", func() {

BeforeEach(func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -68,7 +68,8 @@ var _ = Describe("context.Context", func() {

BeforeEach(func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -97,7 +98,8 @@ var _ = Describe("message with invalid number of args", func() {

BeforeEach(func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -132,7 +134,8 @@ var _ = Describe("HandlerFunc", func() {

BeforeEach(func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -168,7 +171,8 @@ var _ = Describe("message retry timing", func() {
count = 0
ch = make(chan time.Time, 10)
q = memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task = taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -231,7 +235,8 @@ var _ = Describe("failing queue with error handler", func() {

BeforeEach(func() {
q = memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -261,8 +266,8 @@ var _ = Describe("named message", func() {

BeforeEach(func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Redis: redisRing(),
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -306,8 +311,8 @@ var _ = Describe("CallOnce", func() {
now = time.Now()

q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Redis: redisRing(),
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -349,7 +354,8 @@ var _ = Describe("stress testing", func() {

BeforeEach(func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -381,6 +387,7 @@ var _ = Describe("stress testing failing queue", func() {
q := memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
PauseErrorsThreshold: -1,
Storage: taskq.NewLocalStorage(),
})
task := taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -416,8 +423,8 @@ var _ = Describe("empty queue", func() {
BeforeEach(func() {
processed = 0
q = memqueue.NewQueue(&taskq.QueueOptions{
Name: "test",
Redis: redisRing(),
Name: "test",
Storage: taskq.NewLocalStorage(),
})
task = taskq.RegisterTask(&taskq.TaskOptions{
Name: "test",
Expand Down Expand Up @@ -511,17 +518,3 @@ func slot(period time.Duration) int64 {
}
return tm.Unix() / periodSec
}

var (
ringOnce sync.Once
ring *redis.Ring
)

func redisRing() *redis.Ring {
ringOnce.Do(func() {
ring = redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{"0": ":6379"},
})
})
return ring
}
68 changes: 68 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package taskq

import (
"context"
"sync"
"time"

"github.com/hashicorp/golang-lru/simplelru"
)

type Storage interface {
Exists(ctx context.Context, key string) bool
}

var _ Storage = (*localStorage)(nil)
var _ Storage = (*redisStorage)(nil)

// LOCAL

type localStorage struct {
mu sync.Mutex
cache *simplelru.LRU
}

func NewLocalStorage() Storage {
return &localStorage{}
}

func (s *localStorage) Exists(_ context.Context, key string) bool {
s.mu.Lock()
defer s.mu.Unlock()

if s.cache == nil {
var err error
s.cache, err = simplelru.NewLRU(128000, nil)
if err != nil {
panic(err)
}
}

_, ok := s.cache.Get(key)
if ok {
return true
}

s.cache.Add(key, nil)
return false
}

// REDIS

type redisStorage struct {
redis Redis
}

func newRedisStorage(redis Redis) Storage {
return &redisStorage{
redis: redis,
}
}

func (s *redisStorage) Exists(ctx context.Context, key string) bool {
val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result()
if err != nil {
return true
}
return !val
}
58 changes: 0 additions & 58 deletions taskq.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"context"
"log"
"os"
"sync"
"time"

"github.com/go-redis/redis/v8"
"github.com/hashicorp/golang-lru/simplelru"

"github.com/vmihailenco/taskq/v3/internal"
)
Expand Down Expand Up @@ -42,59 +40,3 @@ type Redis interface {
ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd
ScriptLoad(ctx context.Context, script string) *redis.StringCmd
}

type Storage interface {
Exists(ctx context.Context, key string) bool
}

type redisStorage struct {
redis Redis
}

var _ Storage = (*redisStorage)(nil)

func newRedisStorage(redis Redis) redisStorage {
return redisStorage{
redis: redis,
}
}

func (s redisStorage) Exists(ctx context.Context, key string) bool {
if localCacheExists(key) {
return true
}

val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result()
if err != nil {
return true
}
return !val
}

//------------------------------------------------------------------------------

var (
mu sync.Mutex
cache *simplelru.LRU
)

func localCacheExists(key string) bool {
mu.Lock()
defer mu.Unlock()

if cache == nil {
var err error
cache, err = simplelru.NewLRU(128000, nil)
if err != nil {
panic(err)
}
}

_, ok := cache.Get(key)
if ok {
return true
}

cache.Add(key, nil)
return false
}

0 comments on commit b2ec9f5

Please sign in to comment.