-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub_wrapper.go
76 lines (60 loc) · 2.01 KB
/
pubsub_wrapper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package nats
import (
"context"
"gofr.dev/pkg/gofr/datasource"
"gofr.dev/pkg/gofr/datasource/pubsub"
)
// PubSubWrapper adapts Client to pubsub.JetStreamClient.
//
// Every method on the wrapper forwards to the wrapped Client, so Client must
// be non-nil before any method is called.
type PubSubWrapper struct {
	// Client is the underlying NATS client that all calls are delegated to.
	Client *Client
}
// Publish publishes message to topic by delegating to Client.Publish.
//
// The error returned by the underlying Client is passed back unchanged.
func (w *PubSubWrapper) Publish(ctx context.Context, topic string, message []byte) error {
	client := w.Client

	return client.Publish(ctx, topic, message)
}
// Subscribe subscribes to topic and returns a single message.
//
// The call is forwarded to Client.Subscribe; both the message and the error
// it produces are returned to the caller as-is.
func (w *PubSubWrapper) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {
	msg, err := w.Client.Subscribe(ctx, topic)

	return msg, err
}
// CreateTopic creates a new topic (a stream, in NATS JetStream terms) called
// name by delegating to Client.CreateTopic.
//
// The error returned by the underlying Client is passed back unchanged.
func (w *PubSubWrapper) CreateTopic(ctx context.Context, name string) error {
	client := w.Client

	return client.CreateTopic(ctx, name)
}
// DeleteTopic deletes the topic (a stream, in NATS JetStream terms) called
// name by delegating to Client.DeleteTopic.
//
// The error returned by the underlying Client is passed back unchanged.
func (w *PubSubWrapper) DeleteTopic(ctx context.Context, name string) error {
	client := w.Client

	return client.DeleteTopic(ctx, name)
}
// Close closes the Client.
//
// Close takes no context of its own, so a fresh background context is handed
// to Client.Close. The error from the underlying Client is returned unchanged.
func (w *PubSubWrapper) Close() error {
	return w.Client.Close(context.Background())
}
// Health returns the health status of the Client, exactly as reported by
// Client.Health.
func (w *PubSubWrapper) Health() datasource.Health {
	client := w.Client

	return client.Health()
}
// Connect establishes a connection to NATS unless one is already up.
//
// If the Client has a connection manager whose health status is
// datasource.StatusUp, the call logs that the connection already exists and
// returns nil without reconnecting. Otherwise it delegates to Client.Connect;
// a failure is logged through the Client's logger and the same error is
// returned to the caller.
func (w *PubSubWrapper) Connect(ctx context.Context) error {
	client := w.Client

	// Skip the reconnect only when a manager exists and reports itself healthy.
	if mgr := client.connManager; mgr != nil {
		if mgr.Health().Status == datasource.StatusUp {
			client.logger.Log("NATS connection already established")

			return nil
		}
	}

	if err := client.Connect(ctx); err != nil {
		client.logger.Errorf("PubSubWrapper: Error connecting to NATS: %v", err)

		return err
	}

	return nil
}
// UseLogger sets the logger for the NATS client.
//
// The value is forwarded untouched to Client.UseLogger; any type checking of
// logger is left to the underlying Client.
func (w *PubSubWrapper) UseLogger(logger any) {
	client := w.Client
	client.UseLogger(logger)
}
// UseMetrics sets the metrics for the NATS client.
//
// The value is forwarded untouched to Client.UseMetrics; any type checking of
// metrics is left to the underlying Client.
func (w *PubSubWrapper) UseMetrics(metrics any) {
	client := w.Client
	client.UseMetrics(metrics)
}
// UseTracer sets the tracer for the NATS client.
//
// The value is forwarded untouched to Client.UseTracer; any type checking of
// tracer is left to the underlying Client.
func (w *PubSubWrapper) UseTracer(tracer any) {
	client := w.Client
	client.UseTracer(tracer)
}