-
Notifications
You must be signed in to change notification settings - Fork 0
/
options.go
209 lines (174 loc) · 4.58 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package natstream
import (
"context"
"math/rand"
"net/url"
"strings"
nats "github.com/nats-io/nats.go"
nstream "github.com/nats-io/stan.go"
nc "github.com/geniusrabbit/notificationcenter/v2"
"github.com/geniusrabbit/notificationcenter/v2/encoder"
"github.com/geniusrabbit/notificationcenter/v2/internal/logger"
)
type loggerInterface interface {
Error(params ...any)
Debugf(msg string, params ...any)
}
// Options of the NATS wrapper
type Options struct {
Ctx context.Context
// Raw options from the "github.com/nats-io/stan.go" module
NatsOptions []nstream.Option
// NatsSubscriptions suboptions of subscriptions
NatsSubscriptions []nstream.Subscription
// Name of the subscription group
GroupName string
// Names of topics for subscribing or publishing
Topics []string
// ClusterID common for the group of services
ClusterID string
// Client ID unical for the service
ClientID string
// ErrorHandler of message processing
ErrorHandler nc.ErrorHandler
// PanicHandler process panic
PanicHandler nc.PanicHandler
// Message encoder interface
Encoder encoder.Encoder
// Logger of subscriber
Logger loggerInterface
}
func (opt *Options) encoder() encoder.Encoder {
if opt.Encoder == nil {
return encoder.JSON
}
return opt.Encoder
}
func (opt *Options) randomTopic() string {
if len(opt.Topics) == 0 {
return `default`
}
return opt.Topics[rand.Int()%len(opt.Topics)]
}
func (opt *Options) group() string {
if opt.GroupName == `` {
return `default`
}
return opt.GroupName
}
func (opt *Options) context() context.Context {
if opt.Ctx == nil {
return context.Background()
}
return opt.Ctx
}
func (opt *Options) clusterID() string {
if opt.ClusterID == `` {
return `default`
}
return opt.ClusterID
}
func (opt *Options) clientID() string {
if opt.ClientID == `` {
return `default`
}
return opt.ClientID
}
func (opt *Options) logger() loggerInterface {
if opt.Logger == nil {
return logger.DefaultLogger
}
return opt.Logger
}
// Option of the NATS subscriber or publisher
type Option func(opt *Options)
// WithNatsURL is an Option to set the URL the client should connect to.
// The url can contain username/password semantics. e.g. nats://derek:pass@localhost:4222
// Comma separated arrays are also supported, e.g. urlA, urlB.
func WithNatsURL(urlString string) Option {
return func(opt *Options) {
u, err := url.Parse(urlString)
if err != nil {
panic(err)
}
if len(u.Path) > 1 {
opt.GroupName = u.Path[1:]
}
topics := strings.Split(u.Query().Get(`topics`), `,`)
if len(topics) == 1 && topics[0] == `` {
topics = nil
}
u.Path = ``
u.RawQuery = ``
if len(topics) > 0 {
opt.Topics = topics
}
opt.NatsOptions = append(opt.NatsOptions, nstream.NatsURL(u.String()))
}
}
// WithNatsConn is an Option to set the underlying NATS connection to be used
// by a streaming connection object. When such option is set, closing the
// streaming connection does not close the provided NATS connection.
func WithNatsConn(nc *nats.Conn) Option {
return WithNatsOptions(nstream.NatsConn(nc))
}
// WithClusterID puts the cluster ID value
func WithClusterID(id string) Option {
return func(opt *Options) {
opt.ClusterID = id
}
}
// WithClientID puts the client ID value
func WithClientID(id string) Option {
return func(opt *Options) {
opt.ClientID = id
}
}
// WithGroupName puts the name group of the subsciber or publisher
func WithGroupName(name string) Option {
return func(opt *Options) {
opt.GroupName = name
}
}
// WithContext puts the client context value
func WithContext(ctx context.Context) Option {
return func(opt *Options) {
opt.Ctx = ctx
}
}
// WithTopics will set the list of topics for publishing or subscribing
func WithTopics(topics ...string) Option {
return func(opt *Options) {
opt.Topics = topics
}
}
// WithNatsOptions provides options of the NATS module
func WithNatsOptions(natsOpts ...nstream.Option) Option {
return func(opt *Options) {
opt.NatsOptions = append(opt.NatsOptions, natsOpts...)
}
}
// WithErrorHandler set handler of error processing
func WithErrorHandler(h nc.ErrorHandler) Option {
return func(options *Options) {
options.ErrorHandler = h
}
}
// WithPanicHandler set handler of panic processing
func WithPanicHandler(h nc.PanicHandler) Option {
return func(options *Options) {
options.PanicHandler = h
}
}
// WithEncoder set the message encoder
func WithEncoder(encoder encoder.Encoder) Option {
return func(opt *Options) {
opt.Encoder = encoder
}
}
// WithLogger provides logging interface
func WithLogger(logger loggerInterface) Option {
return func(opt *Options) {
opt.Logger = logger
}
}