Skip to content
This repository has been archived by the owner on Dec 21, 2023. It is now read-only.

Commit

Permalink
feat: Wait for all event handlers to complete before exiting `control…
Browse files Browse the repository at this point in the history
…Plane.Register()` (#496)

* feat: Provide Shutdown method to run logic that should be executed right before application shutdown

Signed-off-by: Florian Bacher <florian.bacher@dynatrace.com>
  • Loading branch information
bacherfl authored Jul 8, 2022
1 parent d00898a commit d9a621b
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 39 deletions.
6 changes: 5 additions & 1 deletion examples/cp-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
// If you don't want/need that, you can simply pass nil
logForwarder := logforwarder.New(keptnAPI.LogsV1())

// Create a control plane component that is the main component of cp-connecter and start it
// Create a control plane component that is the main component of cp-connector and start it
// using RunWithGracefulShutdown
controlPlane := controlplane.New(subscriptionSource, eventSource, logForwarder, controlplane.WithLogger(logger))
if err := controlplane.RunWithGracefulShutdown(controlPlane, LocalService{}, time.Second*10); err != nil {
Expand All @@ -65,6 +65,10 @@ type LocalService struct{}
//
// Note, that you are responsible for sending corresponding .started and .finished events
// on your own.
// Also note that if you need to ensure that every incoming event is completely processed before the pod running your
// integration is shut down (e.g., due to an upgrade to a newer version), the OnEvent method should process the incoming events synchronously,
// i.e., not in a separate goroutine. If you need to process events asynchronously, you need to implement your own synchronization mechanism to ensure all
// events have been completely processed before a shutdown.
func (e LocalService) OnEvent(ctx context.Context, event models.KeptnContextExtendedCE) error {
// You can handle the event and grab a sender to send back started / finished events to Keptn
// eventSender := ctx.Value(controlplane.EventSenderKeyType{}).(types.EventSender)
Expand Down
79 changes: 48 additions & 31 deletions pkg/sdk/connector/controlplane/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,15 @@ type Integration interface {

// ControlPlane can be used to connect to the Keptn Control Plane
type ControlPlane struct {
subscriptionSource subscriptionsource.SubscriptionSource
eventSource eventsource.EventSource
currentSubscriptions []models.EventSubscription
logger logger.Logger
registered bool
integrationID string
logForwarder logforwarder.LogForwarder
mtx *sync.RWMutex
subscriptionSource subscriptionsource.SubscriptionSource
eventSource eventsource.EventSource
currentSubscriptions []models.EventSubscription
logger logger.Logger
registered bool
integrationID string
logForwarder logforwarder.LogForwarder
mtx *sync.RWMutex
eventHandlerWaitGroup *sync.WaitGroup
}

// WithLogger sets the logger to use
Expand All @@ -61,7 +62,7 @@ func WithLogger(logger logger.Logger) func(plane *ControlPlane) {
//
// This call is blocking.
//
//If you want to start the controlplane component with an own context you need to call the Regiser(ctx,integration)
// If you want to start the controlPlane component with your own context, you need to call the Register(ctx, integration)
// method on your own
func RunWithGracefulShutdown(controlPlane *ControlPlane, integration Integration, shutdownTimeout time.Duration) error {
ctxShutdown, cancel := context.WithCancel(context.Background())
Expand All @@ -76,6 +77,7 @@ func RunWithGracefulShutdown(controlPlane *ControlPlane, integration Integration
}()

return controlPlane.Register(ctxShutdown, integration)

}

// New creates a new ControlPlane
Expand All @@ -84,13 +86,14 @@ func RunWithGracefulShutdown(controlPlane *ControlPlane, integration Integration
// and a LogForwarder to forward error logs
func New(subscriptionSource subscriptionsource.SubscriptionSource, eventSource eventsource.EventSource, logForwarder logforwarder.LogForwarder, opts ...func(plane *ControlPlane)) *ControlPlane {
cp := &ControlPlane{
subscriptionSource: subscriptionSource,
eventSource: eventSource,
currentSubscriptions: []models.EventSubscription{},
logger: logger.NewDefaultLogger(),
logForwarder: logForwarder,
registered: false,
mtx: &sync.RWMutex{},
subscriptionSource: subscriptionSource,
eventSource: eventSource,
currentSubscriptions: []models.EventSubscription{},
logger: logger.NewDefaultLogger(),
logForwarder: logForwarder,
registered: false,
mtx: &sync.RWMutex{},
eventHandlerWaitGroup: &sync.WaitGroup{},
}
for _, o := range opts {
o(cp)
Expand Down Expand Up @@ -148,30 +151,50 @@ func (cp *ControlPlane) Register(ctx context.Context, integration Integration) e

// control plane cancelled via context
case <-ctx.Done():
cp.logger.Debug("Controlplane cancelled via context. Unregistering...")
cp.logger.Info("ControlPlane cancelled via context. Unregistering...")
wg.Wait()
cp.waitForEventHandlers()
cp.cleanup()
cp.setRegistrationStatus(false)
return nil

// control plane cancelled via error in either one of the sub components
case e := <-errC:
cp.logger.Debugf("Stopping control plane due to error: %v", e)
cp.cleanup()
cp.logger.Debug("Waiting for components to shutdown")
cp.logger.Errorf("Stopping control plane due to error: %v", e)
cp.logger.Info("Waiting for components to shutdown")
wg.Wait()
cp.waitForEventHandlers()
cp.cleanup()
cp.setRegistrationStatus(false)
return nil
}
}
}

// waitForEventHandlers blocks until the eventHandlerWaitGroup counter has
// dropped to zero, writing an info log entry before and after the wait.
func (cp *ControlPlane) waitForEventHandlers() {
	lg, handlers := cp.logger, cp.eventHandlerWaitGroup

	lg.Info("Wait for all event handlers to finish")
	handlers.Wait()
	lg.Info("All event handlers done - ready to shut down")
}

// IsRegistered reports the current value of the registered flag, i.e. whether
// the controlPlane is registered and ready to receive events. The flag is read
// while holding the read lock on cp.mtx.
func (cp *ControlPlane) IsRegistered() bool {
	cp.mtx.RLock()
	isRegistered := cp.registered
	cp.mtx.RUnlock()

	return isRegistered
}

// cleanup stops the subscription source first and the event source second,
// logging an info message before each step.
//
// NOTE(review): a failing Stop is reported through log.Fatalf. Assuming this
// is the standard library log package, that exits the process, so the event
// source is never stopped when stopping the subscription source fails -
// confirm this is intended.
func (cp *ControlPlane) cleanup() {
	cp.logger.Info("Stopping subscription source...")
	stopErr := cp.subscriptionSource.Stop()
	if stopErr != nil {
		log.Fatalf("Unable to stop subscription source: %v", stopErr)
	}

	cp.logger.Info("Stopping event source...")
	stopErr = cp.eventSource.Stop()
	if stopErr != nil {
		log.Fatalf("Unable to stop event source: %v", stopErr)
	}
}

func (cp *ControlPlane) handle(ctx context.Context, eventUpdate types.EventUpdate, integration Integration) error {
cp.logger.Debugf("Received an event of type: %s", *eventUpdate.KeptnEvent.Type)
for _, subscription := range cp.currentSubscriptions {
Expand Down Expand Up @@ -204,6 +227,11 @@ func (cp *ControlPlane) getSender(sender types.EventSender) types.EventSender {
}

func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate types.EventUpdate, integration Integration, subscription models.EventSubscription) error {
// increase the eventHandler WaitGroup
cp.eventHandlerWaitGroup.Add(1)
// when the event handler is done, decrease the WaitGroup again
defer cp.eventHandlerWaitGroup.Done()

err := eventUpdate.KeptnEvent.AddTemporaryData(
tmpDataDistributorKey,
types.AdditionalSubscriptionData{
Expand All @@ -226,17 +254,6 @@ func (cp *ControlPlane) forwardMatchedEvent(ctx context.Context, eventUpdate typ
return nil
}

// cleanup stops the subscription source first and the event source second,
// logging an info message before each step.
//
// NOTE(review): a failing Stop is reported through log.Fatalf. Assuming this
// is the standard library log package, that exits the process, so the event
// source is never stopped when stopping the subscription source fails -
// confirm this is intended.
func (cp *ControlPlane) cleanup() {
	cp.logger.Info("Stopping subscription source...")
	stopErr := cp.subscriptionSource.Stop()
	if stopErr != nil {
		log.Fatalf("Unable to stop subscription source: %v", stopErr)
	}

	cp.logger.Info("Stopping event source...")
	stopErr = cp.eventSource.Stop()
	if stopErr != nil {
		log.Fatalf("Unable to stop event source: %v", stopErr)
	}
}

func (cp *ControlPlane) setRegistrationStatus(registered bool) {
cp.mtx.Lock()
defer cp.mtx.Unlock()
Expand Down
35 changes: 34 additions & 1 deletion pkg/sdk/connector/controlplane/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
var eventChan chan types.EventUpdate
var subsChan chan []models.EventSubscription
var integrationReceivedEvent models.KeptnContextExtendedCE
var subscriptionSourceStopCalled bool
var eventSourceStopCalled bool

mtx := sync.RWMutex{}
eventUpdate := types.EventUpdate{KeptnEvent: models.KeptnContextExtendedCE{ID: "some-id", Type: strutils.Stringp("sh.keptn.event.echo.triggered")}, MetaData: types.EventUpdateMetaData{Subject: "sh.keptn.event.echo.triggered"}}
Expand All @@ -119,21 +121,35 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
mtx.Lock()
defer mtx.Unlock()
subsChan = c
wg.Done()
return nil
},
RegisterFn: func(integration models.Integration) (string, error) {
return "some-id", nil
},
StopFn: func() error {
mtx.Lock()
defer mtx.Unlock()
subscriptionSourceStopCalled = true
return nil
},
}
esm := &fake.EventSourceMock{
StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error {
mtx.Lock()
defer mtx.Unlock()
eventChan = ces
wg.Done()
return nil
},
OnSubscriptionUpdateFn: func(strings []models.EventSubscription) {},
SenderFn: func() types.EventSender { return callBackSender },
StopFn: func() error {
mtx.Lock()
defer mtx.Unlock()
eventSourceStopCalled = true
return nil
},
}
fm := &LogForwarderMock{
ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error {
Expand All @@ -152,7 +168,8 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
return nil
},
}
go controlPlane.Register(context.TODO(), integration)
ctx, cancel := context.WithCancel(context.TODO())
go controlPlane.Register(ctx, integration)
require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
Expand Down Expand Up @@ -185,6 +202,14 @@ func TestControlPlaneInboundEventIsForwardedToIntegration(t *testing.T) {
},
},
}, eventData)

cancel()

require.Eventually(t, func() bool {
mtx.RLock()
defer mtx.RUnlock()
return subscriptionSourceStopCalled && eventSourceStopCalled
}, 5*time.Second, 100*time.Millisecond)
}

func TestControlPlaneInboundEventIsForwardedToIntegrationWithoutLogForwarder(t *testing.T) {
Expand Down Expand Up @@ -527,6 +552,9 @@ func TestControlPlane_IsRegistered(t *testing.T) {
RegisterFn: func(integration models.Integration) (string, error) {
return "some-id", nil
},
StopFn: func() error {
return nil
},
}
esm := &fake.EventSourceMock{
StartFn: func(ctx context.Context, data types.RegistrationData, ces chan types.EventUpdate, errC chan error, wg *sync.WaitGroup) error {
Expand All @@ -541,6 +569,9 @@ func TestControlPlane_IsRegistered(t *testing.T) {
},
OnSubscriptionUpdateFn: func(subscriptions []models.EventSubscription) {},
SenderFn: func() types.EventSender { return callBackSender },
StopFn: func() error {
return nil
},
}
fm := &LogForwarderMock{
ForwardFn: func(keptnEvent models.KeptnContextExtendedCE, integrationID string) error {
Expand Down Expand Up @@ -598,6 +629,7 @@ func TestControlPlane_StoppedByReceivingErrEvent(t *testing.T) {
defer mtx.Unlock()
subsChan = subC
errorC = errC
wg.Done()
return nil
},
RegisterFn: func(integration models.Integration) (string, error) {
Expand All @@ -616,6 +648,7 @@ func TestControlPlane_StoppedByReceivingErrEvent(t *testing.T) {
defer mtx.Unlock()
eventChan = evC
errorC = errC
wg.Done()
return nil
},
OnSubscriptionUpdateFn: func(subscriptions []models.EventSubscription) {},
Expand Down
17 changes: 13 additions & 4 deletions pkg/sdk/connector/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,25 @@ func (nc *NatsConnector) Publish(event models.KeptnContextExtendedCE) error {
if err != nil {
return fmt.Errorf("could not connect to NATS to publish event: %w", err)
}
return conn.Publish(*event.Type, serializedEvent)
if err := conn.Publish(*event.Type, serializedEvent); err != nil {
return fmt.Errorf("could not publish message to NATS: %w", err)
}
return nil
}

// Disconnect disconnects/closes the connection to NATS
func (nc *NatsConnector) Disconnect() error {
connection, err := nc.ensureConnection()
// if we are already disconnected, there is no need to do anything
if !nc.connection.IsConnected() {
return nil
}
// call the Flush() method to make sure buffered payloads are sent and do not get lost if a shutdown happens
nc.logger.Debug("flushing NATS buffer")
err := nc.connection.Flush()
if err != nil {
return fmt.Errorf("could not disconnect from NATS: %w", err)
nc.logger.Errorf("Could not flush connection: %v", err)
}
connection.Close()
nc.connection.Close()
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sdk/connector/nats/nats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func TestNewFromEnv(t *testing.T) {
require.NotNil(t, sub)
}

func TestConnectFails(t *testing.T) {
func TestNoConnection(t *testing.T) {
nc := nats2.New("nats://something:3456")
require.NotNil(t, nc)
err := nc.Disconnect()
err := nc.Publish(models.KeptnContextExtendedCE{})
require.NotNil(t, err)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sdk/keptn.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (k *Keptn) Start() error {
// add additional waiting time to ensure the waitGroup has been increased for all events that have been received between receiving SIGTERM and this point
<-time.After(5 * time.Second)
wg.Wait()

return err
}

Expand Down

0 comments on commit d9a621b

Please sign in to comment.