Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kuma-cp): graceful components #4277

Merged
merged 2 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion app/kuma-dp/cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("run", func() {
cmd.SetErr(writer)

// when
By("starting the dataplane manager")
By("starting the Kuma DP")
errCh := make(chan error)
go func() {
defer close(errCh)
Expand Down Expand Up @@ -158,6 +158,8 @@ var _ = Describe("run", func() {

// when
By("signaling the dataplane manager to stop")
// we need to close writer, otherwise Cmd#Wait will never finish.
Expect(writer.Close()).To(Succeed())
cancel()

// then
Expand Down
8 changes: 7 additions & 1 deletion app/kuma-dp/cmd/testdata/coredns-mock.sleep.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,10 @@ then
exit 0
fi

sleep 86400
# Send logs for Cmd#Wait to finish
while true;
do
echo "Log"
sleep 0.1
done

8 changes: 7 additions & 1 deletion app/kuma-dp/cmd/testdata/envoy-mock.sleep.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ fi

echo $$ >"${ENVOY_MOCK_PID_FILE}"

sleep 86400
# Send logs for Cmd#Wait to finish
while true;
do
echo "Log"
sleep 0.1
done

13 changes: 13 additions & 0 deletions app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
command_utils "github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/command"
kuma_dp "github.com/kumahq/kuma/pkg/config/app/kuma-dp"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/util/files"
)

Expand All @@ -23,8 +24,12 @@ var (
type DNSServer struct {
opts *Opts
path string

finalizer component.Finalizer
}

var _ component.GracefulComponent = &DNSServer{}

type Opts struct {
Config kuma_dp.Config
Stdout io.Writer
Expand Down Expand Up @@ -87,6 +92,7 @@ func (s *DNSServer) NeedLeaderElection() bool {
}

func (s *DNSServer) Start(stop <-chan struct{}) error {
s.finalizer.Running()
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -140,6 +146,9 @@ func (s *DNSServer) Start(stop <-chan struct{}) error {

go func() {
done <- command.Wait()
// Component should only be considered done after CoreDNS exists.
// Otherwise, we may not propagate SIGTERM on time.
s.finalizer.Done()
}()

select {
Expand All @@ -161,3 +170,7 @@ func (s *DNSServer) Start(stop <-chan struct{}) error {
return err
}
}

func (s *DNSServer) WaitForDone() {
s.finalizer.WaitForDone()
}
13 changes: 12 additions & 1 deletion app/kuma-dp/pkg/dataplane/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ func New(opts Opts) (*Envoy, error) {
return &Envoy{opts: opts}, nil
}

var _ component.Component = &Envoy{}
var _ component.GracefulComponent = &Envoy{}

type Envoy struct {
opts Opts

finalizer component.Finalizer
}

type EnvoyVersion struct {
Expand All @@ -82,6 +84,8 @@ func lookupEnvoyPath(configuredPath string) (string, error) {
}

func (e *Envoy) Start(stop <-chan struct{}) error {
e.finalizer.Running()

configFile, err := GenerateBootstrapFile(e.opts.Config.DataplaneRuntime, e.opts.BootstrapConfig)
if err != nil {
return err
Expand Down Expand Up @@ -137,6 +141,9 @@ func (e *Envoy) Start(stop <-chan struct{}) error {
done := make(chan error, 1)
go func() {
done <- command.Wait()
// Component should only be considered done after Envoy exists.
// Otherwise, we may not propagate SIGTERM on time.
e.finalizer.Done()
}()

select {
Expand All @@ -158,6 +165,10 @@ func (e *Envoy) Start(stop <-chan struct{}) error {
}
}

func (e *Envoy) WaitForDone() {
e.finalizer.WaitForDone()
}

func (e *Envoy) DrainConnections() error {
resp, err := http.Post(fmt.Sprintf("http://127.0.0.1:%d/healthcheck/fail", e.opts.AdminPort), "", nil)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/core/runtime/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ type Component interface {
NeedLeaderElection() bool
}

// GracefulComponent is a component that supports waiting until it's finished.
// It's useful if there is cleanup logic that has to be executed before the process exits
// (i.e. sending SIGTERM signals to subprocesses started by this component).
type GracefulComponent interface {
Component

// WaitForDone blocks until all components are done.
// If a component was not started (i.e. leader components on non-leader CP) it returns immediately.
WaitForDone()
}

// Component of Kuma, i.e. gRPC Server, HTTP server, reconciliation loop.
var _ Component = ComponentFunc(nil)

Expand Down Expand Up @@ -52,6 +63,7 @@ type Manager interface {

// Start starts registered components and blocks until the Stop channel is closed.
// Returns an error if there is an error starting any component.
// If there are any GracefulComponent, it waits until all components are done.
Start(<-chan struct{}) error
}

Expand All @@ -73,12 +85,21 @@ func (cm *manager) Add(c ...Component) error {
return nil
}

func (cm *manager) waitForDone() {
for _, c := range cm.components {
if gc, ok := c.(GracefulComponent); ok {
gc.WaitForDone()
}
}
}

func (cm *manager) Start(stop <-chan struct{}) error {
errCh := make(chan error)

cm.startNonLeaderComponents(stop, errCh)
cm.startLeaderComponents(stop, errCh)

defer cm.waitForDone()
select {
case <-stop:
return nil
Expand Down
40 changes: 40 additions & 0 deletions pkg/core/runtime/component/finalizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package component

import (
"sync"

"github.com/kumahq/kuma/pkg/util/channels"
)

// Finalizer is a helper for implementing GracefulComponent
type Finalizer struct {
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
finishCh chan struct{}
mutex sync.Mutex
}

func (f *Finalizer) Running() {
f.mutex.Lock()
defer f.mutex.Unlock()
if f.finishCh == nil || channels.IsClosed(f.finishCh) {
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
f.finishCh = make(chan struct{})
}
}

func (f *Finalizer) Done() {
f.mutex.Lock()
defer f.mutex.Unlock()
if f.finishCh != nil && !channels.IsClosed(f.finishCh) {
close(f.finishCh)
}
}

func (f *Finalizer) WaitForDone() {
f.mutex.Lock()
if f.finishCh != nil && !channels.IsClosed(f.finishCh) {
waitCh := f.finishCh
f.mutex.Unlock()
<-waitCh
} else {
f.mutex.Unlock()
}
}
13 changes: 13 additions & 0 deletions pkg/plugins/bootstrap/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ type kubeComponentManager struct {
*kubeManagerWrapper
oldLeaderElectionNamespace string
leaderComponents []component.Component
gracefulComponents []component.GracefulComponent
}

var _ component.Manager = &kubeComponentManager{}
Expand Down Expand Up @@ -328,6 +329,8 @@ func (cm *kubeComponentManager) Start(done <-chan struct{}) error {
<-done
}()

defer cm.waitForDone()

eg, ctx := errgroup.WithContext(baseCtx)

eg.Go(func() error {
Expand Down Expand Up @@ -367,6 +370,10 @@ var _ kube_manager.LeaderElectionRunnable = component.ComponentFunc(func(i <-cha

func (k *kubeComponentManager) Add(components ...component.Component) error {
for _, c := range components {
if gc, ok := c.(component.GracefulComponent); ok {
k.gracefulComponents = append(k.gracefulComponents, gc)
}

if c.NeedLeaderElection() {
k.leaderComponents = append(k.leaderComponents, c)
} else if err := k.Manager.Add(&componentRunnableAdaptor{Component: c}); err != nil {
Expand All @@ -376,6 +383,12 @@ func (k *kubeComponentManager) Add(components ...component.Component) error {
return nil
}

func (k *kubeComponentManager) waitForDone() {
for _, gc := range k.gracefulComponents {
gc.WaitForDone()
}
}

// This adaptor is required unless component.Component takes a context as input
type componentRunnableAdaptor struct {
component.Component
Expand Down