Skip to content

Commit

Permalink
fix: refactor priority ordered task queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge committed Apr 8, 2024
1 parent eab1c1b commit 8b9280e
Show file tree
Hide file tree
Showing 8 changed files with 427 additions and 110 deletions.
42 changes: 42 additions & 0 deletions internal/queue/genheap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package queue

// genericHeap is a generic heap implementation that can be used with any type that satisfies the constraints.Ordered interface.
type genericHeap[T comparable[T]] []T

func (h genericHeap[T]) Len() int {
return len(h)
}

func (h genericHeap[T]) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

// Push pushes an element onto the heap. Do not call directly, use heap.Push
func (h *genericHeap[T]) Push(x interface{}) {
*h = append(*h, x.(T))
}

// Pop pops an element from the heap. Do not call directly, use heap.Pop
func (h *genericHeap[T]) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

func (h genericHeap[T]) Peek() T {
if len(h) == 0 {
var zero T
return zero
}
return h[0]
}

func (h genericHeap[T]) Less(i, j int) bool {
return h[i].Less(h[j])
}

type comparable[T any] interface {
Less(other T) bool
}
48 changes: 48 additions & 0 deletions internal/queue/genheap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package queue

import (
"container/heap"
"testing"
)

type val struct {
v int
}

func (v val) Less(other val) bool {
return v.v < other.v
}

func TestGenericHeapInit(t *testing.T) {
genHeap := genericHeap[val]{{v: 3}, {v: 2}, {v: 1}}
heap.Init(&genHeap)

if genHeap.Len() != 3 {
t.Errorf("expected length to be 3, got %d", genHeap.Len())
}

for _, i := range []int{1, 2, 3} {
v := heap.Pop(&genHeap).(val)
if v.v != i {
t.Errorf("expected %d, got %d", i, v.v)
}
}
}

func TestGenericHeapPushPop(t *testing.T) {
genHeap := genericHeap[val]{} // empty heap
heap.Push(&genHeap, val{v: 3})
heap.Push(&genHeap, val{v: 2})
heap.Push(&genHeap, val{v: 1})

if genHeap.Len() != 3 {
t.Errorf("expected length to be 3, got %d", genHeap.Len())
}

for _, i := range []int{1, 2, 3} {
v := heap.Pop(&genHeap).(val)
if v.v != i {
t.Errorf("expected %d, got %d", i, v.v)
}
}
}
78 changes: 78 additions & 0 deletions internal/queue/timepriorityqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package queue

import (
"container/heap"
"context"
"sync"
"time"
)

// TimePriorityQueue is a priority queue that dequeues elements at (or after) a specified time, and prioritizes elements based on a priority value. It is safe for concurrent use.
type TimePriorityQueue[T any] struct {
mu sync.Mutex
tqueue TimeQueue[priorityEntry[T]]
ready genericHeap[priorityEntry[T]]
}

func NewTimePriorityQueue[T any]() *TimePriorityQueue[T] {
return &TimePriorityQueue[T]{
tqueue: TimeQueue[priorityEntry[T]]{},
ready: genericHeap[priorityEntry[T]]{},
}
}

func (t *TimePriorityQueue[T]) Len() int {
t.mu.Lock()
defer t.mu.Unlock()
return t.tqueue.Len() + t.ready.Len()
}

func (t *TimePriorityQueue[T]) Peek() T {
t.mu.Lock()
defer t.mu.Unlock()

if t.ready.Len() > 0 {
return t.ready.Peek().v
}
return t.tqueue.Peek().v
}

func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) {
t.mu.Lock()
t.tqueue.Enqueue(at, priorityEntry[T]{at, priority, v})
t.mu.Unlock()
}

func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T {
t.mu.Lock()
for {
for t.tqueue.Len() > 0 {
thead := t.tqueue.Peek() // peek at the head of the time queue
if thead.at.Before(time.Now()) {
tqe := heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]])
heap.Push(&t.ready, tqe.v)
} else {
break
}
}
if t.ready.Len() > 0 {
defer t.mu.Unlock()
return heap.Pop(&t.ready).(priorityEntry[T]).v
}
t.mu.Unlock()
// wait for the next element to be ready
val := t.tqueue.Dequeue(ctx)
t.mu.Lock()
heap.Push(&t.ready, val)
}
}

type priorityEntry[T any] struct {
at time.Time
priority int
v T
}

func (t priorityEntry[T]) Less(other priorityEntry[T]) bool {
return t.priority > other.priority
}
53 changes: 53 additions & 0 deletions internal/queue/timepriorityqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package queue

import (
"context"
"testing"
"time"
)

// TestTPQEnqueue tests that enqueued elements are retruned highest priority first.
func TestTPQPriority(t *testing.T) {
tpq := NewTimePriorityQueue[int]()

now := time.Now().Add(-time.Second)
for i := 0; i < 100; i++ {
tpq.Enqueue(now, i, i)
}

if tpq.Len() != 100 {
t.Errorf("expected length to be 100, got %d", tpq.Len())
}

for i := 99; i >= 0; i-- {
v := tpq.Dequeue(context.Background())
if v != i {
t.Errorf("expected %d, got %d", i, v)
}
}
}

func TestTPQMixedReadinessStates(t *testing.T) {
tpq := NewTimePriorityQueue[int]()

now := time.Now()
for i := 0; i < 100; i++ {
tpq.Enqueue(now.Add(-100*time.Millisecond), i, i)
}
for i := 0; i < 100; i++ {
tpq.Enqueue(now.Add(100*time.Millisecond), i, i)
}

if tpq.Len() != 200 {
t.Errorf("expected length to be 100, got %d", tpq.Len())
}

for j := 0; j < 2; j++ {
for i := 99; i >= 0; i-- {
v := tpq.Dequeue(context.Background())
if v != i {
t.Errorf("pass %d expected %d, got %d", j, i, v)
}
}
}
}
119 changes: 119 additions & 0 deletions internal/queue/timequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package queue

import (
"container/heap"
"context"
"sync"
"time"
)

// TimeQueue is a priority queue that dequeues elements at (or after) a specified time. It is safe for concurrent use.
type TimeQueue[T any] struct {
heap genericHeap[timeQueueEntry[T]]

dequeueMu sync.Mutex
mu sync.Mutex
notify chan struct{}
}

func NewTimeQueue[T any]() *TimeQueue[T] {
return &TimeQueue[T]{
heap: genericHeap[timeQueueEntry[T]]{},
}
}

func (t *TimeQueue[T]) Enqueue(at time.Time, v T) {
t.mu.Lock()
heap.Push(&t.heap, timeQueueEntry[T]{at, v})
if t.notify != nil {
t.notify <- struct{}{}
}
t.mu.Unlock()
}

func (t *TimeQueue[T]) Len() int {
t.mu.Lock()
defer t.mu.Unlock()
return t.heap.Len()
}

func (t *TimeQueue[T]) Peek() T {
t.mu.Lock()
defer t.mu.Unlock()

if t.heap.Len() == 0 {
var zero T
return zero
}
return t.heap.Peek().v
}

func (t *TimeQueue[T]) Dequeue(ctx context.Context) T {
t.dequeueMu.Lock()
defer t.dequeueMu.Unlock()

t.mu.Lock()
t.notify = make(chan struct{}, 1)
defer func() {
t.mu.Lock()
close(t.notify)
t.notify = nil
t.mu.Unlock()
}()
t.mu.Unlock()

for {
t.mu.Lock()

var wait time.Duration
if t.heap.Len() == 0 {
wait = 3 * time.Minute
} else {
val := t.heap.Peek()
wait = time.Until(val.at)
if wait <= 0 {
t.mu.Unlock()
return heap.Pop(&t.heap).(timeQueueEntry[T]).v
}
}
t.mu.Unlock()

timer := time.NewTimer(wait)

select {
case <-timer.C:
t.mu.Lock()
val, ok := heap.Pop(&t.heap).(timeQueueEntry[T])
if !ok || val.at.After(time.Now()) {
t.mu.Unlock()
continue
}
t.mu.Unlock()
return val.v
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
if !timer.Stop() {
<-timer.C
}
continue
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
var zero T
return zero
}
}
}

type timeQueueEntry[T any] struct {
at time.Time
v T
}

func (t timeQueueEntry[T]) Less(other timeQueueEntry[T]) bool {
return t.at.Before(other.at)
}

func (t timeQueueEntry[T]) Eq(other timeQueueEntry[T]) bool {
return t.at.Equal(other.at)
}
Loading

0 comments on commit 8b9280e

Please sign in to comment.