Algod: New health endpoint (k8s /ready endpoint) (#4844)
Add a `/ready` readiness probe endpoint for Kubernetes deployments
ahangsu authored Mar 31, 2023
1 parent d27b674 commit 7703bc4
Showing 20 changed files with 857 additions and 328 deletions.
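
The endpoint is intended as a readiness probe: a Kubernetes probe, a deployment hook, or an integration test can poll it and treat a 200 as "healthy and fully caught up". A minimal Go sketch of that polling loop, assuming a node whose REST API listens on 127.0.0.1:8080 (the actual address comes from the node's configuration, e.g. its `algod.net` file):

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// waitForReady blocks until GET /ready returns 200 or the timeout expires.
// Any other status code (503 while catching up, 500 on node error) just
// means "not ready yet" for the purposes of this loop.
func waitForReady(baseURL string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		resp, err := http.Get(baseURL + "/ready")
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil // healthy and fully caught up
			}
		}
		time.Sleep(2 * time.Second)
	}
	return fmt.Errorf("node at %s not ready within %s", baseURL, timeout)
}

func main() {
	// Assumed address; substitute the node's real REST endpoint.
	if err := waitForReady("http://127.0.0.1:8080", 5*time.Minute); err != nil {
		panic(err)
	}
	fmt.Println("node is ready")
}
```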
22 changes: 11 additions & 11 deletions catchup/service.go
@@ -83,7 +83,7 @@ type Service struct {
deadlineTimeout time.Duration
blockValidationPool execpool.BacklogPool

// suspendForCatchpointWriting defines whether we've ran into a state where the ledger is currently busy writing the
// suspendForCatchpointWriting defines whether we've run into a state where the ledger is currently busy writing the
// catchpoint file. If so, we want to suspend the catchup process until the catchpoint file writing is complete,
// and resume from there without stopping the catchup timer.
suspendForCatchpointWriting bool
@@ -233,10 +233,10 @@ func (s *Service) innerFetch(r basics.Round, peer network.Peer) (blk *bookkeepin

// fetchAndWrite fetches a block, checks the cert, and writes it to the ledger. Cert checking and ledger writing both wait for the ledger to advance if necessary.
// Returns false if we should stop trying to catch up. This may occur for several reasons:
// - If the context is canceled (e.g. if the node is shutting down)
// - If we couldn't fetch the block (e.g. if there are no peers available or we've reached the catchupRetryLimit)
// - If the block is already in the ledger (e.g. if agreement service has already written it)
// - If the retrieval of the previous block was unsuccessful
// - If the context is canceled (e.g. if the node is shutting down)
// - If we couldn't fetch the block (e.g. if there are no peers available, or we've reached the catchupRetryLimit)
// - If the block is already in the ledger (e.g. if agreement service has already written it)
// - If the retrieval of the previous block was unsuccessful
func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool, lookbackComplete chan bool, peerSelector *peerSelector) bool {
// If sync-ing this round is not intended, don't fetch it
if dontSyncRound := s.GetDisableSyncRound(); dontSyncRound != 0 && r >= basics.Round(dontSyncRound) {
@@ -258,10 +258,10 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
loggedMessage := fmt.Sprintf("fetchAndWrite(%d): block retrieval exceeded retry limit", r)
if _, initialSync := s.IsSynchronizing(); initialSync {
// on the initial sync, it's completly expected that we won't be able to get all the "next" blocks.
// Therefore info should suffice.
// Therefore, info should suffice.
s.log.Info(loggedMessage)
} else {
// On any subsequent sync, we migth be looking for multiple rounds into the future, so it's completly
// On any subsequent sync, we might be looking for multiple rounds into the future, so it's completely
// reasonable that we would fail retrieving the future block.
// Generate a warning here only if we're failing to retrieve X+1 or below.
// All other block retrievals should not generate a warning.
@@ -294,7 +294,7 @@ func (s *Service) fetchAndWrite(r basics.Round, prevFetchCompleteChan chan bool,
s.log.Debugf("fetchAndWrite(%v): Could not fetch: %v (attempt %d)", r, err, i)
peerSelector.rankPeer(psp, peerRankDownloadFailed)
// we've just failed to retrieve a block; wait until the previous block is fetched before trying again
// to avoid the usecase where the first block doesn't exists and we're making many requests down the chain
// to avoid the usecase where the first block doesn't exist, and we're making many requests down the chain
// for no reason.
if !hasLookback {
select {
@@ -479,7 +479,7 @@ func (s *Service) pipelinedFetch(seedLookback uint64) {
go func() {
defer wg.Done()
for t := range taskCh {
completed <- t() // This write to completed comes after a read from taskCh, so the invariant is preserved.
completed <- t() // This write comes after a read from taskCh, so the invariant is preserved.
}
}()
}
@@ -632,10 +632,10 @@ func (s *Service) periodicSync() {
}

// Syncs the client with the network. sync asks the network for last known block and tries to sync the system
// up the to the highest number it gets.
// up to the highest number it gets.
func (s *Service) sync() {
// Only run sync once at a time
// Store start time of sync - in NS so we can compute time.Duration (which is based on NS)
// Store start time of sync - in NS, so we can compute time.Duration (which is based on NS)
start := time.Now()

timeInNS := start.UnixNano()
30 changes: 30 additions & 0 deletions daemon/algod/api/algod.oas2.json
@@ -47,6 +47,36 @@
}
}
},
"/ready": {
"get": {
"tags": [
"public",
"common"
],
"produces": [
"application/json"
],
"scheme": [
"http"
],
"summary": "Returns OK if healthy and fully caught up.",
"operationId": "GetReady",
"responses": {
"200": {
"description": "OK."
},
"500": {
"description": "Internal Error"
},
"503": {
"description": "Node not ready yet"
},
"default": {
"description": "Unknown Error"
}
}
}
},
"/metrics": {
"get": {
"tags": [
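The `/ready` responses documented above are enough for a caller to distinguish "keep waiting" (503) from "something is wrong" (500 or unknown). A hedged sketch of folding them into a small caller-side type; the names here are illustrative and not part of the codebase:

```go
package readyclient

import "net/http"

// Readiness is a caller-side view of the documented /ready status codes.
type Readiness int

const (
	Ready       Readiness = iota // 200: healthy and fully caught up
	NotReadyYet                  // 503: node not ready yet (still catching up)
	NodeError                    // 500 or anything else: internal / unknown error
)

// Classify maps an HTTP status code returned by GET /ready to a Readiness.
func Classify(statusCode int) Readiness {
	switch statusCode {
	case http.StatusOK:
		return Ready
	case http.StatusServiceUnavailable:
		return NotReadyYet
	default: // includes http.StatusInternalServerError and the spec's "Unknown Error"
		return NodeError
	}
}
```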
28 changes: 28 additions & 0 deletions daemon/algod/api/algod.oas3.yml
@@ -2126,6 +2126,34 @@
]
}
},
"/ready": {
"get": {
"operationId": "GetReady",
"responses": {
"200": {
"content": {},
"description": "OK."
},
"500": {
"content": {},
"description": "Internal Error"
},
"503": {
"content": {},
"description": "Node not ready yet"
},
"default": {
"content": {},
"description": "Unknown Error"
}
},
"summary": "Returns OK if healthy and fully caught up.",
"tags": [
"public",
"common"
]
}
},
"/swagger.json": {
"get": {
"description": "Returns the entire swagger spec in json.",
6 changes: 6 additions & 0 deletions daemon/algod/api/client/restClient.go
@@ -291,6 +291,12 @@ func (client RestClient) HealthCheck() error {
return client.get(nil, "/health", nil)
}

// ReadyCheck does a readiness check on the potentially running node,
// returning an error if the node is not ready (caught up and healthy)
func (client RestClient) ReadyCheck() error {
return client.get(nil, "/ready", nil)
}

// StatusAfterBlock waits for a block to occur then returns the StatusResponse after that block
// blocks on the node end
// Not supported
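`ReadyCheck` mirrors `HealthCheck` but targets `/ready`, so a caller that already holds a `RestClient` can wait on it directly. A sketch under assumptions: the two-second cadence is arbitrary, and constructing the client is outside this diff:

```go
package tools

import (
	"context"
	"time"

	"github.com/algorand/go-algorand/daemon/algod/api/client"
)

// waitUntilReady retries RestClient.ReadyCheck (which wraps GET /ready)
// until the node reports healthy and fully caught up, or ctx expires.
func waitUntilReady(ctx context.Context, rc client.RestClient) error {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	for {
		if err := rc.ReadyCheck(); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}
```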
60 changes: 60 additions & 0 deletions daemon/algod/api/server/common/handlers.go
@@ -18,14 +18,17 @@ package common

import (
"encoding/json"
"fmt"
"net/http"

"github.com/labstack/echo/v4"

"github.com/algorand/go-algorand/agreement"
"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/daemon/algod/api"
"github.com/algorand/go-algorand/daemon/algod/api/server/lib"
"github.com/algorand/go-algorand/daemon/algod/api/spec/common"
"github.com/algorand/go-algorand/node"
)

// GenesisJSON is an httpHandler for route GET /genesis
@@ -89,6 +92,63 @@ func HealthCheck(ctx lib.ReqContext, context echo.Context) {
json.NewEncoder(w).Encode(nil)
}

// Ready is a httpHandler for route GET /ready
// it serves "readiness" probe on if the node is healthy and fully caught-up.
func Ready(ctx lib.ReqContext, context echo.Context) {
// swagger:operation GET /ready Ready
//---
// Summary: Returns OK if healthy and fully caught up.
// Produces:
// - application/json
// Schemes:
// - http
// Responses:
// 200:
// description: OK.
// 500:
// description: Internal Error.
// 503:
// description: Node not ready yet.
// default: { description: Unknown Error }
w := context.Response().Writer
w.Header().Set("Content-Type", "application/json")

stat, err := ctx.Node.Status()
code := http.StatusOK

// isReadyFromStat checks the `Node.Status()` result
// and decide if the node is at the latest round
// must satisfy following sub conditions:
// 1. the node is not in a fast-catchup stage
// 2. the node's time since last round should be [0, deadline),
// while deadline = bigLambda + smallLambda = 17s
// 3. the node's catchup time is 0
isReadyFromStat := func(status node.StatusReport) bool {
timeSinceLastRound := status.TimeSinceLastRound().Milliseconds()

return len(status.Catchpoint) == 0 &&
timeSinceLastRound >= 0 &&
timeSinceLastRound < agreement.DeadlineTimeout().Milliseconds() &&
status.CatchupTime.Milliseconds() == 0
}

if err != nil {
code = http.StatusInternalServerError
ctx.Log.Error(err)
} else if stat.StoppedAtUnsupportedRound {
code = http.StatusInternalServerError
err = fmt.Errorf("stopped at an unsupported round")
ctx.Log.Error(err)
} else if !isReadyFromStat(stat) {
code = http.StatusServiceUnavailable
err = fmt.Errorf("ready failed as the node is catching up")
ctx.Log.Info(err)
}

w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(nil)
}

// VersionsHandler is an httpHandler for route GET /versions
func VersionsHandler(ctx lib.ReqContext, context echo.Context) {
// swagger:route GET /versions GetVersion
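The `isReadyFromStat` closure above encodes three sub-conditions. Restated as a standalone predicate with a few illustrative inputs; this is a simplified sketch rather than the handler's actual code, and the 17s deadline is the value the handler comment cites for bigLambda + smallLambda:

```go
package example

import "time"

// isReady mirrors the handler's readiness conditions:
//  1. not in a catchpoint (fast) catchup
//  2. time since the last round is within [0, deadline)
//  3. no catchup in progress
func isReady(catchpoint string, timeSinceLastRound, catchupTime time.Duration) bool {
	const deadline = 17 * time.Second // agreement deadline per the handler comment
	return len(catchpoint) == 0 &&
		timeSinceLastRound >= 0 &&
		timeSinceLastRound < deadline &&
		catchupTime == 0
}

// Illustrative evaluations:
//   isReady("", 2*time.Second, 0)               == true  // caught up and progressing
//   isReady("12345#ABCDE", 2*time.Second, 0)    == false // catchpoint catchup running
//   isReady("", 45*time.Second, 10*time.Second) == false // lagging / still catching up
```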
7 changes: 7 additions & 0 deletions daemon/algod/api/server/common/routes.go
@@ -41,6 +41,13 @@ var Routes = lib.Routes{
HandlerFunc: HealthCheck,
},

lib.Route{
Name: "ready",
Method: "GET",
Path: "/ready",
HandlerFunc: Ready,
},

lib.Route{
Name: "swagger.json",
Method: "GET",
88 changes: 88 additions & 0 deletions daemon/algod/api/server/common/test/handlers_test.go
@@ -0,0 +1,88 @@
// Copyright (C) 2019-2023 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package test

import (
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/labstack/echo/v4"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/daemon/algod/api/server/common"
"github.com/algorand/go-algorand/daemon/algod/api/server/lib"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/node"
"github.com/algorand/go-algorand/test/partitiontest"
)

func mockNodeStatusInRangeHelper(
t *testing.T, statusCode MockNodeCatchupStatus,
expectedErr error, expectedStatus node.StatusReport) {
mockNodeInstance := makeMockNode(statusCode)
status, err := mockNodeInstance.Status()
if expectedErr != nil {
require.Error(t, err, expectedErr)
} else {
require.Equal(t, expectedStatus, status)
}
}

func TestMockNodeStatus(t *testing.T) {
partitiontest.PartitionTest(t)

mockNodeStatusInRangeHelper(
t, CaughtUpAndReady, nil, cannedStatusReportCaughtUpAndReadyGolden)
mockNodeStatusInRangeHelper(
t, CatchingUpFast, nil, cannedStatusReportCatchingUpFastGolden)
mockNodeStatusInRangeHelper(
t, StoppedAtUnsupported, nil, cannedStatusReportStoppedAtUnsupportedGolden)
mockNodeStatusInRangeHelper(
t, 399, fmt.Errorf("catchup status out of scope error"), node.StatusReport{})
}

func readyEndpointTestHelper(
t *testing.T, node *mockNode, expectedCode int) {
reqCtx := lib.ReqContext{
Node: node,
Log: logging.NewLogger(),
Shutdown: make(chan struct{}),
}

e := echo.New()
req := httptest.NewRequest(http.MethodGet, "/", nil)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)

common.Ready(reqCtx, c)
require.Equal(t, expectedCode, rec.Code)
}

func TestReadyEndpoint(t *testing.T) {
partitiontest.PartitionTest(t)

mockNodeInstance := makeMockNode(CaughtUpAndReady)
readyEndpointTestHelper(t, mockNodeInstance, http.StatusOK)

mockNodeInstance.catchupStatus = CatchingUpFast
readyEndpointTestHelper(t, mockNodeInstance, http.StatusServiceUnavailable)

mockNodeInstance.catchupStatus = StoppedAtUnsupported
readyEndpointTestHelper(t, mockNodeInstance, http.StatusInternalServerError)
}