Skip to content

Commit

Permalink
fix: service lifecycle issues
Browse files Browse the repository at this point in the history
The core change is moving the context out of the `ServiceRunner` struct
to be a local variable, and using a channel to notify about shutdown
events.

Add more synchronization between Run and the moment service started to
avoid mis-identifying not running (yet) service as successfully finished.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
Co-authored-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
smira and DmitriyMV committed Mar 19, 2024
1 parent ead37ab commit 89fc68b
Show file tree
Hide file tree
Showing 10 changed files with 1,693 additions and 1,536 deletions.
1 change: 1 addition & 0 deletions api/machine/machine.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ message ServiceStateEvent {
FINISHED = 5;
FAILED = 6;
SKIPPED = 7;
STARTING = 8;
}
Action action = 2;
string message = 3;
Expand Down
3 changes: 3 additions & 0 deletions internal/app/machined/pkg/system/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ const (
StateFinished
StateFailed
StateSkipped
StateStarting
)

func (state ServiceState) String() string {
switch state {
case StateInitialized:
return "Initialized"
case StateStarting:
return "Starting"
case StatePreparing:
return "Preparing"
case StateWaiting:
Expand Down
18 changes: 18 additions & 0 deletions internal/app/machined/pkg/system/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package system

import (
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/conditions"
)

func NewServices(runtime runtime.Runtime) *singleton { //nolint:revive
return newServices(runtime)
}

func WaitForServiceWithInstance(instance *singleton, event StateEvent, service string) conditions.Condition {
return waitForService(instance, event, service)
}
93 changes: 93 additions & 0 deletions internal/app/machined/pkg/system/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package system_test

import (
"context"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/internal/app/machined/pkg/system"
"github.com/siderolabs/talos/internal/app/machined/pkg/system/events"
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner"
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner/goroutine"
"github.com/siderolabs/talos/pkg/conditions"
)

type TestCondition struct{}

func (TestCondition) String() string {
return "test-condition"
}

func (TestCondition) Wait(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(10 * time.Millisecond):
return nil
}
}

type TestService struct{}

func (TestService) ID(runtime.Runtime) string {
return "test-service"
}

func (TestService) PreFunc(ctx context.Context, r runtime.Runtime) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
return nil
}
}

func (TestService) Runner(r runtime.Runtime) (runner.Runner, error) {
return goroutine.NewRunner(r, "test-service", func(ctx context.Context, r runtime.Runtime, logOutput io.Writer) error {
<-ctx.Done()

return nil
}), nil
}

func (TestService) PostFunc(runtime.Runtime, events.ServiceState) error {
return nil
}

func (TestService) Condition(runtime.Runtime) conditions.Condition {
return TestCondition{}
}

func (TestService) DependsOn(runtime.Runtime) []string {
return nil
}

func TestRestartService(t *testing.T) {
deadline, ok := t.Deadline()
if !ok {
deadline = time.Now().Add(15 * time.Second)
}

ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

services := system.NewServices(nil)

services.Load(TestService{})

for i := 0; i < 100; i++ {
require.NoError(t, services.Start("test-service"))

require.NoError(t, system.WaitForServiceWithInstance(services, system.StateEventUp, "test-service").Wait(ctx))

require.NoError(t, services.Stop(ctx, "test-service"))
}
}
22 changes: 14 additions & 8 deletions internal/app/machined/pkg/system/service_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ const (
type serviceCondition struct {
mu sync.Mutex
waitingRegister bool
instance *singleton

event StateEvent
service string
}

func (sc *serviceCondition) Wait(ctx context.Context) error {
instance.mu.Lock()
svcrunner := instance.state[sc.service]
instance.mu.Unlock()
sc.instance.mu.Lock()
svcrunner := sc.instance.state[sc.service]
sc.instance.mu.Unlock()

if svcrunner == nil {
return sc.waitRegister(ctx)
Expand Down Expand Up @@ -68,9 +69,9 @@ func (sc *serviceCondition) waitRegister(ctx context.Context) error {
var svcrunner *ServiceRunner

for {
instance.mu.Lock()
svcrunner = instance.state[sc.service]
instance.mu.Unlock()
sc.instance.mu.Lock()
svcrunner = sc.instance.state[sc.service]
sc.instance.mu.Unlock()

if svcrunner != nil {
break
Expand Down Expand Up @@ -103,8 +104,13 @@ func (sc *serviceCondition) String() string {

// WaitForService waits for service to reach some state event.
func WaitForService(event StateEvent, service string) conditions.Condition {
return waitForService(instance, event, service)
}

func waitForService(instance *singleton, event StateEvent, service string) conditions.Condition {
return &serviceCondition{
event: event,
service: service,
instance: instance,
event: event,
service: service,
}
}
59 changes: 31 additions & 28 deletions internal/app/machined/pkg/system/service_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ var WaitConditionCheckInterval = time.Second
type ServiceRunner struct {
mu sync.Mutex

runtime runtime.Runtime
service Service
id string
runtime runtime.Runtime
service Service
id string
instance *singleton

state events.ServiceState
events events.ServiceEvents
Expand All @@ -44,23 +45,19 @@ type ServiceRunner struct {

stateSubscribers map[StateEvent][]chan<- struct{}

ctxMu sync.Mutex
ctx context.Context //nolint:containedctx
ctxCancel context.CancelFunc
stopCh chan struct{}
}

// NewServiceRunner creates new ServiceRunner around Service instance.
func NewServiceRunner(service Service, runtime runtime.Runtime) *ServiceRunner {
ctx, ctxCancel := context.WithCancel(context.Background())

func NewServiceRunner(instance *singleton, service Service, runtime runtime.Runtime) *ServiceRunner {
return &ServiceRunner{
service: service,
instance: instance,
runtime: runtime,
id: service.ID(runtime),
state: events.StateInitialized,
stateSubscribers: make(map[StateEvent][]chan<- struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
stopCh: make(chan struct{}, 1),
}
}

Expand Down Expand Up @@ -192,23 +189,32 @@ var ErrSkip = errors.New("service skipped")
// Run returns an error when a service stops.
//
// Run should be run in a goroutine.
func (svcrunner *ServiceRunner) Run() error {
defer func() {
// reset context for the next run
svcrunner.ctxMu.Lock()
svcrunner.ctx, svcrunner.ctxCancel = context.WithCancel(context.Background())
svcrunner.ctxMu.Unlock()
//
//nolint:gocyclo
func (svcrunner *ServiceRunner) Run(notifyChannels ...chan<- struct{}) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
select {
case <-ctx.Done():
return
case <-svcrunner.stopCh:
cancel()
}
}()

svcrunner.ctxMu.Lock()
ctx := svcrunner.ctx
svcrunner.ctxMu.Unlock()
svcrunner.UpdateState(ctx, events.StateStarting, "Starting service")

for _, notifyCh := range notifyChannels {
close(notifyCh)
}

condition := svcrunner.service.Condition(svcrunner.runtime)

dependencies := svcrunner.service.DependsOn(svcrunner.runtime)
if len(dependencies) > 0 {
serviceConditions := xslices.Map(dependencies, func(dep string) conditions.Condition { return WaitForService(StateEventUp, dep) })
serviceConditions := xslices.Map(dependencies, func(dep string) conditions.Condition { return waitForService(instance, StateEventUp, dep) })
serviceDependencies := conditions.WaitForAll(serviceConditions...)

if condition != nil {
Expand Down Expand Up @@ -318,10 +324,6 @@ func (svcrunner *ServiceRunner) run(ctx context.Context, runnr runner.Runner) er
}()
}

// when service run finishes, cancel context, this is important if service
// terminates on its own before being terminated by Stop()
defer svcrunner.ctxCancel()

select {
case <-ctx.Done():
err := runnr.Stop()
Expand All @@ -344,9 +346,10 @@ func (svcrunner *ServiceRunner) run(ctx context.Context, runnr runner.Runner) er
//
// Shutdown completes when Start() returns.
func (svcrunner *ServiceRunner) Shutdown() {
svcrunner.ctxMu.Lock()
defer svcrunner.ctxMu.Unlock()
svcrunner.ctxCancel()
select {
case svcrunner.stopCh <- struct{}{}:
default:
}
}

// AsProto returns protobuf struct with the state of the service runner.
Expand Down
Loading

0 comments on commit 89fc68b

Please sign in to comment.