diff --git a/memqueue/bench_test.go b/memqueue/bench_test.go index 55b9a51..193e895 100644 --- a/memqueue/bench_test.go +++ b/memqueue/bench_test.go @@ -13,7 +13,8 @@ func BenchmarkCallAsync(b *testing.B) { ctx := context.Background() q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) defer q.Close() @@ -36,8 +37,8 @@ func BenchmarkNamedMessage(b *testing.B) { ctx := context.Background() q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", - Redis: redisRing(), + Name: "test", + Storage: taskq.NewLocalStorage(), }) defer q.Close() diff --git a/memqueue/memqueue_test.go b/memqueue/memqueue_test.go index f4f4d46..5b3876e 100644 --- a/memqueue/memqueue_test.go +++ b/memqueue/memqueue_test.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/go-redis/redis/v8" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" uuid "github.com/satori/go.uuid" @@ -39,7 +38,8 @@ var _ = Describe("message with args", func() { BeforeEach(func() { q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -68,7 +68,8 @@ var _ = Describe("context.Context", func() { BeforeEach(func() { q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -97,7 +98,8 @@ var _ = Describe("message with invalid number of args", func() { BeforeEach(func() { q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -132,7 +134,8 @@ var _ = Describe("HandlerFunc", func() { BeforeEach(func() { q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -168,7 +171,8 @@ var _ = Describe("message retry timing", func() { count = 0 ch = make(chan time.Time, 10) q = memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task = taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -231,7 +235,8 @@ var _ = Describe("failing queue with error handler", func() { BeforeEach(func() { q = memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -261,8 +266,8 @@ var _ = Describe("named message", func() { BeforeEach(func() { q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", - Redis: redisRing(), + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -306,8 +311,8 @@ var _ = Describe("CallOnce", func() { now = time.Now() q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", - Redis: redisRing(), + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -349,7 +354,8 @@ var _ = Describe("stress testing", func() { BeforeEach(func() { q := memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", + Name: "test", + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -381,6 +387,7 @@ var _ = Describe("stress testing failing queue", func() { q := memqueue.NewQueue(&taskq.QueueOptions{ Name: "test", PauseErrorsThreshold: -1, + Storage: taskq.NewLocalStorage(), }) task := taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -416,8 +423,8 @@ var _ = Describe("empty queue", func() { BeforeEach(func() { processed = 0 q = memqueue.NewQueue(&taskq.QueueOptions{ - Name: "test", - Redis: redisRing(), + Name: "test", + Storage: taskq.NewLocalStorage(), }) task = taskq.RegisterTask(&taskq.TaskOptions{ Name: "test", @@ -511,17 +518,3 @@ func slot(period time.Duration) int64 { } return tm.Unix() / periodSec } - -var ( - ringOnce sync.Once - ring *redis.Ring -) - -func redisRing() *redis.Ring { - ringOnce.Do(func() { - ring = redis.NewRing(&redis.RingOptions{ - Addrs: map[string]string{"0": ":6379"}, - }) - }) - return ring -} diff --git a/storage.go b/storage.go new file mode 100644 index 0000000..bd3ae92 --- /dev/null +++ b/storage.go @@ -0,0 +1,68 @@ +package taskq + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/golang-lru/simplelru" +) + +type Storage interface { + Exists(ctx context.Context, key string) bool +} + +var _ Storage = (*localStorage)(nil) +var _ Storage = (*redisStorage)(nil) + +// LOCAL + +type localStorage struct { + mu sync.Mutex + cache *simplelru.LRU +} + +func NewLocalStorage() Storage { + return &localStorage{} +} + +func (s *localStorage) Exists(_ context.Context, key string) bool { + s.mu.Lock() + defer s.mu.Unlock() + + if s.cache == nil { + var err error + s.cache, err = simplelru.NewLRU(128000, nil) + if err != nil { + panic(err) + } + } + + _, ok := s.cache.Get(key) + if ok { + return true + } + + s.cache.Add(key, nil) + return false +} + +// REDIS + +type redisStorage struct { + redis Redis +} + +func newRedisStorage(redis Redis) Storage { + return &redisStorage{ + redis: redis, + } +} + +func (s *redisStorage) Exists(ctx context.Context, key string) bool { + val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result() + if err != nil { + return true + } + return !val +} diff --git a/taskq.go b/taskq.go index 410d7cc..334b1a7 100644 --- a/taskq.go +++ b/taskq.go @@ -4,11 +4,9 @@ import ( "context" "log" "os" - "sync" "time" "github.com/go-redis/redis/v8" - "github.com/hashicorp/golang-lru/simplelru" "github.com/vmihailenco/taskq/v3/internal" ) @@ -42,59 +40,3 @@ type Redis interface { ScriptExists(ctx context.Context, scripts ...string) *redis.BoolSliceCmd ScriptLoad(ctx context.Context, script string) *redis.StringCmd } - -type Storage interface { - Exists(ctx context.Context, key string) bool -} - -type redisStorage struct { - redis Redis -} - -var _ Storage = (*redisStorage)(nil) - -func newRedisStorage(redis Redis) redisStorage { - return redisStorage{ - redis: redis, - } -} - -func (s redisStorage) Exists(ctx context.Context, key string) bool { - if localCacheExists(key) { - return true - } - - val, err := s.redis.SetNX(ctx, key, "", 24*time.Hour).Result() - if err != nil { - return true - } - return !val -} - -//------------------------------------------------------------------------------ - -var ( - mu sync.Mutex - cache *simplelru.LRU -) - -func localCacheExists(key string) bool { - mu.Lock() - defer mu.Unlock() - - if cache == nil { - var err error - cache, err = simplelru.NewLRU(128000, nil) - if err != nil { - panic(err) - } - } - - _, ok := cache.Get(key) - if ok { - return true - } - - cache.Add(key, nil) - return false -}