Skip to content

Commit

Permalink
fix(kuma-cp): graceful components (#4277)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored May 16, 2022
1 parent ba23332 commit 24d2dae
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 4 deletions.
4 changes: 3 additions & 1 deletion app/kuma-dp/cmd/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ var _ = Describe("run", func() {
cmd.SetErr(writer)

// when
By("starting the dataplane manager")
By("starting the Kuma DP")
errCh := make(chan error)
go func() {
defer close(errCh)
Expand Down Expand Up @@ -158,6 +158,8 @@ var _ = Describe("run", func() {

// when
By("signaling the dataplane manager to stop")
// we need to close writer, otherwise Cmd#Wait will never finish.
Expect(writer.Close()).To(Succeed())
cancel()

// then
Expand Down
8 changes: 7 additions & 1 deletion app/kuma-dp/cmd/testdata/coredns-mock.sleep.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,10 @@ then
exit 0
fi

sleep 86400
# Send logs for Cmd#Wait to finish
while true;
do
echo "Log"
sleep 0.1
done

8 changes: 7 additions & 1 deletion app/kuma-dp/cmd/testdata/envoy-mock.sleep.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,10 @@ fi

echo $$ >"${ENVOY_MOCK_PID_FILE}"

sleep 86400
# Send logs for Cmd#Wait to finish
while true;
do
echo "Log"
sleep 0.1
done

14 changes: 14 additions & 0 deletions app/kuma-dp/pkg/dataplane/dnsserver/dnsserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"io"
"os/exec"
"regexp"
"sync"
"text/template"

"github.com/pkg/errors"

command_utils "github.com/kumahq/kuma/app/kuma-dp/pkg/dataplane/command"
kuma_dp "github.com/kumahq/kuma/pkg/config/app/kuma-dp"
"github.com/kumahq/kuma/pkg/core"
"github.com/kumahq/kuma/pkg/core/runtime/component"
"github.com/kumahq/kuma/pkg/util/files"
)

Expand All @@ -23,8 +25,12 @@ var (
type DNSServer struct {
opts *Opts
path string

wg sync.WaitGroup
}

var _ component.GracefulComponent = &DNSServer{}

type Opts struct {
Config kuma_dp.Config
Stdout io.Writer
Expand Down Expand Up @@ -87,6 +93,7 @@ func (s *DNSServer) NeedLeaderElection() bool {
}

func (s *DNSServer) Start(stop <-chan struct{}) error {
s.wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -140,6 +147,9 @@ func (s *DNSServer) Start(stop <-chan struct{}) error {

go func() {
done <- command.Wait()
// Component should only be considered done after CoreDNS exists.
// Otherwise, we may not propagate SIGTERM on time.
s.wg.Done()
}()

select {
Expand All @@ -161,3 +171,7 @@ func (s *DNSServer) Start(stop <-chan struct{}) error {
return err
}
}

func (s *DNSServer) WaitForDone() {
s.wg.Wait()
}
14 changes: 13 additions & 1 deletion app/kuma-dp/pkg/dataplane/envoy/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"time"

envoy_bootstrap_v3 "github.com/envoyproxy/go-control-plane/envoy/config/bootstrap/v3"
Expand Down Expand Up @@ -57,10 +58,12 @@ func New(opts Opts) (*Envoy, error) {
return &Envoy{opts: opts}, nil
}

var _ component.Component = &Envoy{}
var _ component.GracefulComponent = &Envoy{}

type Envoy struct {
opts Opts

wg sync.WaitGroup
}

type EnvoyVersion struct {
Expand All @@ -82,6 +85,8 @@ func lookupEnvoyPath(configuredPath string) (string, error) {
}

func (e *Envoy) Start(stop <-chan struct{}) error {
e.wg.Add(1)

configFile, err := GenerateBootstrapFile(e.opts.Config.DataplaneRuntime, e.opts.BootstrapConfig)
if err != nil {
return err
Expand Down Expand Up @@ -137,6 +142,9 @@ func (e *Envoy) Start(stop <-chan struct{}) error {
done := make(chan error, 1)
go func() {
done <- command.Wait()
// Component should only be considered done after Envoy exists.
// Otherwise, we may not propagate SIGTERM on time.
e.wg.Done()
}()

select {
Expand All @@ -158,6 +166,10 @@ func (e *Envoy) Start(stop <-chan struct{}) error {
}
}

func (e *Envoy) WaitForDone() {
e.wg.Wait()
}

func (e *Envoy) DrainConnections() error {
resp, err := http.Post(fmt.Sprintf("http://127.0.0.1:%d/healthcheck/fail", e.opts.AdminPort), "", nil)
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions pkg/core/runtime/component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ type Component interface {
NeedLeaderElection() bool
}

// GracefulComponent is a component that supports waiting until it's finished.
// It's useful if there is cleanup logic that has to be executed before the process exits
// (i.e. sending SIGTERM signals to subprocesses started by this component).
type GracefulComponent interface {
Component

// WaitForDone blocks until all components are done.
// If a component was not started (i.e. leader components on non-leader CP) it returns immediately.
WaitForDone()
}

// Component of Kuma, i.e. gRPC Server, HTTP server, reconciliation loop.
var _ Component = ComponentFunc(nil)

Expand Down Expand Up @@ -52,6 +63,7 @@ type Manager interface {

// Start starts registered components and blocks until the Stop channel is closed.
// Returns an error if there is an error starting any component.
// If there are any GracefulComponent, it waits until all components are done.
Start(<-chan struct{}) error
}

Expand All @@ -73,12 +85,21 @@ func (cm *manager) Add(c ...Component) error {
return nil
}

func (cm *manager) waitForDone() {
for _, c := range cm.components {
if gc, ok := c.(GracefulComponent); ok {
gc.WaitForDone()
}
}
}

func (cm *manager) Start(stop <-chan struct{}) error {
errCh := make(chan error)

cm.startNonLeaderComponents(stop, errCh)
cm.startLeaderComponents(stop, errCh)

defer cm.waitForDone()
select {
case <-stop:
return nil
Expand Down
13 changes: 13 additions & 0 deletions pkg/plugins/bootstrap/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ type kubeComponentManager struct {
*kubeManagerWrapper
oldLeaderElectionNamespace string
leaderComponents []component.Component
gracefulComponents []component.GracefulComponent
}

var _ component.Manager = &kubeComponentManager{}
Expand Down Expand Up @@ -328,6 +329,8 @@ func (cm *kubeComponentManager) Start(done <-chan struct{}) error {
<-done
}()

defer cm.waitForDone()

eg, ctx := errgroup.WithContext(baseCtx)

eg.Go(func() error {
Expand Down Expand Up @@ -367,6 +370,10 @@ var _ kube_manager.LeaderElectionRunnable = component.ComponentFunc(func(i <-cha

func (k *kubeComponentManager) Add(components ...component.Component) error {
for _, c := range components {
if gc, ok := c.(component.GracefulComponent); ok {
k.gracefulComponents = append(k.gracefulComponents, gc)
}

if c.NeedLeaderElection() {
k.leaderComponents = append(k.leaderComponents, c)
} else if err := k.Manager.Add(&componentRunnableAdaptor{Component: c}); err != nil {
Expand All @@ -376,6 +383,12 @@ func (k *kubeComponentManager) Add(components ...component.Component) error {
return nil
}

func (k *kubeComponentManager) waitForDone() {
for _, gc := range k.gracefulComponents {
gc.WaitForDone()
}
}

// This adaptor is required unless component.Component takes a context as input
type componentRunnableAdaptor struct {
component.Component
Expand Down

0 comments on commit 24d2dae

Please sign in to comment.