Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where its possible for a component to receive a unit without a config #2138

Merged
merged 6 commits into from
Jan 23, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/component/runtime/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error {
sendExpected := c.state.syncExpected(&newComp)
changed := c.state.syncUnits(&newComp)
if sendExpected || c.state.unsettled() {
comm.CheckinExpected(c.state.toCheckinExpected())
comm.CheckinExpected(c.state.toCheckinExpected(), nil)
}
if changed {
c.sendObserved()
Expand All @@ -177,7 +177,7 @@ func (c *CommandRuntime) Run(ctx context.Context, comm Communicator) error {
sendExpected = true
}
if sendExpected {
comm.CheckinExpected(c.state.toCheckinExpected())
comm.CheckinExpected(c.state.toCheckinExpected(), checkin)
}
if changed {
c.sendObserved()
Expand Down
2 changes: 1 addition & 1 deletion pkg/component/runtime/conn_info_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (c *mockCommunicator) WriteConnInfo(w io.Writer, services ...client.Service
return nil
}

func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected) {
func (c *mockCommunicator) CheckinExpected(expected *proto.CheckinExpected, observed *proto.CheckinObserved) {
}

func (c *mockCommunicator) ClearPendingCheckinExpected() {
Expand Down
121 changes: 61 additions & 60 deletions pkg/component/runtime/runtime_comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ type Communicator interface {
// to the provided services.
WriteConnInfo(w io.Writer, services ...client.Service) error
// CheckinExpected sends the expected state to the component.
CheckinExpected(expected *proto.CheckinExpected)
//
// observed is the observed message received from the component and what was used to compute the provided
// expected message. In the case that `CheckinExpected` is being called from a configuration change resulting
// in a previously observed message not being present then `nil` should be passed in for observed.
CheckinExpected(expected *proto.CheckinExpected, observed *proto.CheckinObserved)
// CheckinObserved receives the observed state from the component.
CheckinObserved() <-chan *proto.CheckinObserved
}
Expand All @@ -55,6 +59,10 @@ type runtimeComm struct {
checkinExpected chan *proto.CheckinExpected
checkinObserved chan *proto.CheckinObserved

initCheckinObserved *proto.CheckinObserved
initCheckinExpectedCh chan *proto.CheckinExpected
initCheckinObservedMx sync.Mutex

actionsConn bool
actionsDone chan bool
actionsLock sync.RWMutex
Expand Down Expand Up @@ -127,7 +135,7 @@ func (c *runtimeComm) WriteConnInfo(w io.Writer, services ...client.Service) err
return nil
}

func (c *runtimeComm) CheckinExpected(expected *proto.CheckinExpected) {
func (c *runtimeComm) CheckinExpected(expected *proto.CheckinExpected, observed *proto.CheckinObserved) {
if c.agentInfo != nil && c.agentInfo.AgentID() != "" {
expected.AgentInfo = &proto.CheckinAgentInfo{
Id: c.agentInfo.AgentID(),
Expand All @@ -137,6 +145,30 @@ func (c *runtimeComm) CheckinExpected(expected *proto.CheckinExpected) {
} else {
expected.AgentInfo = nil
}

// we need to determine if the communicator is currently in the initial observed message path
// in the case that it is we send the expected state over a different channel
c.initCheckinObservedMx.Lock()
initObserved := c.initCheckinObserved
expectedCh := c.initCheckinExpectedCh
if initObserved != nil {
// the next call to `CheckinExpected` must be from the initial `CheckinObserved` message
if observed != initObserved {
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
// not the initial observed message; we don't send it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any scenario where the initial observed message could be lost or altered before we get here? Unlikely but if it this were possible we'd be stuck unable to configure anything.

Is there a client side timeout on the Checkin request from the component? If there is that would guard against this by giving it a way to make a second request and reset the initCheckinObserved message.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the lack of an expected message being sent to the client would cause it to miss subsequent checkins that would also get us out of this theoretical state via the eventual restart of the process.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it will result in an eventual restart of the process by the command runtime. So this situation is self healing, but extremely unlikely.

c.initCheckinObservedMx.Unlock()
return
}
// it is the expected from the initial observed message
// clear the initial state
c.initCheckinObserved = nil
c.initCheckinExpectedCh = nil
c.initCheckinObservedMx.Unlock()
expectedCh <- expected
return
}
c.initCheckinObservedMx.Unlock()

// not in the initial observed message path; send it over the standard channel
c.checkinExpected <- expected
}

Expand All @@ -147,49 +179,9 @@ func (c *runtimeComm) CheckinObserved() <-chan *proto.CheckinObserved {
// latestCheckinExpected ensures that the latest expected checkin is used
func (c *runtimeComm) latestCheckinExpected(exp *proto.CheckinExpected) *proto.CheckinExpected {
cmacknz marked this conversation as resolved.
Show resolved Hide resolved
latest := exp
latestByKey := make(map[ComponentUnitKey]*proto.UnitExpected)
if latest != nil {
for _, unit := range latest.Units {
latestByKey[ComponentUnitKey{client.UnitType(unit.Type), unit.Id}] = unit
}
}
for {
select {
case next := <-c.checkinExpected:
// ensure that this message includes data from a previous message
//
// it is possible that this next message did not include the `Config` for the unit because the
// previous message include it already thinking that the component got that unit config
//
// ensure that if that is the case that we copy the config from the previous onto the latest
//
// this really should not happen and this is very defensive in design, but I believe it is better
// to be very defensive to ensure that the component always receive its needed configuration for
// a unit then have a very rare chance that we don't send it
for _, unit := range next.Units {
if unit.Config != nil {
// has a config and its latest, nothing to do
continue
}
prevUnit, ok := latestByKey[ComponentUnitKey{client.UnitType(unit.Type), unit.Id}]
if !ok {
// previous didn't have the unit at all; so nothing to do
continue
}
if prevUnit.Config != nil && prevUnit.ConfigStateIdx == unit.ConfigStateIdx {
// copy the unit from the previous onto the new latest
unit.Config = prevUnit.Config
}
}
if latest != nil && latest.AgentInfo != nil && next.AgentInfo == nil {
// copy the agent info to the new latest
next.AgentInfo = latest.AgentInfo
}
latest = next
latestByKey = make(map[ComponentUnitKey]*proto.UnitExpected, len(latest.Units))
for _, unit := range latest.Units {
latestByKey[ComponentUnitKey{client.UnitType(unit.Type), unit.Id}] = unit
}
case latest = <-c.checkinExpected:
default:
return latest
}
Expand Down Expand Up @@ -221,26 +213,30 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p
c.checkinLock.Unlock()
}()

waitExp := make(chan bool)
initExp := make(chan *proto.CheckinExpected)
recvDone := make(chan bool)
sendDone := make(chan bool)
go func() {
defer func() {
close(sendDone)
}()
WAIT:
// wait until the goroutine should start listening on the `checkinExpected channel
// see comment below about why this waits until the `waitExp` is closed
for {
select {
case <-checkinDone:
return
case <-recvDone:

// initial startup waits for the first expected message from the dedicated initExp channel
select {
case <-checkinDone:
return
case <-recvDone:
return
case expected := <-initExp:
err := server.Send(expected)
if err != nil {
if reportableErr(err) {
c.logger.Debugf("check-in stream failed to send initial expected state: %s", err)
}
return
case <-waitExp:
break WAIT
}
}

for {
var expected *proto.CheckinExpected
select {
Expand All @@ -263,13 +259,18 @@ func (c *runtimeComm) checkin(server proto.ElasticAgent_CheckinV2Server, init *p
}()

// at this point the client is connected, and it has sent it's first initial checkin
// all other previous messages on `c.checkinExpected` are void. The push onto `c.checkinObserved`
// should result in a new message on `c.checkinExpected` so before that push we want to clear
// the channel as well as prevent the sender channel from reading from `c.checkinExpected` until
// we have sent the message on `c.checkinObserved`.
c.latestCheckinExpected(nil)
// the initial expected message must come before the sender goroutine will send any other
// expected messages. `CheckinExpected` method will also drop any expected messages that do not
// match the observed message to ensure that the expected that we receive is from the initial
// observed state.
c.initCheckinObservedMx.Lock()
c.initCheckinObserved = init
c.initCheckinExpectedCh = initExp
c.latestCheckinExpected(nil) // clears all queued expected messages
c.initCheckinObservedMx.Unlock()

// send the initial message (manager then calls `CheckinExpected` method with the result)
c.checkinObserved <- init
close(waitExp)

go func() {
for {
Expand Down
6 changes: 3 additions & 3 deletions pkg/component/runtime/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (s *ServiceRuntime) stop(ctx context.Context, comm Communicator, lastChecki
if checkedIn {
s.log.Debugf("send stopping state to %s service", name)
s.state.forceExpectedState(client.UnitStateStopping)
comm.CheckinExpected(s.state.toCheckinExpected())
comm.CheckinExpected(s.state.toCheckinExpected(), nil)
} else {
s.log.Debugf("%s service had never checked in, proceed to uninstall", name)
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (s *ServiceRuntime) processNewComp(newComp component.Component, comm Commun
sendExpected := s.state.syncExpected(&newComp)
changed := s.state.syncUnits(&newComp)
if sendExpected || s.state.unsettled() {
comm.CheckinExpected(s.state.toCheckinExpected())
comm.CheckinExpected(s.state.toCheckinExpected(), nil)
}
if changed {
s.sendObserved()
Expand Down Expand Up @@ -287,7 +287,7 @@ func (s *ServiceRuntime) processCheckin(checkin *proto.CheckinObserved, comm Com
sendExpected = true
}
if sendExpected {
comm.CheckinExpected(s.state.toCheckinExpected())
comm.CheckinExpected(s.state.toCheckinExpected(), checkin)
}
if changed {
s.sendObserved()
Expand Down