Skip to content

Commit

Permalink
Calculate unhealthy reason (input/output/other) on agent documents (#…
Browse files Browse the repository at this point in the history
…3338)

* calculate unhealthy_reason on agent doc

* fix test

* added schema on agent.Components

* added changelog

* added tests on parseComponents
  • Loading branch information
juliaElastic authored Mar 18, 2024
1 parent b9133d7 commit 1572693
Show file tree
Hide file tree
Showing 8 changed files with 372 additions and 58 deletions.
32 changes: 32 additions & 0 deletions changelog/fragments/1710256335-add-unhealthy-reason.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Calculate unhealthy reason (input/output/other) in agent document

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3338

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
118 changes: 82 additions & 36 deletions internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ var (
)

const (
kEncodingGzip = "gzip"
kEncodingGzip = "gzip"
FailedStatus = "FAILED"
DegradedStatus = "DEGRADED"
)

// validActionTypes is a map of action.type and if they are valid
Expand Down Expand Up @@ -156,11 +158,12 @@ func invalidateAPIKeysOfInactiveAgent(ctx context.Context, zlog zerolog.Logger,

// validatedCheckin is a struct to wrap all the things that validateRequest returns.
type validatedCheckin struct {
req *CheckinRequest
dur time.Duration
rawMeta []byte
rawComp []byte
seqno sqn.SeqNo
req *CheckinRequest
dur time.Duration
rawMeta []byte
rawComp []byte
seqno sqn.SeqNo
unhealthyReason *[]string
}

func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, r *http.Request, start time.Time, agent *model.Agent) (validatedCheckin, error) {
Expand Down Expand Up @@ -228,7 +231,7 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
}

// Compare agent_components content and update if different
rawComponents, err := parseComponents(zlog, agent, &req)
rawComponents, unhealthyReason, err := parseComponents(zlog, agent, &req)
if err != nil {
return val, err
}
Expand All @@ -240,11 +243,12 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
}

return validatedCheckin{
req: &req,
dur: pollDuration,
rawMeta: rawMeta,
rawComp: rawComponents,
seqno: seqno,
req: &req,
dur: pollDuration,
rawMeta: rawMeta,
rawComp: rawComponents,
seqno: seqno,
unhealthyReason: unhealthyReason,
}, nil
}

Expand All @@ -258,6 +262,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
rawMeta := validated.rawMeta
rawComponents := validated.rawComp
seqno := validated.seqno
unhealthyReason := validated.unhealthyReason

// Handle upgrade details for agents using the new 8.11 upgrade details field of the checkin.
// Older agents will communicate any issues with upgrades via the Ack endpoint.
Expand Down Expand Up @@ -302,7 +307,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
defer longPoll.Stop()

// Initial update on checkin, and any user fields that might have changed
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver)
err = ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, rawMeta, rawComponents, seqno, ver, unhealthyReason)
if err != nil {
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -356,7 +361,7 @@ func (ct *CheckinT) ProcessRequest(zlog zerolog.Logger, w http.ResponseWriter, r
zlog.Trace().Msg("fire long poll")
break LOOP
case <-tick.C:
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver)
err := ct.bc.CheckIn(agent.Id, string(req.Status), req.Message, nil, rawComponents, nil, ver, unhealthyReason)
if err != nil {
zlog.Error().Err(err).Str(logger.AgentID, agent.Id).Msg("checkin failed")
}
Expand Down Expand Up @@ -917,50 +922,52 @@ func parseMeta(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]
return outMeta, nil
}

func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, error) {
func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinRequest) ([]byte, *[]string, error) {
var unhealthyReason []string

// fallback to other if components don't exist
if agent.UnhealthyReason == nil && (agent.LastCheckinStatus == FailedStatus || agent.LastCheckinStatus == DegradedStatus) {
unhealthyReason = []string{"other"}
} else {
unhealthyReason = agent.UnhealthyReason
}

if req.Components == nil {
return nil, nil
return nil, &unhealthyReason, nil
}

agentComponentsJSON, err := json.Marshal(agent.Components)
if err != nil {
return nil, &unhealthyReason, fmt.Errorf("agent.Components marshal: %w", err)
}

// Quick comparison first; compare the JSON payloads.
// If the data is not consistently normalized, this short-circuit will not work.
if bytes.Equal(*req.Components, agent.Components) {
if bytes.Equal(*req.Components, agentComponentsJSON) {
zlog.Trace().Msg("quick comparing agent components data is equal")
return nil, nil
return nil, &unhealthyReason, nil
}

// Deserialize the request components data
var reqComponents interface{}
var reqComponents []model.ComponentsItems
if len(*req.Components) > 0 {
if err := json.Unmarshal(*req.Components, &reqComponents); err != nil {
return nil, fmt.Errorf("parseComponents request: %w", err)
}
// Validate that components is an array
if _, ok := reqComponents.([]interface{}); !ok {
return nil, errors.New("parseComponets request: components property is not array")
return nil, &unhealthyReason, fmt.Errorf("parseComponents request: %w", err)
}
}

// If empty, don't step on existing data
if reqComponents == nil {
return nil, nil
}

// Deserialize the agent's components copy
var agentComponents interface{}
if len(agent.Components) > 0 {
if err := json.Unmarshal(agent.Components, &agentComponents); err != nil {
return nil, fmt.Errorf("parseComponents local: %w", err)
}
return nil, &unhealthyReason, nil
}

var outComponents []byte

// Compare the deserialized meta structures and return the bytes to update if different
if !reflect.DeepEqual(reqComponents, agentComponents) {
if !reflect.DeepEqual(reqComponents, agent.Components) {

zlog.Trace().
RawJSON("oldComponents", agent.Components).
RawJSON("oldComponents", agentComponentsJSON).
RawJSON("newComponents", *req.Components).
Msg("local components data is not equal")

Expand All @@ -969,9 +976,48 @@ func parseComponents(zlog zerolog.Logger, agent *model.Agent, req *CheckinReques
Msg("applying new components data")

outComponents = *req.Components
compUnhealthyReason := calcUnhealthyReason(reqComponents)
if len(compUnhealthyReason) > 0 {
unhealthyReason = compUnhealthyReason
}
}

zlog.Debug().Any("unhealthy_reason", unhealthyReason).Msg("unhealthy reason")

return outComponents, &unhealthyReason, nil
}

func calcUnhealthyReason(reqComponents []model.ComponentsItems) []string {
var unhealthyReason []string
hasUnhealthyInput := false
hasUnhealthyOutput := false
hasUnhealthyComponent := false
for _, component := range reqComponents {
if component.Status == FailedStatus || component.Status == DegradedStatus {
hasUnhealthyComponent = true
for _, unit := range component.Units {
if unit.Status == FailedStatus || unit.Status == DegradedStatus {
if unit.Type == "input" {
hasUnhealthyInput = true
} else if unit.Type == "output" {
hasUnhealthyOutput = true
}
}
}
}
}
unhealthyReason = make([]string, 0)
if hasUnhealthyInput {
unhealthyReason = append(unhealthyReason, "input")
}
if hasUnhealthyOutput {
unhealthyReason = append(unhealthyReason, "output")
}
if !hasUnhealthyInput && !hasUnhealthyOutput && hasUnhealthyComponent {
unhealthyReason = append(unhealthyReason, "other")
}

return outComponents, nil
return unhealthyReason
}

func calcPollDuration(zlog zerolog.Logger, pollDuration, setupDuration, jitterDuration time.Duration) (time.Duration, time.Duration) {
Expand Down
Loading

0 comments on commit 1572693

Please sign in to comment.