-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
96 lines (83 loc) · 2.08 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//
// @project GeniusRabbit 2018 - 2020
// @author Dmitry Ponomarev <demdxx@gmail.com> 2018 - 2020
//
package natstream
import (
"context"
nstream "github.com/nats-io/stan.go"
nc "github.com/geniusrabbit/notificationcenter/v2"
"github.com/geniusrabbit/notificationcenter/v2/encoder"
"github.com/geniusrabbit/notificationcenter/v2/internal/bytebuffer"
)
// Publisher for NATS queue
type Publisher struct {
// List of topics
topic string
// Connection to the nats server
conn nstream.Conn
// Message encoder
encoder encoder.Encoder
// errorHandler process errors
errorHandler nc.ErrorHandler
// panicHandler process panics
panicHandler nc.PanicHandler
}
// NewPublisher of the NATS stream server
func NewPublisher(options ...Option) (*Publisher, error) {
var opts Options
for _, opt := range options {
opt(&opts)
}
conn, err := nstream.Connect(opts.clusterID(), opts.clientID(), opts.NatsOptions...)
if err != nil || conn == nil {
return nil, err
}
return &Publisher{
topic: opts.randomTopic(),
conn: conn,
encoder: opts.encoder(),
errorHandler: opts.ErrorHandler,
panicHandler: opts.PanicHandler,
}, nil
}
// MustNewPublisher of the NATS stream server
func MustNewPublisher(options ...Option) *Publisher {
stream, err := NewPublisher(options...)
if err != nil || stream == nil {
panic(err)
}
return stream
}
// Publish one or more messages to the pub-service
func (s *Publisher) Publish(ctx context.Context, messages ...any) (err error) {
buff := bytebuffer.AcquireBuffer()
defer func() {
bytebuffer.ReleaseBuffer(buff)
if s.panicHandler == nil {
return
}
if rec := recover(); rec != nil {
s.panicHandler(nil, rec)
}
}()
for _, msg := range messages {
buff.Reset()
if err = s.encoder(msg, buff); err == nil {
err = s.conn.Publish(s.topic, buff.Bytes())
}
if err != nil {
if s.errorHandler == nil {
break
}
// Massage wasn't encoded so we can process only error
s.errorHandler(nil, err)
err = nil
}
}
return err
}
// Close nats-stream client
func (s *Publisher) Close() error {
return s.conn.Close()
}