-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventbus.go
114 lines (96 loc) · 2.58 KB
/
eventbus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package decs
import (
"time"
)
type ResultNotifier interface {
DispatchSuccess(event Event)
DispatchFailure(event Event)
NotifySuccess(eventName string, data interface{})
NotifyFailure(eventName string, data interface{})
}
type EventDispatcher interface {
Dispatch(event Event)
}
type conditionalCallback struct {
condition func(string) bool
trigger Trigger
callback func(interface{}, EventDispatcher)
}
// Implements both, ResultNotifier and EventDispatcher
type eventBus struct {
queue []*event
errQueue []*event
subs map[string]map[Trigger][]func(interface{}, EventDispatcher)
condSubs []*conditionalCallback
commandBus *CommandBus
causation Message
}
func newEventBus() *eventBus {
return &eventBus{
subs: make(map[string]map[Trigger][]func(interface{}, EventDispatcher)),
}
}
func (ebus *eventBus) DispatchSuccess(evt Event) {
event := evt.(*event)
event.trigger = AfterSuccess
ebus.queue = append(ebus.queue, event)
}
func (ebus *eventBus) DispatchFailure(evt Event) {
event := evt.(*event)
event.trigger = AfterFailure
ebus.errQueue = append(ebus.errQueue, event)
}
func (ebus *eventBus) NotifySuccess(eventName string, data interface{}) {
ebus.DispatchSuccess(NewEvent(eventName, data))
}
func (ebus *eventBus) NotifyFailure(eventName string, data interface{}) {
ebus.DispatchFailure(NewEvent(eventName, data))
}
// Dispatches all events synchronously
func (ebus *eventBus) dispatchAll() {
queue := make([]*event, len(ebus.queue))
copy(queue, ebus.queue)
ebus.queue = ebus.queue[:0]
errQueue := make([]*event, len(ebus.errQueue))
copy(errQueue, ebus.errQueue)
ebus.errQueue = ebus.errQueue[:0]
for _, evt := range queue {
ebus.Dispatch(evt)
}
for _, evt := range errQueue {
ebus.Dispatch(evt)
}
}
// Dispatches event synchronously
func (ebus *eventBus) Dispatch(evt Event) {
event, ok := evt.(*event)
if !ok {
event = castEvent(evt)
}
event.Id = ebus.commandBus.idGenerator.Create()
event.version = ebus.commandBus.versionTracker.next()
event.raised = time.Now()
if ebus.causation != nil {
event.aggregateID = ebus.causation.AggregateID()
event.correlationID = ebus.causation.CorrelationID()
event.causationID = ebus.causation.ID()
}
ebus.causation = event
m := ebus.subs[event.name]
if m != nil {
for _, callback := range m[event.trigger] {
callback(event.data, ebus)
}
for _, callback := range m[After] {
callback(event.data, ebus)
}
}
for _, cc := range ebus.condSubs {
if cc.condition(event.name) {
if cc.trigger != After && cc.trigger != event.trigger {
continue
}
cc.callback(event.data, ebus)
}
}
}