-
Notifications
You must be signed in to change notification settings - Fork 9
/
structuredhub.go
213 lines (191 loc) · 6.92 KB
/
structuredhub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// Copyright 2016 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.
package pubsub
import (
"encoding/json"
"reflect"
"github.com/juju/clock"
"github.com/juju/errors"
)
// StructuredHub allows the hander functions to accept either structures
// or map[string]interface{}. The published structure does not need to match
// the structures of the subscribers. The structures are marshalled using the
// Marshaller defined in the StructuredHubConfig. If one is not specified, the
// marshalling is handled by the standard json library.
type StructuredHub struct {
hub SimpleHub
marshaller Marshaller
annotations map[string]interface{}
postProcess func(map[string]interface{}) (map[string]interface{}, error)
}
// Marshaller defines the Marshal and Unmarshal methods used to serialize and
// deserialize the structures used in Publish and Subscription handlers of the
// structured hub.
type Marshaller interface {
// Marshal converts the argument into a byte streem that it can then Unmarshal.
Marshal(interface{}) ([]byte, error)
// Unmarshal attempts to convert the byte stream into type passed in as the
// second arg.
Unmarshal([]byte, interface{}) error
}
// StructuredHubConfig is the argument struct for NewStructuredHub.
type StructuredHubConfig struct {
// Logger allows specifying a logging implementation for debug
// and trace level messages emitted from the hub.
Logger Logger
// Metrics allows the passing in of a metrics collector.
Metrics Metrics
// Clock defines a clock to help improve test coverage.
Clock clock.Clock
// Marshaller defines how the structured hub will convert from structures to
// a map[string]interface{} and back. If this is not specified, the
// `JSONMarshaller` is used.
Marshaller Marshaller
// Annotations are added to each message that is published if and only if
// the values are not already set.
Annotations map[string]interface{}
// PostProcess allows the caller to modify the resulting
// map[string]interface{}. This is useful when a dynamic value, such as a
// timestamp is added to the map, or when other type conversions are
// necessary across all the values in the map.
PostProcess func(map[string]interface{}) (map[string]interface{}, error)
}
// JSONMarshaller simply wraps the json.Marshal and json.Unmarshal calls for the
// Marshaller interface.
var JSONMarshaller = &jsonMarshaller{}
type jsonMarshaller struct{}
func (*jsonMarshaller) Marshal(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (*jsonMarshaller) Unmarshal(data []byte, v interface{}) error {
return json.Unmarshal(data, v)
}
// NewStructuredHub returns a new StructuredHub instance.
func NewStructuredHub(config *StructuredHubConfig) *StructuredHub {
if config == nil {
config = new(StructuredHubConfig)
}
logger := config.Logger
if logger == nil {
logger = noOpLogger{}
}
metrics := config.Metrics
if metrics == nil {
metrics = noOpMetrics{}
}
time := config.Clock
if time == nil {
time = clock.WallClock
}
if config.Marshaller == nil {
config.Marshaller = JSONMarshaller
}
return &StructuredHub{
hub: SimpleHub{
logger: logger,
metrics: metrics,
clock: time,
},
marshaller: config.Marshaller,
annotations: config.Annotations,
postProcess: config.PostProcess,
}
}
// Publish will notifiy all the subscribers that are interested by calling
// their handler function.
//
// The data is serialized out using the marshaller and then back into a
// map[string]interface{}. If there is an error marshalling the data, Publish
// fails with an error. The resulting map is then updated with any
// annotations provided. The annotated values are only set if the specified
// field is missing or empty. After the annotations are set, the PostProcess
// function is called if one was specified. The resulting map is then passed
// to each of the subscribers.
//
// Subscribers are notified in parallel, and that no
// modification should be done to the data or data races will occur.
//
// The return function when called blocks and waits for all callbacks to be
// completed.
func (h *StructuredHub) Publish(topic string, data interface{}) (func(), error) {
if data == nil {
data = make(map[string]interface{})
}
asMap, err := h.toStringMap(data)
if err != nil {
return nil, errors.Trace(err)
}
for key, defaultValue := range h.annotations {
if value, exists := asMap[key]; !exists || value == reflect.Zero(reflect.TypeOf(value)).Interface() {
asMap[key] = defaultValue
}
}
if h.postProcess != nil {
asMap, err = h.postProcess(asMap)
if err != nil {
return nil, errors.Trace(err)
}
}
h.hub.logger.Tracef("publish %q: %#v", topic, asMap)
return h.hub.Publish(topic, asMap), nil
}
func (h *StructuredHub) toStringMap(data interface{}) (map[string]interface{}, error) {
var result map[string]interface{}
resultType := reflect.TypeOf(result)
dataType := reflect.TypeOf(data)
if dataType.AssignableTo(resultType) {
cast, ok := data.(map[string]interface{})
if !ok {
return nil, errors.Errorf("%T assignable to map[string]interface{} but isn't one?", data)
}
return cast, nil
}
bytes, err := h.marshaller.Marshal(data)
if err != nil {
return nil, errors.Annotate(err, "marshalling")
}
err = h.marshaller.Unmarshal(bytes, &result)
if err != nil {
return nil, errors.Annotate(err, "unmarshalling")
}
return result, nil
}
// Subscribe takes a topic with a handler function. If the topic is the same
// as the published topic, the handler function is called with the published
// topic and the associated data.
//
// The function return value is a function that will unsubscribe the caller
// from the hub, for this subscription.
//
// The hander function must have the signature:
// `func(string, map[string]interface{})`
// or
// `func(string, SomeStruct, error)`
// where `SomeStruct` is any structure.
//
// If the hander function does not match one of these signatures, the Subscribe
// function returns an error.
//
// The map[string]interface{} from the
// Publish call is unmarshalled into the `SomeStruct` structure. If there is
// an error unmarshalling the handler is called with a zerod structure and an
// error with the marshalling error.
func (h *StructuredHub) Subscribe(topic string, handler interface{}) (func(), error) {
return h.SubscribeMatch(equalTopic(topic), handler)
}
// SubscribeMatch takes a function that determins whether the topic matches,
// and a handler function. If the matcher matches the published topic, the
// handler function is called with the published topic and the associated
// data.
//
// All other aspects of the function are the same as the `Subscribe` method.
func (h *StructuredHub) SubscribeMatch(matcher func(string) bool, handler interface{}) (func(), error) {
if matcher == nil {
return nil, errors.NotValidf("missing matcher")
}
callback, err := newStructuredCallback(h.hub.logger, h.marshaller, handler)
if err != nil {
return nil, errors.Trace(err)
}
return h.hub.SubscribeMatch(matcher, callback.handler), nil
}