diff --git a/client.go b/client.go index c4f5675..beba7d4 100644 --- a/client.go +++ b/client.go @@ -44,7 +44,13 @@ func (c *Client) Connect() error { } c.connManager = connManager - c.streamManager = NewStreamManager(c.connManager.JetStream(), c.logger) + + js, err := c.connManager.JetStream() + if err != nil { + return err + } + + c.streamManager = NewStreamManager(js, c.logger) c.subManager = NewSubscriptionManager(c.Config.BatchSize) c.logSuccessfulConnection() @@ -95,7 +101,12 @@ func (c *Client) Publish(ctx context.Context, subject string, message []byte) er // Subscribe subscribes to a topic and returns a single message. func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message, error) { - return c.subManager.Subscribe(ctx, topic, c.connManager.JetStream(), c.Config, c.logger, c.metrics) + js, err := c.connManager.JetStream() + if err != nil { + return nil, err + } + + return c.subManager.Subscribe(ctx, topic, js, c.Config, c.logger, c.metrics) } func (c *Client) generateConsumerName(subject string) string { @@ -109,7 +120,11 @@ func (c *Client) SubscribeWithHandler(ctx context.Context, subject string, handl // Cancel any existing subscription for this subject c.cancelExistingSubscription(subject) - js := c.connManager.JetStream() + js, err := c.connManager.JetStream() + if err != nil { + return err + } + consumerName := c.generateConsumerName(subject) cons, err := c.createOrUpdateConsumer(ctx, js, subject, consumerName) @@ -241,3 +256,13 @@ func (c *Client) DeleteStream(ctx context.Context, name string) error { func (c *Client) CreateOrUpdateStream(ctx context.Context, cfg *jetstream.StreamConfig) (jetstream.Stream, error) { return c.streamManager.CreateOrUpdateStream(ctx, cfg) } + +// GetJetStreamStatus returns the status of the JetStream connection. +func GetJetStreamStatus(ctx context.Context, js jetstream.JetStream) string { + _, err := js.AccountInfo(ctx) + if err != nil { + return jetStreamStatusError + ": " + err.Error() + } + + return jetStreamStatusOK +} diff --git a/client_test.go b/client_test.go index 446e6a3..7db7297 100644 --- a/client_test.go +++ b/client_test.go @@ -126,7 +126,7 @@ func TestNATSClient_SubscribeSuccess(t *testing.T) { Value: []byte("test message"), } - mockConnManager.EXPECT().JetStream().Return(mockJetStream).Times(2) + mockConnManager.EXPECT().JetStream().Return(mockJetStream, nil).AnyTimes() mockSubManager.EXPECT(). Subscribe(ctx, "test-subject", mockJetStream, gomock.Any(), gomock.Any(), gomock.Any()). @@ -164,7 +164,7 @@ func TestNATSClient_SubscribeError(t *testing.T) { ctx := context.Background() expectedErr := errSubscriptionError - mockConnManager.EXPECT().JetStream().Return(mockJetStream).Times(2) + mockConnManager.EXPECT().JetStream().Return(mockJetStream, nil).AnyTimes() mockSubManager.EXPECT(). Subscribe(ctx, "test-subject", mockJetStream, gomock.Any(), gomock.Any(), gomock.Any()). @@ -491,208 +491,218 @@ func TestClient_SubscribeWithHandler(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - // Create separate mock consumers and message batches for each subscription - mockConnManager := NewMockConnectionManagerInterface(ctrl) - mockJetStream := NewMockJetStream(ctrl) - mockConsumer1 := NewMockConsumer(ctrl) - mockMessageBatch1 := NewMockMessageBatch(ctrl) - messageChan1 := make(chan jetstream.Msg, 1) - mockMsg1 := NewMockMsg(ctrl) + client, mocks := setupClientAndMocks(t, ctrl) + + var wg sync.WaitGroup - mockConsumer2 := NewMockConsumer(ctrl) - mockMessageBatch2 := NewMockMessageBatch(ctrl) - messageChan2 := make(chan jetstream.Msg, 1) - mockMsg2 := NewMockMsg(ctrl) + wg.Add(2) // Two handlers - // Create a mock for SubscriptionManagerInterface - mockSubManager := NewMockSubscriptionManagerInterface(ctrl) + t.Run("First_Subscription", func(t *testing.T) { + testFirstSubscription(t, client, mocks, &wg) + }) + + t.Run("Second_Subscription", func(t *testing.T) { + testSecondSubscription(t, client, mocks, &wg) + }) + + waitForHandlersToComplete(t, &wg) + + err := client.Close(context.Background()) + require.NoError(t, err) +} + +func setupClientAndMocks(t *testing.T, ctrl *gomock.Controller) (*Client, *testMocks) { + t.Helper() + + mocks := &testMocks{ + connManager: NewMockConnectionManagerInterface(ctrl), + jetStream: NewMockJetStream(ctrl), + consumer1: NewMockConsumer(ctrl), + messageBatch1: NewMockMessageBatch(ctrl), + msg1: NewMockMsg(ctrl), + consumer2: NewMockConsumer(ctrl), + messageBatch2: NewMockMessageBatch(ctrl), + msg2: NewMockMsg(ctrl), + subManager: NewMockSubscriptionManagerInterface(ctrl), + messageChan1: make(chan jetstream.Msg, 1), + messageChan2: make(chan jetstream.Msg, 1), + } - // Initialize the client with all necessary mocks client := &Client{ - connManager: mockConnManager, - subManager: mockSubManager, // Assign the mockSubManager here - Config: &Config{ - Consumer: "test-consumer", - Stream: StreamConfig{ - Stream: "test-stream", - MaxDeliver: 3, - }, - MaxWait: time.Second, - }, + connManager: mocks.connManager, + subManager: mocks.subManager, + Config: createTestConfig(), logger: logging.NewMockLogger(logging.DEBUG), subscriptions: make(map[string]context.CancelFunc), } - // Set up expectations for JetStream() to be called twice (for two subscriptions) - mockConnManager.EXPECT(). - JetStream(). - Return(mockJetStream). - Times(2) + setupCommonExpectations(mocks) - // Set up expectations for subManager.Close() - mockSubManager.EXPECT(). - Close(). - Times(1) + return client, mocks +} - // Set up expectations for connManager.Close(ctx) - mockConnManager.EXPECT(). - Close(gomock.Any()). - Times(1) // Removed Return(nil) since Close does not return anything +func createTestConfig() *Config { + return &Config{ + Consumer: "test-consumer", + Stream: StreamConfig{ + Stream: "test-stream", + MaxDeliver: 3, + }, + MaxWait: time.Second, + } +} - // --------------------- - // Synchronization Setup - // --------------------- - var wg sync.WaitGroup +func setupCommonExpectations(mocks *testMocks) { + mocks.connManager.EXPECT().JetStream().Return(mocks.jetStream, nil).AnyTimes() + mocks.subManager.EXPECT().Close().Times(1) + mocks.connManager.EXPECT().Close(gomock.Any()).AnyTimes() +} - wg.Add(2) // Two handlers +func testFirstSubscription(t *testing.T, client *Client, mocks *testMocks, wg *sync.WaitGroup) { + t.Helper() - // --------------------- - // First Subscription Execution - // --------------------- - t.Run("First_Subscription", func(t *testing.T) { - // Expect CreateOrUpdateConsumer to be called once for the first subscription - mockJetStream.EXPECT(). - CreateOrUpdateConsumer(gomock.Any(), gomock.Any(), gomock.Any()). - Return(mockConsumer1, nil). - Times(1) - - // Expect Fetch to be called twice: once to fetch the message, once to fetch nothing - mockConsumer1.EXPECT(). - Fetch(gomock.Any(), gomock.Any()). - Return(mockMessageBatch1, nil). - Times(2) - - // Expect Messages to return the message channel first, then nil - gomock.InOrder( - mockMessageBatch1.EXPECT(). - Messages(). - Return(messageChan1). - Times(1), - - mockMessageBatch1.EXPECT(). - Messages(). - Return(nil). - Times(1), - ) - - // Expect Error to be called once for the first subscription - mockMessageBatch1.EXPECT(). - Error(). - Return(nil). - Times(1) // Adjusted from Times(2) to Times(1) + setupFirstSubscriptionExpectations(mocks) - // Expect Ack to be called once for the first message - mockMsg1.EXPECT(). - Ack(). - Return(nil). - Times(1) + firstHandlerCalled := make(chan bool, 1) + firstHandler := createFirstHandler(t, firstHandlerCalled, wg) - // Define the first handler that acknowledges the message - firstHandlerCalled := make(chan bool, 1) - firstHandler := func(context.Context, jetstream.Msg) error { - t.Log("First handler called") - firstHandlerCalled <- true + err := client.SubscribeWithHandler(context.Background(), "test-subject", firstHandler) + require.NoError(t, err) - wg.Done() + mocks.messageChan1 <- mocks.msg1 + close(mocks.messageChan1) - return nil - } + assertHandlerCalled(t, firstHandlerCalled, "First handler") +} - // Subscribe with the first handler - err := client.SubscribeWithHandler(context.Background(), "test-subject", firstHandler) - require.NoError(t, err) +func testSecondSubscription(t *testing.T, client *Client, mocks *testMocks, wg *sync.WaitGroup) { + t.Helper() - // Send a message to trigger the first handler - messageChan1 <- mockMsg1 + setupSecondSubscriptionExpectations(mocks) - // Close the message channel to allow Fetch to return nil on the next call - close(messageChan1) + errorHandlerCalled := make(chan bool, 1) + errorHandler := createErrorHandler(t, errorHandlerCalled, wg) - // Wait for the first handler to be called - select { - case <-firstHandlerCalled: - t.Log("First handler was called successfully") - case <-time.After(time.Second * 5): - t.Fatal("First handler was not called within the expected time") - } - }) + err := client.SubscribeWithHandler(context.Background(), "test-subject", errorHandler) + require.NoError(t, err) - // --------------------- - // Second Subscription Execution - // --------------------- - t.Run("Second_Subscription", func(t *testing.T) { - // Expect CreateOrUpdateConsumer to be called once for the second subscription - mockJetStream.EXPECT(). - CreateOrUpdateConsumer(gomock.Any(), gomock.Any(), gomock.Any()). - Return(mockConsumer2, nil). - Times(1) - - // Expect Fetch to be called twice: once to fetch the message, once to fetch nothing - mockConsumer2.EXPECT(). - Fetch(gomock.Any(), gomock.Any()). - Return(mockMessageBatch2, nil). - Times(2) - - // Expect Messages to return the message channel first, then nil - gomock.InOrder( - mockMessageBatch2.EXPECT(). - Messages(). - Return(messageChan2). - Times(1), - - mockMessageBatch2.EXPECT(). - Messages(). - Return(nil). - Times(1), - ) - - // Expect Error to be called once for the second subscription - mockMessageBatch2.EXPECT(). - Error(). + mocks.messageChan2 <- mocks.msg2 + close(mocks.messageChan2) + + assertHandlerCalled(t, errorHandlerCalled, "Error handler") +} + +func setupFirstSubscriptionExpectations(mocks *testMocks) { + mocks.jetStream.EXPECT(). + CreateOrUpdateConsumer(gomock.Any(), gomock.Any(), gomock.Any()). + Return(mocks.consumer1, nil). + Times(1) + + mocks.consumer1.EXPECT(). + Fetch(gomock.Any(), gomock.Any()). + Return(mocks.messageBatch1, nil). + Times(2) + + gomock.InOrder( + mocks.messageBatch1.EXPECT(). + Messages(). + Return(mocks.messageChan1). + Times(1), + + mocks.messageBatch1.EXPECT(). + Messages(). Return(nil). - Times(1) // Adjusted from Times(2) to Times(1) + Times(1), + ) - // Expect Nak to be called once for the second message - mockMsg2.EXPECT(). - Nak(). + mocks.messageBatch1.EXPECT(). + Error(). + Return(nil). + Times(1) + + mocks.msg1.EXPECT(). + Ack(). + Return(nil). + Times(1) +} + +func setupSecondSubscriptionExpectations(mocks *testMocks) { + mocks.jetStream.EXPECT(). + CreateOrUpdateConsumer(gomock.Any(), gomock.Any(), gomock.Any()). + Return(mocks.consumer2, nil). + Times(1) + + mocks.consumer2.EXPECT(). + Fetch(gomock.Any(), gomock.Any()). + Return(mocks.messageBatch2, nil). + Times(2) + + gomock.InOrder( + mocks.messageBatch2.EXPECT(). + Messages(). + Return(mocks.messageChan2). + Times(1), + + mocks.messageBatch2.EXPECT(). + Messages(). Return(nil). - Times(1) + Times(1), + ) - // Define the second handler that returns an error, causing a NAK - errorHandlerCalled := make(chan bool, 1) - errorHandler := func(context.Context, jetstream.Msg) error { - t.Log("Error handler called") - errorHandlerCalled <- true + mocks.messageBatch2.EXPECT(). + Error(). + Return(nil). + Times(1) - t.Logf("Error handling message: %v", errHandlerError) - t.Logf("Error processing message: %v", errHandlerError) - wg.Done() + mocks.msg2.EXPECT(). + Nak(). + Return(nil). + Times(1) +} - return errHandlerError - } +func createFirstHandler(t *testing.T, handlerCalled chan<- bool, wg *sync.WaitGroup) func(context.Context, jetstream.Msg) error { + t.Helper() - // Subscribe with the error handler - err := client.SubscribeWithHandler(context.Background(), "test-subject", errorHandler) - require.NoError(t, err) + return func(context.Context, jetstream.Msg) error { + t.Log("First handler called") + handlerCalled <- true - // Send a message to trigger the error handler - messageChan2 <- mockMsg2 + wg.Done() - // Close the message channel to allow Fetch to return nil on the next call - close(messageChan2) + return nil + } +} - // Wait for the error handler to be called - select { - case <-errorHandlerCalled: - t.Log("Error handler was called successfully") - case <-time.After(time.Second): - t.Fatal("Error handler was not called within the expected time") - } - }) +func createErrorHandler(t *testing.T, handlerCalled chan<- bool, wg *sync.WaitGroup) func(context.Context, jetstream.Msg) error { + t.Helper() + + return func(context.Context, jetstream.Msg) error { + t.Log("Error handler called") + handlerCalled <- true + + t.Logf("Error handling message: %v", errHandlerError) + t.Logf("Error processing message: %v", errHandlerError) + + wg.Done() + + return errHandlerError + } +} + +func assertHandlerCalled(t *testing.T, handlerCalled <-chan bool, handlerName string) { + t.Helper() + + select { + case <-handlerCalled: + t.Logf("%s was called successfully", handlerName) + case <-time.After(time.Second * 5): + t.Fatalf("%s was not called within the expected time", handlerName) + } +} + +func waitForHandlersToComplete(t *testing.T, wg *sync.WaitGroup) { + t.Helper() - // --------------------- - // Wait for All Handlers to Complete - // --------------------- done := make(chan struct{}) go func() { wg.Wait() @@ -705,12 +715,20 @@ func TestClient_SubscribeWithHandler(t *testing.T) { case <-time.After(5 * time.Second): t.Fatal("Handlers did not complete within the expected time") } +} - // --------------------- - // Gracefully Close the Client - // --------------------- - err := client.Close(context.Background()) - require.NoError(t, err) +type testMocks struct { + connManager *MockConnectionManagerInterface + jetStream *MockJetStream + consumer1 *MockConsumer + messageBatch1 *MockMessageBatch + messageChan1 chan jetstream.Msg + msg1 *MockMsg + consumer2 *MockConsumer + messageBatch2 *MockMessageBatch + messageChan2 chan jetstream.Msg + msg2 *MockMsg + subManager *MockSubscriptionManagerInterface } func TestClient_CreateStream(t *testing.T) { diff --git a/connection_manager.go b/connection_manager.go index 5566169..4a3bf49 100644 --- a/connection_manager.go +++ b/connection_manager.go @@ -25,8 +25,12 @@ type ConnectionManager struct { jetStreamCreator JetStreamCreator } -func (cm *ConnectionManager) JetStream() jetstream.JetStream { - return cm.jetStream +func (cm *ConnectionManager) JetStream() (jetstream.JetStream, error) { + if cm.jetStream == nil { + return nil, errJetStreamNotConfigured + } + + return cm.jetStream, nil } // natsConnWrapper wraps a nats.Conn to implement the ConnInterface. diff --git a/connection_manager_test.go b/connection_manager_test.go index bd96531..034a7f9 100644 --- a/connection_manager_test.go +++ b/connection_manager_test.go @@ -148,12 +148,28 @@ func TestConnectionManager_Health(t *testing.T) { } func TestConnectionManager_JetStream(t *testing.T) { - mockJS := NewMockJetStream(gomock.NewController(t)) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockJS := NewMockJetStream(ctrl) cm := &ConnectionManager{ jetStream: mockJS, } - assert.Equal(t, mockJS, cm.JetStream()) + js, err := cm.JetStream() + require.NoError(t, err) + assert.Equal(t, mockJS, js) +} + +func TestConnectionManager_JetStream_Nil(t *testing.T) { + cm := &ConnectionManager{ + jetStream: nil, + } + + js, err := cm.JetStream() + require.Error(t, err) + assert.Nil(t, js) + assert.EqualError(t, err, "JetStream is not configured") } func TestNatsConnWrapper_Status(t *testing.T) { diff --git a/health.go b/health.go index 0235692..0440834 100644 --- a/health.go +++ b/health.go @@ -3,7 +3,6 @@ package nats import ( "context" - "github.com/nats-io/nats.go/jetstream" "gofr.dev/pkg/gofr/datasource" ) @@ -12,7 +11,6 @@ const ( jetStreamStatusOK = "OK" jetStreamStatusError = "Error" jetStreamConnected = "CONNECTED" - jetStreamConnecting = "CONNECTING" jetStreamDisconnecting = "DISCONNECTED" ) @@ -27,22 +25,18 @@ func (c *Client) Health() datasource.Health { health := c.connManager.Health() health.Details["backend"] = natsBackend - js := c.connManager.JetStream() - if js != nil { - health.Details["jetstream_enabled"] = true - health.Details["jetstream_status"] = getJetStreamStatus(context.Background(), js) - } else { + js, err := c.connManager.JetStream() + if err != nil { health.Details["jetstream_enabled"] = false - } - - return health -} + health.Details["jetstream_status"] = jetStreamStatusError + ": " + err.Error() -func getJetStreamStatus(ctx context.Context, js jetstream.JetStream) string { - _, err := js.AccountInfo(ctx) - if err != nil { - return jetStreamStatusError + ": " + err.Error() + return health } - return jetStreamStatusOK + // Call AccountInfo() to get JetStream status + jetStreamStatus := GetJetStreamStatus(context.Background(), js) + health.Details["jetstream_enabled"] = true + health.Details["jetstream_status"] = jetStreamStatus + + return health } diff --git a/health_test.go b/health_test.go index 2ab5b74..4f9c263 100644 --- a/health_test.go +++ b/health_test.go @@ -36,7 +36,7 @@ func defineHealthTestCases() []healthTestCase { "connection_status": jetStreamConnected, }, }) - mockConnManager.EXPECT().JetStream().Return(mockJS) + mockConnManager.EXPECT().JetStream().Return(mockJS, nil) mockJS.EXPECT().AccountInfo(gomock.Any()).Return(&jetstream.AccountInfo{}, nil) }, expectedStatus: datasource.StatusUp, @@ -58,7 +58,7 @@ func defineHealthTestCases() []healthTestCase { "connection_status": jetStreamDisconnecting, }, }) - mockConnManager.EXPECT().JetStream().Return(nil) + mockConnManager.EXPECT().JetStream().Return(nil, errJetStreamNotConfigured) }, expectedStatus: datasource.StatusDown, expectedDetails: map[string]interface{}{ @@ -66,6 +66,7 @@ func defineHealthTestCases() []healthTestCase { "backend": natsBackend, "connection_status": jetStreamDisconnecting, "jetstream_enabled": false, + "jetstream_status": jetStreamStatusError + ": JetStream is not configured", }, }, { @@ -78,7 +79,7 @@ func defineHealthTestCases() []healthTestCase { "connection_status": jetStreamConnected, }, }) - mockConnManager.EXPECT().JetStream().Return(mockJS) + mockConnManager.EXPECT().JetStream().Return(mockJS, nil) mockJS.EXPECT().AccountInfo(gomock.Any()).Return(nil, errJetStream) }, expectedStatus: datasource.StatusUp, diff --git a/interfaces.go b/interfaces.go index 1c3f8c9..0ed719b 100644 --- a/interfaces.go +++ b/interfaces.go @@ -46,7 +46,7 @@ type ConnectionManagerInterface interface { Close(ctx context.Context) Publish(ctx context.Context, subject string, message []byte, metrics Metrics) error Health() datasource.Health - JetStream() jetstream.JetStream + JetStream() (jetstream.JetStream, error) } // SubscriptionManagerInterface represents the main Subscription Manager. diff --git a/mock_client.go b/mock_client.go index 8fb434c..2b8c7ff 100644 --- a/mock_client.go +++ b/mock_client.go @@ -365,11 +365,12 @@ func (mr *MockConnectionManagerInterfaceMockRecorder) Health() *gomock.Call { } // JetStream mocks base method. -func (m *MockConnectionManagerInterface) JetStream() jetstream.JetStream { +func (m *MockConnectionManagerInterface) JetStream() (jetstream.JetStream, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "JetStream") ret0, _ := ret[0].(jetstream.JetStream) - return ret0 + ret1, _ := ret[1].(error) + return ret0, ret1 } // JetStream indicates an expected call of JetStream.