fix: leave discovery service later in the reset sequence
Fixes #8057

I went back and forth on exactly how to fix it, and ended up with a
fairly simple fix.

The problem was that the discovery service was removing the member at the
initial phase of reset, which still requires KubeSpan to be up:

* leaving `etcd` (needs to talk to other members)
* stopping pods (might need to talk to the Kubernetes API with some CNIs)

Now leaving the discovery service happens much later, when network
interactions are no longer required.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
smira committed Dec 13, 2023
1 parent 0c86ca1 commit 10c59a6
Showing 12 changed files with 106 additions and 55 deletions.
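
For orientation before the diffs, here is a minimal self-contained sketch of the new mechanism (the function and type names are illustrative, not the real sequencer/controller APIs): the reset sequencer now creates a singleton MachineResetSignal resource only after etcd has been left, and the discovery-service controller deletes the local affiliate once that resource exists, instead of keying off the Resetting machine stage.

package main

import "fmt"

// machineResetSignal stands in for the new singleton resource: its mere
// existence (not any field on it) signals that only node-local work remains.
type machineResetSignal struct{}

// discoveryReconcile sketches the controller's new decision: drop the local
// affiliate once the reset signal exists, otherwise keep publishing it.
func discoveryReconcile(signal *machineResetSignal) string {
	if signal != nil {
		return "DeleteLocalAffiliate()"
	}
	return "SetLocalData(affiliate)"
}

func main() {
	var signal *machineResetSignal

	// KubeSpan (and the discovery affiliate) stays up while etcd is being left...
	fmt.Println("leaveEtcd:", discoveryReconcile(signal))

	// ...and only the new "preReset" phase (SendResetSignal) flips the switch.
	signal = &machineResetSignal{}
	fmt.Println("preReset: ", discoveryReconcile(signal))
}

Using a separate, empty resource rather than the machine stage lets the signal be emitted at an arbitrary point in the phase list, after the network-dependent steps have completed.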
4 changes: 2 additions & 2 deletions .drone.jsonnet
@@ -547,7 +547,6 @@ local integration_kubespan = Step('e2e-kubespan', target='e2e-qemu', privileged=
WITH_CLUSTER_DISCOVERY: 'true',
WITH_KUBESPAN: 'true',
IMAGE_REGISTRY: local_registry,
WITH_CONFIG_PATCH: '[{"op": "replace", "path": "/cluster/discovery/registries/kubernetes/disabled", "value": false}]', // use Kubernetes discovery backend
});
local integration_default_hostname = Step('e2e-default-hostname', target='e2e-qemu', privileged=true, depends_on=[integration_kubespan], environment={
// regression test: make sure Talos works in maintenance mode when no hostname is set
@@ -557,9 +556,10 @@ local integration_default_hostname = Step('e2e-default-hostname', target='e2e-qe
DISABLE_DHCP_HOSTNAME: 'true',
});

local integration_qemu_encrypted_vip = Step('e2e-encrypted-vip', target='e2e-qemu', privileged=true, depends_on=[load_artifacts], environment={
local integration_qemu_encrypted_vip = Step('e2e-encrypted-kubespan-vip', target='e2e-qemu', privileged=true, depends_on=[load_artifacts], environment={
WITH_DISK_ENCRYPTION: 'true',
WITH_VIRTUAL_IP: 'true',
WITH_KUBESPAN: 'true',
IMAGE_REGISTRY: local_registry,
});

@@ -68,8 +68,8 @@ func (ctrl *DiscoveryServiceController) Inputs() []controller.Input {
},
{
Namespace: runtime.NamespaceName,
Type: runtime.MachineStatusType,
ID: optional.Some(runtime.MachineStatusID),
Type: runtime.MachineResetSignalType,
ID: optional.Some(runtime.MachineResetSignalID),
Kind: controller.InputWeak,
},
}
@@ -218,9 +218,9 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru
return fmt.Errorf("error listing endpoints: %w", err)
}

machineStatus, err := safe.ReaderGet[*runtime.MachineStatus](ctx, r, resource.NewMetadata(runtime.NamespaceName, runtime.MachineStatusType, runtime.MachineStatusID, resource.VersionUndefined))
machineResetSignal, err := safe.ReaderGetByID[*runtime.MachineResetSignal](ctx, r, runtime.MachineResetSignalID)
if err != nil && !state.IsNotFoundError(err) {
return fmt.Errorf("error getting machine status: %w", err)
return fmt.Errorf("error getting machine reset signal: %w", err)
}

if client == nil {
@@ -257,9 +257,9 @@ func (ctrl *DiscoveryServiceController) Run(ctx context.Context, r controller.Ru

// delete/update local affiliate
//
// if the node enters resetting stage, cleanup the local affiliate
// if the node enters final resetting stage, cleanup the local affiliate
// otherwise, update local affiliate data
if machineStatus != nil && machineStatus.TypedSpec().Stage == runtime.MachineStageResetting {
if machineResetSignal != nil {
client.DeleteLocalAffiliate()
} else {
localData := pbAffiliate(affiliateSpec)
@@ -242,9 +242,8 @@ func (suite *DiscoveryServiceSuite) TestReconcile() {
)

// pretend that machine is being reset
machineStatus := runtime.NewMachineStatus()
machineStatus.TypedSpec().Stage = runtime.MachineStageResetting
suite.Require().NoError(suite.state.Create(suite.ctx, machineStatus))
machineResetSignal := runtime.NewMachineResetSignal()
suite.Require().NoError(suite.state.Create(suite.ctx, machineResetSignal))

// client should see the affiliate being deleted
suite.Assert().NoError(retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
@@ -342,6 +342,9 @@ func (*Sequencer) Reset(r runtime.Runtime, in runtime.ResetOptions) []runtime.Ph
in.GetGraceful() && (r.Config().Machine().Type() != machine.TypeWorker),
"leave",
LeaveEtcd,
).Append(
"preReset",
SendResetSignal,
).AppendList(
phaseListErrorHandler(logError, stopAllPhaselist(r, withKexec)...),
).Append(
@@ -2303,6 +2303,13 @@ func StoreShutdownEmergency(runtime.Sequence, any) (runtime.TaskExecutionFunc, s
}, "storeShutdownEmergency"
}

// SendResetSignal func represents the task to send the final reset signal.
func SendResetSignal(runtime.Sequence, any) (runtime.TaskExecutionFunc, string) {
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) (err error) {
return r.State().V1Alpha2().Resources().Create(ctx, resourceruntime.NewMachineResetSignal())
}, "sendResetSignal"
}

func pauseOnFailure(callback func(runtime.Sequence, any) (runtime.TaskExecutionFunc, string),
timeout time.Duration,
) func(seq runtime.Sequence, data any) (runtime.TaskExecutionFunc, string) {
@@ -179,6 +179,7 @@ func NewState() (*State, error) {
&runtime.KmsgLogConfig{},
&runtime.MaintenanceServiceConfig{},
&runtime.MaintenanceServiceRequest{},
&runtime.MachineResetSignal{},
&runtime.MachineStatus{},
&runtime.MetaKey{},
&runtime.MetaLoaded{},
52 changes: 11 additions & 41 deletions internal/integration/api/discovery.go
@@ -14,10 +14,12 @@ import (
"time"

"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/rtestutils"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/maps"
"github.com/siderolabs/gen/value"
"github.com/siderolabs/gen/xslices"
"github.com/stretchr/testify/assert"

"github.com/siderolabs/talos/internal/integration/base"
"github.com/siderolabs/talos/pkg/machinery/client"
@@ -262,18 +264,16 @@ func (suite *DiscoverySuite) TestKubeSpanPeers() {
for _, node := range nodes {
nodeCtx := client.WithNode(suite.ctx, node)

peerSpecs := suite.getKubeSpanPeerSpecs(nodeCtx)
suite.Assert().Len(peerSpecs, len(nodes)-1)
rtestutils.AssertLength[*kubespan.PeerSpec](nodeCtx, suite.T(), suite.Client.COSI, len(nodes)-1)
rtestutils.AssertLength[*kubespan.PeerStatus](nodeCtx, suite.T(), suite.Client.COSI, len(nodes)-1)

peerStatuses := suite.getKubeSpanPeerStatuses(nodeCtx)
suite.Assert().Len(peerStatuses, len(nodes)-1)

for _, status := range peerStatuses {
suite.Assert().Equal(kubespan.PeerStateUp, status.TypedSpec().State)
suite.Assert().False(value.IsZero(status.TypedSpec().Endpoint))
suite.Assert().Greater(status.TypedSpec().ReceiveBytes, int64(0))
suite.Assert().Greater(status.TypedSpec().TransmitBytes, int64(0))
}
rtestutils.AssertAll[*kubespan.PeerStatus](nodeCtx, suite.T(), suite.Client.COSI,
func(status *kubespan.PeerStatus, asrt *assert.Assertions) {
asrt.Equal(kubespan.PeerStateUp, status.TypedSpec().State)
asrt.False(value.IsZero(status.TypedSpec().Endpoint))
asrt.Greater(status.TypedSpec().ReceiveBytes, int64(0))
asrt.Greater(status.TypedSpec().TransmitBytes, int64(0))
})
}
}

@@ -310,36 +310,6 @@ func (suite *DiscoverySuite) getAffiliates(nodeCtx context.Context, namespace re
return result
}

func (suite *DiscoverySuite) getKubeSpanPeerSpecs(nodeCtx context.Context) []*kubespan.PeerSpec {
var result []*kubespan.PeerSpec

items, err := safe.StateListAll[*kubespan.PeerSpec](nodeCtx, suite.Client.COSI)
suite.Require().NoError(err)

it := items.Iterator()

for it.Next() {
result = append(result, it.Value())
}

return result
}

func (suite *DiscoverySuite) getKubeSpanPeerStatuses(nodeCtx context.Context) []*kubespan.PeerStatus {
var result []*kubespan.PeerStatus

items, err := safe.StateListAll[*kubespan.PeerStatus](nodeCtx, suite.Client.COSI)
suite.Require().NoError(err)

it := items.Iterator()

for it.Next() {
result = append(result, it.Value())
}

return result
}

func init() {
allSuites = append(allSuites, new(DiscoverySuite))
}
7 changes: 6 additions & 1 deletion internal/integration/base/api.go
@@ -25,6 +25,7 @@ import (
"github.com/siderolabs/go-retry/retry"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"

"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
@@ -399,7 +400,11 @@ func (apiSuite *APISuite) ClearConnectionRefused(ctx context.Context, nodes ...s
continue
}

if strings.Contains(err.Error(), "connection refused") {
if client.StatusCode(err) == codes.Unavailable {
return retry.ExpectedError(err)
}

if strings.Contains(err.Error(), "connection refused") || strings.Contains(err.Error(), "connection reset by peer") {
return retry.ExpectedError(err)
}

8 changes: 7 additions & 1 deletion pkg/machinery/resources/runtime/deep_copy.generated.go

Some generated files are not rendered by default.

59 changes: 59 additions & 0 deletions pkg/machinery/resources/runtime/machine_reset_signal.go
@@ -0,0 +1,59 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package runtime

import (
"github.com/cosi-project/runtime/pkg/resource"
"github.com/cosi-project/runtime/pkg/resource/meta"
"github.com/cosi-project/runtime/pkg/resource/protobuf"
"github.com/cosi-project/runtime/pkg/resource/typed"

"github.com/siderolabs/talos/pkg/machinery/proto"
)

// MachineResetSignalType is type of MachineResetSignal resource.
const MachineResetSignalType = resource.Type("MachineResetSignals.runtime.talos.dev")

// MachineResetSignalID is singleton MachineResetSignal resource ID.
const MachineResetSignalID = resource.ID("machine")

// MachineResetSignal resource is created to signal that the machine is going to be reset soon.
//
// This resource is created when all remaining actions are local to the node, and network communication is not required.
type MachineResetSignal = typed.Resource[MachineResetSignalSpec, MachineResetSignalExtension]

// MachineResetSignalSpec describes the spec of MachineResetSignal.
//
//gotagsrewrite:gen
type MachineResetSignalSpec struct{}

// NewMachineResetSignal initializes a MachineResetSignal resource.
func NewMachineResetSignal() *MachineResetSignal {
return typed.NewResource[MachineResetSignalSpec, MachineResetSignalExtension](
resource.NewMetadata(NamespaceName, MachineResetSignalType, MachineResetSignalID, resource.VersionUndefined),
MachineResetSignalSpec{},
)
}

// MachineResetSignalExtension is auxiliary resource data for MachineResetSignal.
type MachineResetSignalExtension struct{}

// ResourceDefinition implements meta.ResourceDefinitionProvider interface.
func (MachineResetSignalExtension) ResourceDefinition() meta.ResourceDefinitionSpec {
return meta.ResourceDefinitionSpec{
Type: MachineResetSignalType,
Aliases: []resource.Type{},
DefaultNamespace: NamespaceName,
}
}

func init() {
proto.RegisterDefaultTypes()

err := protobuf.RegisterDynamic[MachineResetSignalSpec](MachineResetSignalType, &MachineResetSignal{})
if err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion pkg/machinery/resources/runtime/runtime.go
@@ -4,4 +4,4 @@

package runtime

//go:generate deep-copy -type DevicesStatusSpec -type EventSinkConfigSpec -type KernelModuleSpecSpec -type KernelParamSpecSpec -type KernelParamStatusSpec -type KmsgLogConfigSpec -type MaintenanceServiceConfigSpec -type MaintenanceServiceRequestSpec -type MachineStatusSpec -type MetaKeySpec -type MountStatusSpec -type PlatformMetadataSpec -type SecurityStateSpec -type MetaLoadedSpec -type UniqueMachineTokenSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go .
//go:generate deep-copy -type DevicesStatusSpec -type EventSinkConfigSpec -type KernelModuleSpecSpec -type KernelParamSpecSpec -type KernelParamStatusSpec -type KmsgLogConfigSpec -type MaintenanceServiceConfigSpec -type MaintenanceServiceRequestSpec -type MachineResetSignalSpec -type MachineStatusSpec -type MetaKeySpec -type MountStatusSpec -type PlatformMetadataSpec -type SecurityStateSpec -type MetaLoadedSpec -type UniqueMachineTokenSpec -header-file ../../../../hack/boilerplate.txt -o deep_copy.generated.go .
1 change: 1 addition & 0 deletions pkg/machinery/resources/runtime/runtime_test.go
@@ -33,6 +33,7 @@ func TestRegisterResource(t *testing.T) {
&runtime.KernelParamStatus{},
&runtime.KmsgLogConfig{},
&runtime.MachineStatus{},
&runtime.MachineResetSignal{},
&runtime.MaintenanceServiceConfig{},
&runtime.MaintenanceServiceRequest{},
&runtime.MetaKey{},
