-
Notifications
You must be signed in to change notification settings - Fork 0
/
hub.go
144 lines (127 loc) · 3.85 KB
/
hub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
* Copyright 2022, Cloudchacho
* All rights reserved.
*/
package taskhawk
import (
"context"
"fmt"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
)
// Hub is the central struct used to dispatch Taskhawk tasks / run consumer
type Hub struct {
tasks map[string]taskDef
config Config
publisher publisher
}
// Config used to configure Taskhawk Hub
type Config struct {
// Sync changes taskhawk dispatch to synchronous mode. This is similar
// to Celery's Eager mode and is helpful for integration testing
Sync bool
// Instrumenter for the consumer
Instrumenter Instrumenter
// GetLogger returns the logger object for given context
GetLogger GetLoggerFunc
}
// NewHub creates a hub
func NewHub(config Config, backend PublisherBackend) *Hub {
return &Hub{
publisher: publisher{backend: backend, serializer: jsonifier{}, instrumenter: config.Instrumenter},
config: config,
tasks: map[string]taskDef{},
}
}
// dispatch a task asynchronously with specified priority.
// The concrete type of input is expected to be same as the concrete type of NewInput()'s return value.
func (h *Hub) dispatch(ctx context.Context, taskName string, input any, priority Priority) error {
headers := make(map[string]string)
if headersCarrier, ok := input.(HeadersCarrier); ok {
for key, value := range headersCarrier.GetHeaders() {
headers[key] = value
}
}
m, err := newMessage(
taskName,
input,
headers,
uuid.NewV4().String(),
priority,
)
if err != nil {
return err
}
if h.config.Sync {
task := h.tasks[taskName]
return task.call(ctx, m, nil)
}
return h.publisher.Publish(ctx, m)
}
func (h *Hub) getTask(name string) (taskDef, error) {
task, ok := h.tasks[name]
if !ok {
return task, fmt.Errorf("%w: %s", ErrTaskNotFound, name)
}
return task, nil
}
// ListenForMessages starts a taskhawk listener for the provided message types
//
// Cancelable context may be used to cancel processing of messages
func (h *Hub) ListenForMessages(ctx context.Context, request ListenRequest, backend ConsumerBackend) error {
c := queueConsumer{
consumer{
backend: backend,
deserializer: jsonifier{},
getLogger: h.config.GetLogger,
instrumenter: h.config.Instrumenter,
hub: h,
},
}
c.initDefaults()
return c.ListenForMessages(ctx, request)
}
// RequeueDLQ re-queues everything in the taskhawk DLQ back into the taskhawk queue
func (h *Hub) RequeueDLQ(ctx context.Context, request ListenRequest, backend ConsumerBackend) error {
c := queueConsumer{
consumer{
backend: backend,
deserializer: jsonifier{},
getLogger: h.config.GetLogger,
instrumenter: h.config.Instrumenter,
hub: h,
},
}
c.initDefaults()
return c.RequeueDLQ(ctx, request)
}
// RegisterTask registers the task to the hub with priority 'default'.
// Priority may be overridden at dispatch time using `DispatchWithPriority`.
func RegisterTask[T any](h *Hub, taskName string, taskFn TaskFn[T]) (Task[T], error) {
return RegisterTaskWithPriority(h, taskName, taskFn, PriorityDefault)
}
// RegisterTaskWithPriority registers the task to the hub with specified priority.
// This will set the default priority, and may be overridden at dispatch time using `DispatchWithPriority`.
func RegisterTaskWithPriority[T any](h *Hub, taskName string, taskFn TaskFn[T], defaultPriority Priority) (Task[T], error) {
if taskName == "" {
return Task[T]{}, errors.New("task name not set")
}
if _, found := h.tasks[taskName]; found {
return Task[T]{}, errors.Errorf("task with name '%s' already registered", taskName)
}
taskFn = wrapTaskFn(taskFn)
h.tasks[taskName] = taskDef{
execute: func(ctx context.Context, data any) error {
return taskFn(ctx, data.(*T))
},
newInput: func() any {
return new(T)
},
}
task := Task[T]{
hub: h,
defaultPriority: defaultPriority,
taskName: taskName,
}
return task, nil
}