Skip to content

Commit

Permalink
working, need to fix tests still
Browse files Browse the repository at this point in the history
  • Loading branch information
mfreeman451 committed Oct 9, 2024
1 parent 806b944 commit b0545cd
Show file tree
Hide file tree
Showing 16 changed files with 52 additions and 642 deletions.
13 changes: 13 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,19 @@ func (c *Client) Publish(ctx context.Context, subject string, message []byte) er

// Subscribe subscribes to a topic and returns a single message.
func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) {

if c.subManager == nil {
return nil, errors.New("subscription manager is not initialized")
}

if c.connManager == nil {
return nil, errors.New("connection manager is not initialized")
}

if c.connManager.JetStream() == nil {
return nil, errors.New("jetstream is not initialized")
}

return c.subManager.Subscribe(ctx, topic, c.connManager.JetStream(), c.Config, c.logger, c.metrics)
}

Expand Down
5 changes: 4 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package nats

import (
"time"

"gofr.dev/pkg/gofr/datasource/pubsub"
)

// Config defines the Client configuration.
Expand All @@ -25,7 +27,7 @@ type StreamConfig struct {
}

// New creates a new Client.
func New(cfg *Config) *PubSubWrapper {
func New(cfg *Config, logger pubsub.Logger) *PubSubWrapper {
if cfg == nil {
cfg = &Config{}
}
Expand All @@ -37,6 +39,7 @@ func New(cfg *Config) *PubSubWrapper {
client := &Client{
Config: cfg,
subManager: NewSubscriptionManager(cfg.BatchSize),
logger: logger,
}

return &PubSubWrapper{Client: client}
Expand Down
24 changes: 23 additions & 1 deletion connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ func NewConnectionManager(
logger pubsub.Logger,
natsConnector NATSConnector,
jetStreamCreator JetStreamCreator) *ConnectionManager {

// if logger is nil panic
if logger == nil {
panic("logger is required")
}

if natsConnector == nil {
natsConnector = &DefaultNATSConnector{}
}

if jetStreamCreator == nil {
jetStreamCreator = &DefaultJetStreamCreator{}
}

return &ConnectionManager{
config: cfg,
logger: logger,
Expand All @@ -66,7 +80,15 @@ func NewConnectionManager(

// Connect establishes a connection to NATS and sets up JetStream.
func (cm *ConnectionManager) Connect() error {
connInterface, err := cm.natsConnector.Connect(cm.config.Server, nats.Name("GoFr NATS JetStreamClient"))
cm.logger.Logf("Connecting to NATS server at %v", cm.config.Server)

opts := []nats.Option{nats.Name("GoFr NATS JetStreamClient")}

if cm.config.CredsFile != "" {
opts = append(opts, nats.UserCredentials(cm.config.CredsFile))
}

connInterface, err := cm.natsConnector.Connect(cm.config.Server, opts...)
if err != nil {
cm.logger.Errorf("failed to connect to NATS server at %v: %v", cm.config.Server, err)

Expand Down
1 change: 1 addition & 0 deletions connectors.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// Package nats connector.go
package nats

import (
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module gofr.dev/pkg/gofr/datasource/pubsub/nats
module github.com/carverauto/gofr-nats

go 1.23.1

Expand Down
380 changes: 1 addition & 379 deletions go.sum

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion pubsub_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nats

import (
"context"
"fmt"

"gofr.dev/pkg/gofr/datasource"
"gofr.dev/pkg/gofr/datasource/pubsub"
Expand Down Expand Up @@ -45,9 +46,15 @@ func (w *PubSubWrapper) Health() datasource.Health {

// Connect establishes a connection to NATS.
func (w *PubSubWrapper) Connect() {
fmt.Println("Connecting to NATS using PubSubWrapper")
if w.Client.connManager != nil && w.Client.connManager.Health().Status == datasource.StatusUp {
w.Client.logger.Log("NATS connection already established")
return
}

err := w.Client.Connect()
if err != nil {
w.Client.logger.Errorf("Error connecting to NATS: %v", err)
w.Client.logger.Errorf("PubSubWrapper: Error connecting to NATS: %v", err)
}
}

Expand Down
1 change: 1 addition & 0 deletions subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (*SubscriptionManager) consumeMessages(
buffer chan *pubsub.Message,
cfg *Config,
logger pubsub.Logger) {
// TODO: propagate errors to caller
for {
select {
case <-ctx.Done():
Expand Down
15 changes: 0 additions & 15 deletions using-subscriber-nats/Dockerfile

This file was deleted.

18 changes: 0 additions & 18 deletions using-subscriber-nats/configs/.env

This file was deleted.

75 changes: 0 additions & 75 deletions using-subscriber-nats/main.go

This file was deleted.

69 changes: 0 additions & 69 deletions using-subscriber-nats/main_test.go

This file was deleted.

20 changes: 0 additions & 20 deletions using-subscriber-nats/migrations/1721800255_create_topics.go

This file was deleted.

11 changes: 0 additions & 11 deletions using-subscriber-nats/migrations/all.go

This file was deleted.

21 changes: 0 additions & 21 deletions using-subscriber-nats/migrations/all_test.go

This file was deleted.

30 changes: 0 additions & 30 deletions using-subscriber-nats/readme.md

This file was deleted.

0 comments on commit b0545cd

Please sign in to comment.