Skip to content

Commit

Permalink
Fix issue where its possible for a component to receive a unit withou…
Browse files Browse the repository at this point in the history
…t a config (#2138)

* Fix issue where checkinExpected channel might have out dated information.

* Run mage fmt.

* Add changelog entry.

* Increase rate lime for failure in test for slow CI runners.

* Cleanups from code review.

* Refactor the design of ensuring an initial expected comes from the observed message.

(cherry picked from commit fefe64f)
  • Loading branch information
blakerouse authored and mergify[bot] committed Jan 23, 2023
1 parent 1ae879a commit c11ebd9
Show file tree
Hide file tree
Showing 12 changed files with 550 additions and 76 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Fix issue where its possible for a component to receive a unit without a config

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
#description:

# Affected component; a word indicating the component this changeset affects.
component:

# PR number; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 2138

# Issue number; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: 2086
5 changes: 3 additions & 2 deletions internal/pkg/agent/control/v1/proto/control_v1.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions internal/pkg/agent/control/v1/proto/control_v1_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions internal/pkg/agent/control/v2/cproto/control_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions internal/pkg/agent/control/v2/cproto/control_v2_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 65 additions & 3 deletions pkg/component/fake/component/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"os"
"os/signal"
"strconv"
"syscall"
"time"

Expand Down Expand Up @@ -46,7 +47,7 @@ func main() {
}

func run() error {
logger := zerolog.New(os.Stderr).With().Timestamp().Logger()
logger := zerolog.New(os.Stderr).Level(zerolog.TraceLevel).With().Timestamp().Logger()
ver := client.VersionInfo{
Name: fake,
Version: "1.0",
Expand Down Expand Up @@ -347,7 +348,8 @@ type fakeInput struct {
state client.UnitState
stateMsg string

canceller context.CancelFunc
canceller context.CancelFunc
killerCanceller context.CancelFunc
}

func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager *stateManager, unit *client.Unit, cfg *proto.UnitExpectedConfig) (*fakeInput, error) {
Expand Down Expand Up @@ -399,7 +401,7 @@ func newFakeInput(logger zerolog.Logger, logLevel client.UnitLogLevel, manager *
}
}()
i.canceller = cancel

i.parseConfig(cfg)
return i, nil
}

Expand Down Expand Up @@ -429,6 +431,7 @@ func (f *fakeInput) Update(u *client.Unit) error {
return fmt.Errorf("unit type changed with the same unit ID: %s", config.Type)
}

f.parseConfig(config)
state, stateMsg, err := getStateFromConfig(config)
if err != nil {
return fmt.Errorf("unit config parsing error: %w", err)
Expand All @@ -440,6 +443,65 @@ func (f *fakeInput) Update(u *client.Unit) error {
return nil
}

func (f *fakeInput) parseConfig(config *proto.UnitExpectedConfig) {
// handle a case for killing the component when the pid of the component
// matches the current running PID
cfg := config.Source.AsMap()
killPIDRaw, kill := cfg["kill"]
if kill {
f.maybeKill(killPIDRaw)
}

// handle a case where random killing of the component is enabled
_, killOnInterval := cfg["kill_on_interval"]
f.logger.Trace().Bool("kill_on_interval", killOnInterval).Msg("kill_on_interval config set value")
if killOnInterval {
f.logger.Info().Msg("starting interval killer")
f.runKiller()
} else {
f.logger.Info().Msg("stopping interval killer")
f.stopKiller()
}
}

func (f *fakeInput) maybeKill(pidRaw interface{}) {
if killPID, ok := pidRaw.(string); ok {
if pid, err := strconv.Atoi(killPID); err == nil {
if pid == os.Getpid() {
f.logger.Warn().Msg("killing from config pid")
os.Exit(1)
}
}
}
}

func (f *fakeInput) runKiller() {
if f.killerCanceller != nil {
// already running
return
}
ctx, canceller := context.WithCancel(context.Background())
f.killerCanceller = canceller
go func() {
t := time.NewTimer(500 * time.Millisecond)
defer t.Stop()
select {
case <-ctx.Done():
return
case <-t.C:
f.logger.Warn().Msg("killer performing kill")
os.Exit(1)
}
}()
}

func (f *fakeInput) stopKiller() {
if f.killerCanceller != nil {
f.killerCanceller()
f.killerCanceller = nil
}
}

type stateSetterAction struct {
input *fakeInput
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/component/fake/shipper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"net"
"os"
"path/filepath"
"strings"
)

Expand All @@ -21,6 +22,13 @@ func createListener(path string) (net.Listener, error) {
if _, err := os.Stat(path); !os.IsNotExist(err) {
os.Remove(path)
}
dir := filepath.Dir(path)
if _, err := os.Stat(dir); os.IsNotExist(err) {
err := os.MkdirAll(dir, 0750)
if err != nil {
return nil, err
}
}
lis, err := net.Listen("unix", path)
if err != nil {
return nil, err
Expand Down
8 changes: 2 additions & 6 deletions pkg/component/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error {
sendExpected := c.state.syncExpected(&newComp)
changed := c.state.syncUnits(&newComp)
if sendExpected || c.state.unsettled() {
comm.CheckinExpected(c.state.toCheckinExpected())
comm.CheckinExpected(c.state.toCheckinExpected(), nil)
}
if changed {
c.sendObserved()
Expand All @@ -177,7 +177,7 @@ func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error {
sendExpected = true
}
if sendExpected {
comm.CheckinExpected(c.state.toCheckinExpected())
comm.CheckinExpected(c.state.toCheckinExpected(), checkin)
}
if changed {
c.sendObserved()
Expand Down Expand Up @@ -331,10 +331,6 @@ func (c *CommandRuntime) start(comm Communicator) error {
c.lastCheckin = time.Time{}
c.missedCheckins = 0

// Ensure there is no pending checkin expected message buffered to avoid sending the new process
// the expected state of the previous process: https://github.com/elastic/beats/issues/34137
comm.ClearPendingCheckinExpected()

proc, err := process.Start(path,
process.WithArgs(args),
process.WithEnv(env),
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/conn_info_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *mockCommunicator) WriteConnInfo(w io.Writer, services ...client.Service
return nil
}

func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected) {
func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected, observed *proto.CheckinObserved) {
}

func (c *mockCommunicator) ClearPendingCheckinExpected() {
Expand Down
Loading

0 comments on commit c11ebd9

Please sign in to comment.