Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move Queue creation behind new func that evaluates queue type #4252

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions cmd/server/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,11 @@ func checkSqliteFileExist(path string) error {
return err
}

func setupQueue(ctx context.Context, s store.Store) queue.Queue {
return queue.WithTaskStore(ctx, queue.New(ctx), s)
func setupQueue(ctx context.Context, s store.Store) (queue.Queue, error) {
return queue.New(ctx, queue.Config{
Backend: queue.TypeMemory,
Store: s,
})
}

func setupMembershipService(_ context.Context, _store store.Store) cache.MembershipService {
Expand Down Expand Up @@ -143,18 +146,19 @@ func setupJWTSecret(_store store.Store) (string, error) {
return jwtSecret, nil
}

func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) error {
func setupEvilGlobals(ctx context.Context, c *cli.Command, s store.Store) (err error) {
// services
server.Config.Services.Queue = setupQueue(ctx, s)
server.Config.Services.Logs = logging.New()
server.Config.Services.Pubsub = pubsub.New()
server.Config.Services.Membership = setupMembershipService(ctx, s)
serviceManager, err := services.NewManager(c, s, setup.Forge)
server.Config.Services.Queue, err = setupQueue(ctx, s)
if err != nil {
return fmt.Errorf("could not setup queue: %w", err)
}
server.Config.Services.Manager, err = services.NewManager(c, s, setup.Forge)
if err != nil {
return fmt.Errorf("could not setup service manager: %w", err)
}
server.Config.Services.Manager = serviceManager

server.Config.Services.LogStore, err = setupLogStore(c, s)
if err != nil {
return fmt.Errorf("could not setup log store: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions server/queue/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ const processTimeInterval = 100 * time.Millisecond

var ErrWorkerKicked = fmt.Errorf("worker was kicked")

// New returns a new fifo queue.
func New(ctx context.Context) Queue {
// NewMemoryQueue returns a new fifo queue.
func NewMemoryQueue(ctx context.Context) Queue {
q := &fifo{
ctx: ctx,
workers: map[*worker]struct{}{},
Expand Down
28 changes: 14 additions & 14 deletions server/queue/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestFifo(t *testing.T) {
want := &model.Task{ID: "1"}
ctx := context.Background()

q := New(ctx)
q := NewMemoryQueue(ctx)
assert.NoError(t, q.Push(ctx, want))
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")
Expand All @@ -55,7 +55,7 @@ func TestFifoExpire(t *testing.T) {
want := &model.Task{ID: "1"}
ctx, cancel := context.WithCancelCause(context.Background())

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
q.extension = 0
assert.NoError(t, q.Push(ctx, want))
info := q.Info(ctx)
Expand All @@ -78,7 +78,7 @@ func TestFifoWait(t *testing.T) {
want := &model.Task{ID: "1"}
ctx := context.Background()

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.Push(ctx, want))

got, err := q.Poll(ctx, 1, filterFnTrue)
Expand All @@ -101,7 +101,7 @@ func TestFifoEvict(t *testing.T) {
t1 := &model.Task{ID: "1"}
ctx := context.Background()

q := New(ctx)
q := NewMemoryQueue(ctx)
assert.NoError(t, q.Push(ctx, t1))
info := q.Info(ctx)
assert.Len(t, info.Pending, 1, "expect task in pending queue")
Expand All @@ -125,7 +125,7 @@ func TestFifoDependencies(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task1}))

got, err := q.Poll(ctx, 1, filterFnTrue)
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestFifoErrors(t *testing.T) {
RunOn: []string{"success", "failure"},
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))

got, err := q.Poll(ctx, 1, filterFnTrue)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestFifoErrors2(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestFifoErrorsMultiThread(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))

obtainedWorkCh := make(chan *model.Task)
Expand Down Expand Up @@ -314,7 +314,7 @@ func TestFifoTransitiveErrors(t *testing.T) {
DepStatus: make(map[string]model.StatusValue),
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))

got, err := q.Poll(ctx, 1, filterFnTrue)
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestFifoCancel(t *testing.T) {
RunOn: []string{"success", "failure"},
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))

_, _ = q.Poll(ctx, 1, filterFnTrue)
Expand All @@ -371,7 +371,7 @@ func TestFifoPause(t *testing.T) {
ID: "1",
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestFifoPauseResume(t *testing.T) {
ID: "1",
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
q.Pause()
assert.NoError(t, q.Push(ctx, task1))
q.Resume()
Expand All @@ -429,7 +429,7 @@ func TestWaitingVsPending(t *testing.T) {
RunOn: []string{"success", "failure"},
}

q, _ := New(ctx).(*fifo)
q, _ := NewMemoryQueue(ctx).(*fifo)
assert.NoError(t, q.PushAtOnce(ctx, []*model.Task{task2, task3, task1}))

got, _ := q.Poll(ctx, 1, filterFnTrue)
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestShouldRun(t *testing.T) {

func TestFifoWithScoring(t *testing.T) {
ctx := context.Background()
q := New(ctx)
q := NewMemoryQueue(ctx)

// Create tasks with different labels
tasks := []*model.Task{
Expand Down
32 changes: 32 additions & 0 deletions server/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package queue
import (
"context"
"errors"
"fmt"
"strings"

"go.woodpecker-ci.org/woodpecker/v2/server/model"
"go.woodpecker-ci.org/woodpecker/v2/server/store"
)

var (
Expand Down Expand Up @@ -115,3 +117,33 @@ type Queue interface {
// KickAgentWorkers kicks all workers for a given agent.
KickAgentWorkers(agentID int64)
}

// Config holds the configuration for the queue.
type Config struct {
Backend Type
Store store.Store
}

// Queue type
type Type string

const (
TypeMemory Type = "memory"
)

// New creates a new queue based on the provided configuration.
func New(ctx context.Context, config Config) (Queue, error) {
var q Queue

switch config.Backend {
case TypeMemory:
q := NewMemoryQueue(ctx)
if config.Store != nil {
q = WithTaskStore(ctx, q, config.Store)
}
default:
return nil, fmt.Errorf("unsupported queue backend: %s", config.Backend)
}

return q, nil
}