diff --git a/go.mod b/go.mod index ef91544..c1d5508 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,9 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/hashicorp/go-cleanhttp v0.5.2 // indirect + github.com/hashicorp/go-retryablehttp v0.7.4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 291419b..d8d4e7a 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,20 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= +github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= +github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= +github.com/hashicorp/go-retryablehttp v0.7.4 h1:ZQgVdpTdAL7WpMIwLzCfbalOcSUdkDZnpUv3/+BxzFA= +github.com/hashicorp/go-retryablehttp v0.7.4/go.mod h1:Jy/gPYAdjqffZ/yFGCFV2doI5wjtH1ewM9u8iYVjtX8= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.5 h1:s5PTfem8p8EbKQOctVV53k6jCJt3UX4IEJzwh+C324Q= github.com/stretchr/testify v1.7.5/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= diff --git a/lib/novu.go b/lib/novu.go index 353566c..1ac40c6 100644 --- a/lib/novu.go +++ b/lib/novu.go @@ -4,11 +4,15 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" + "strconv" "strings" "time" + "github.com/google/uuid" + "github.com/hashicorp/go-retryablehttp" "github.com/pkg/errors" ) @@ -17,9 +21,17 @@ const ( NovuVersion = "v1" ) +type RetryConfigType struct { + InitialDelay time.Duration // inital delay + WaitMin time.Duration // Minimum time to wait + WaitMax time.Duration // Maximum time to wait + RetryMax int // Maximum number of retries +} + type Config struct { - BackendURL *url.URL - HttpClient *http.Client + BackendURL *url.URL + HttpClient *http.Client + RetryConfig *RetryConfigType } type APIClient struct { @@ -47,7 +59,37 @@ func NewAPIClient(apiKey string, cfg *Config) *APIClient { cfg.BackendURL = buildBackendURL(cfg) if cfg.HttpClient == nil { - cfg.HttpClient = &http.Client{Timeout: 20 * time.Second} + retyableClient := retryablehttp.NewClient() + if cfg.RetryConfig != nil { + retyableClient.RetryWaitMin = cfg.RetryConfig.WaitMin + retyableClient.RetryWaitMax = cfg.RetryConfig.WaitMax + retyableClient.RetryMax = cfg.RetryConfig.RetryMax + retyableClient.Backoff = func(min, max time.Duration, attemptNum int, resp *http.Response) time.Duration { + if resp != nil { + if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable { + if s, ok := resp.Header["Retry-After"]; ok { + if sleep, err := strconv.ParseInt(s[0], 10, 64); err == nil { + return time.Second * time.Duration(sleep) + } + } + } + } + if attemptNum == 0 { + return cfg.RetryConfig.InitialDelay //wait for InitialDelay on 1st retry + } + mult := math.Pow(2, float64(attemptNum)) * float64(min) + sleep := time.Duration(mult) + //float64(sleep) != mult is to make sure there is no conversion error + //if there is a conversion error, number is huge and we set the sleep to max + if float64(sleep) != mult || sleep > max { + sleep = max + } + return sleep + } + } else { + retyableClient.RetryMax = 0 //by default no retry + } + cfg.HttpClient = retyableClient.StandardClient() } c := &APIClient{apiKey: apiKey} @@ -70,6 +112,7 @@ func NewAPIClient(apiKey string, cfg *Config) *APIClient { func (c APIClient) sendRequest(req *http.Request, resp interface{}) (*http.Response, error) { req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey)) + req.Header.Set("Idempotency-Key", uuid.New().String()) res, err := c.config.HttpClient.Do(req) if err != nil { diff --git a/lib/novu_test.go b/lib/novu_test.go new file mode 100644 index 0000000..cc2b2f1 --- /dev/null +++ b/lib/novu_test.go @@ -0,0 +1,168 @@ +package lib_test + +import ( + "context" + "encoding/json" + "log" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/novuhq/go-novu/lib" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestError_Retry_With_Custom_Config(t *testing.T) { + var ( + subscriberBulkPayload lib.SubscriberBulkPayload + receivedBody lib.SubscriberBulkPayload + expectedRequest lib.SubscriberBulkPayload + ) + reqCount := 0 + var idempotencyHeader []string + allElementsSame := func(arr []string) bool { + if len(arr) == 0 { + return true // An empty array is considered to have all elements the same. + } + firstElement := arr[0] + for _, element := range arr { + if element != firstElement { + return false + } + } + return true + } + subscriberService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if err := json.NewDecoder(req.Body).Decode(&receivedBody); err != nil { + log.Printf("error in unmarshalling %+v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + reqCount++ + + t.Run("Header must contain Idempotency-Key", func(t *testing.T) { + idKey := req.Header.Get("Idempotency-Key") + idempotencyHeader = append(idempotencyHeader, idKey) + assert.NotNil(t, idKey) + }) + t.Run("Header must contain ApiKey", func(t *testing.T) { + authKey := req.Header.Get("Authorization") + assert.True(t, strings.Contains(authKey, novuApiKey)) + assert.True(t, strings.HasPrefix(authKey, "ApiKey")) + }) + + t.Run("URL and request method is as expected", func(t *testing.T) { + expectedURL := "/v1/subscribers/bulk" + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, expectedURL, req.RequestURI) + }) + + t.Run("Request is as expected", func(t *testing.T) { + fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &expectedRequest) + assert.Equal(t, expectedRequest, receivedBody) + }) + + var resp lib.SubscriberResponse + fileToStruct(filepath.Join("../testdata", "subscriber_bulk_response.json"), &resp) + + w.WriteHeader(http.StatusInternalServerError) + bb, _ := json.Marshal(resp) + w.Write(bb) + })) + + defer subscriberService.Close() + + ctx := context.Background() + fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &subscriberBulkPayload) + + c := lib.NewAPIClient(novuApiKey, &lib.Config{BackendURL: lib.MustParseURL(subscriberService.URL), RetryConfig: &lib.RetryConfigType{RetryMax: 5, InitialDelay: 0 * time.Second}}) + + resp, err := c.SubscriberApi.BulkCreate(ctx, subscriberBulkPayload) + require.NotNil(t, err) + assert.NotNil(t, resp) + + //idempotency and retry tests + assert.Equal(t, reqCount, 6) + assert.Equal(t, len(idempotencyHeader), 6) + assert.True(t, allElementsSame(idempotencyHeader)) +} +func TestError_Retry_With_Default_Config(t *testing.T) { + var ( + subscriberBulkPayload lib.SubscriberBulkPayload + receivedBody lib.SubscriberBulkPayload + expectedRequest lib.SubscriberBulkPayload + ) + reqCount := 0 + var idempotencyHeader []string + allElementsSame := func(arr []string) bool { + if len(arr) == 0 { + return true // An empty array is considered to have all elements the same. + } + firstElement := arr[0] + for _, element := range arr { + if element != firstElement { + return false + } + } + return true + } + subscriberService := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if err := json.NewDecoder(req.Body).Decode(&receivedBody); err != nil { + log.Printf("error in unmarshalling %+v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + reqCount++ + + t.Run("Header must contain Idempotency-Key", func(t *testing.T) { + idKey := req.Header.Get("Idempotency-Key") + idempotencyHeader = append(idempotencyHeader, idKey) + assert.NotNil(t, idKey) + }) + t.Run("Header must contain ApiKey", func(t *testing.T) { + authKey := req.Header.Get("Authorization") + assert.True(t, strings.Contains(authKey, novuApiKey)) + assert.True(t, strings.HasPrefix(authKey, "ApiKey")) + }) + + t.Run("URL and request method is as expected", func(t *testing.T) { + expectedURL := "/v1/subscribers/bulk" + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, expectedURL, req.RequestURI) + }) + + t.Run("Request is as expected", func(t *testing.T) { + fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &expectedRequest) + assert.Equal(t, expectedRequest, receivedBody) + }) + + var resp lib.SubscriberResponse + fileToStruct(filepath.Join("../testdata", "subscriber_bulk_response.json"), &resp) + + w.WriteHeader(http.StatusInternalServerError) + bb, _ := json.Marshal(resp) + w.Write(bb) + })) + + defer subscriberService.Close() + + ctx := context.Background() + fileToStruct(filepath.Join("../testdata", "subscriber_bulk.json"), &subscriberBulkPayload) + + c := lib.NewAPIClient(novuApiKey, &lib.Config{BackendURL: lib.MustParseURL(subscriberService.URL)}) + + resp, err := c.SubscriberApi.BulkCreate(ctx, subscriberBulkPayload) + require.NotNil(t, err) + assert.NotNil(t, resp) + + //idempotency and retry tests + assert.Equal(t, reqCount, 1) + assert.True(t, allElementsSame(idempotencyHeader)) + assert.Equal(t, len(idempotencyHeader), 1) +}