Skip to content

Commit

Permalink
fix(#111): handle timeouts in an optimistic manner
Browse files Browse the repository at this point in the history
  • Loading branch information
firminochangani committed Dec 25, 2023
1 parent 4c1a79a commit 5c64696
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 42 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ SENT_FROM_EMAIL_ADDRESS=noreply@transactional.dobermann.dev
WORKER_REGION=europe

PRODUCTION_MODE=false

ENDPOINT_CHECK_TIMEOUT_IN_SECONDS=30
2 changes: 2 additions & 0 deletions .env.local
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ SENT_FROM_EMAIL_ADDRESS=noreply@transactional.dobermann.dev
WORKER_REGION=europe

PRODUCTION_MODE=false

ENDPOINT_CHECK_TIMEOUT_IN_SECONDS=30
2 changes: 1 addition & 1 deletion .github/workflows/deploy-production.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ on:
workflow_dispatch:
inputs:
version:
description: 0.0.0 (semver)
description: v0.0.0 (semver)
required: true

jobs:
Expand Down
13 changes: 0 additions & 13 deletions cmd/demo/main.go

This file was deleted.

4 changes: 4 additions & 0 deletions cmd/endpoint_simulator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func main() {
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)

router.GET("/", func(c echo.Context) error {
if c.QueryParam("timeout") == "true" {
time.Sleep(time.Second * 15)
}

if c.QueryParam("is_up") == "true" {
return c.NoContent(http.StatusOK)
}
Expand Down
23 changes: 12 additions & 11 deletions cmd/service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ import (
var Version = "development"

type Config struct {
AmqpUrl string `envconfig:"AMQP_URL"`
Port int `envconfig:"HTTP_PORT"`
JwtSecret string `envconfig:"JWT_SECRET"`
DebugMode string `envconfig:"DEBUG_MODE"`
DatabaseURL string `envconfig:"DATABASE_URL"`
ResendApiKey string `envconfig:"RESEND_API_KEY"`
HostnameForNotifications string `envconfig:"HOSTNAME_NOTIFICATION"`
SentFromEmailAddress string `envconfig:"SENT_FROM_EMAIL_ADDRESS"`
IsProductionMode bool `envconfig:"PRODUCTION_MODE"`
Region string `envconfig:"WORKER_REGION"`
AmqpUrl string `envconfig:"AMQP_URL"`
Port int `envconfig:"HTTP_PORT"`
JwtSecret string `envconfig:"JWT_SECRET"`
DebugMode string `envconfig:"DEBUG_MODE"`
DatabaseURL string `envconfig:"DATABASE_URL"`
ResendApiKey string `envconfig:"RESEND_API_KEY"`
HostnameForNotifications string `envconfig:"HOSTNAME_NOTIFICATION"`
SentFromEmailAddress string `envconfig:"SENT_FROM_EMAIL_ADDRESS"`
IsProductionMode bool `envconfig:"PRODUCTION_MODE"`
Region string `envconfig:"WORKER_REGION"`
EndpointCheckTimeoutInSeconds int `envconfig:"ENDPOINT_CHECK_TIMEOUT_IN_SECONDS" required:"true"`
}

func (c Config) IsDebugMode() bool {
Expand Down Expand Up @@ -138,7 +139,7 @@ func main() {

logger.Info("Connected successfully to RabbitMQ")

httpChecker, err := endpoint_checkers.NewHttpChecker(config.Region)
httpChecker, err := endpoint_checkers.NewHttpChecker(config.Region, config.EndpointCheckTimeoutInSeconds)
if err != nil {
logger.Fatal(err)
}
Expand Down
15 changes: 8 additions & 7 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ import (
var Version = "development"

type Config struct {
AmqpUrl string `envconfig:"AMQP_URL"`
Port int `envconfig:"HTTP_PORT"`
DebugMode string `envconfig:"DEBUG_MODE"`
DatabaseURL string `envconfig:"DATABASE_URL"`
Region string `envconfig:"WORKER_REGION" required:"true"`
IsProductionMode bool `envconfig:"PRODUCTION_MODE"`
AmqpUrl string `envconfig:"AMQP_URL"`
Port int `envconfig:"HTTP_PORT"`
DebugMode string `envconfig:"DEBUG_MODE"`
DatabaseURL string `envconfig:"DATABASE_URL"`
Region string `envconfig:"WORKER_REGION" required:"true"`
IsProductionMode bool `envconfig:"PRODUCTION_MODE"`
EndpointCheckTimeoutInSeconds int `envconfig:"ENDPOINT_CHECK_TIMEOUT_IN_SECONDS" required:"true"`
}

func (c Config) IsDebugMode() bool {
Expand Down Expand Up @@ -68,7 +69,7 @@ func main() {
logger.Fatal(err)
}

httpChecker, err := endpoint_checkers.NewHttpChecker(config.Region)
httpChecker, err := endpoint_checkers.NewHttpChecker(config.Region, config.EndpointCheckTimeoutInSeconds)
if err != nil {
logger.Fatal(err)
}
Expand Down
30 changes: 27 additions & 3 deletions internal/adapters/endpoint_checkers/http_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package endpoint_checkers

import (
"context"
"errors"
"fmt"
"net/http"
"time"
Expand All @@ -14,15 +15,19 @@ type HttpChecker struct {
region monitor.Region
}

func NewHttpChecker(region string) (HttpChecker, error) {
func NewHttpChecker(region string, timeoutInSeconds int) (HttpChecker, error) {
reg, err := monitor.NewRegion(region)
if err != nil {
return HttpChecker{}, err
}

if timeoutInSeconds < 0 || timeoutInSeconds > 30 {
return HttpChecker{}, fmt.Errorf("the timeout should be within the range of 1 and 30")
}

return HttpChecker{
client: &http.Client{
Timeout: time.Second * 5,
Timeout: time.Second * time.Duration(timeoutInSeconds),
},
region: reg,
}, nil
Expand All @@ -36,13 +41,32 @@ func (h HttpChecker) Check(ctx context.Context, endpointUrl string) (*monitor.Ch

startedAt := time.Now()
resp, err := h.client.Do(req)
if errors.Is(err, errors.Unwrap(err)) {
return h.createCheckResults(startedAt, nil, true)
}

if err != nil {
return nil, fmt.Errorf("unable to check endpoint %s: %v", endpointUrl, err)
}
defer func() { _ = resp.Body.Close() }()

return h.createCheckResults(startedAt, &resp.StatusCode, false)
}

func (h HttpChecker) createCheckResults(startedAt time.Time, statusCode *int, isForcedTimeout bool) (*monitor.CheckResult, error) {
checkDuration := time.Since(startedAt)
checkResult, err := monitor.NewCheckResult(int16(resp.StatusCode), h.region, time.Now(), int16(checkDuration.Milliseconds()))

var sCode int16

if statusCode != nil {
sCode = int16(*statusCode)
}

if isForcedTimeout {
sCode = int16(http.StatusRequestTimeout)
}

checkResult, err := monitor.NewCheckResult(sCode, h.region, time.Now(), int16(checkDuration.Milliseconds()))
if err != nil {
return nil, fmt.Errorf("unable to create CheckResult: %v", err)
}
Expand Down
29 changes: 23 additions & 6 deletions internal/adapters/endpoint_checkers/http_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package endpoint_checkers_test
import (
"context"
"fmt"
"net/http"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/flowck/dobermann/backend/internal/adapters/endpoint_checkers"
"github.com/flowck/dobermann/backend/internal/domain/monitor"
)

func TestHttpChecker(t *testing.T) {
Expand All @@ -18,12 +20,27 @@ func TestHttpChecker(t *testing.T) {

simulatorEndpointUrl := os.Getenv("SIMULATOR_ENDPOINT_URL")

httpChecker, err := endpoint_checkers.NewHttpChecker("europe")
httpChecker, err := endpoint_checkers.NewHttpChecker(monitor.RegionEurope.String(), 2)
require.NoError(t, err)
_, err = httpChecker.Check(ctx, fmt.Sprintf("%s?is_up=true", simulatorEndpointUrl))
assert.NoError(t, err)

checkResult, err := httpChecker.Check(ctx, fmt.Sprintf("%s?is_up=false", simulatorEndpointUrl))
require.NoError(t, err)
assert.True(t, checkResult.IsEndpointDown())
t.Run("is_up", func(t *testing.T) {
t.Parallel()
_, err = httpChecker.Check(ctx, fmt.Sprintf("%s?is_up=true", simulatorEndpointUrl))
assert.NoError(t, err)
})

t.Run("is_down", func(t *testing.T) {
t.Parallel()
checkResult, err := httpChecker.Check(ctx, fmt.Sprintf("%s?is_up=false", simulatorEndpointUrl))
require.NoError(t, err)
assert.True(t, checkResult.IsEndpointDown())
})

t.Run("error_endpoint_timeouts", func(t *testing.T) {
t.Parallel()

result, err := httpChecker.Check(ctx, fmt.Sprintf("%s?is_up=true&timeout=true", simulatorEndpointUrl))
require.NoError(t, err)
assert.Equal(t, http.StatusRequestTimeout, int(result.StatusCode()))
})
}
2 changes: 1 addition & 1 deletion internal/app/command/bulk_check_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (p mockTxProvider) Transact(ctx context.Context, fn command.TransactFunc) e
}

func TestNewBulkCheckEndpointsHandler(t *testing.T) {
endpointsChecker, err := endpoint_checkers.NewHttpChecker("europe")
endpointsChecker, err := endpoint_checkers.NewHttpChecker("europe", 5)
require.NoError(t, err)
txProvider := mockTxProvider{
EventPublisher: events.NewPublisherMock(),
Expand Down
1 change: 1 addition & 0 deletions misc/deploy/service.fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ primary_region = "lhr"
PRODUCTION_MODE=true
HOSTNAME_NOTIFICATION='https://dobermann.dev'
SENT_FROM_EMAIL_ADDRESS='noreply@transactional.dobermann.dev'
ENDPOINT_CHECK_TIMEOUT_IN_SECONDS=30


[http_service]
Expand Down
1 change: 1 addition & 0 deletions misc/deploy/worker.fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ primary_region = "lhr"

[env]
PRODUCTION_MODE=true
ENDPOINT_CHECK_TIMEOUT_IN_SECONDS=30

[http_service]
internal_port = 8080
Expand Down

0 comments on commit 5c64696

Please sign in to comment.