From 38e64eca60855806c1f3f353a63526801609c3a1 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Tue, 25 Jun 2024 12:43:36 -0700 Subject: [PATCH 1/4] fix: Only register signal handlers if user intends to use them (#1215) closes #1212, and fixes a regression from #989. Previously we would only register signal handlers if the user intended to use them. #989 changed this behavior [here](https://github.com/uber-go/fx/pull/989/files#diff-6c4b6ed7dc8834cef100f50dae61c30ffe7775a3f3f6f5a557517cb740c44a2dR649). This regression meant that if you only used app.Start()/app.Stop(), fx would register signal handlers for no reason as the user didn't use app.Done/app.Wait. --- app.go | 3 ++- app_internal_test.go | 23 +++++++++++++++++++++++ app_test.go | 36 +++++++++++++++++++++++++++++++++++- shutdown_test.go | 1 + signal.go | 2 +- signal_test.go | 10 ++++------ 6 files changed, 66 insertions(+), 9 deletions(-) diff --git a/app.go b/app.go index 8189f9d71..ef3d35a0d 100644 --- a/app.go +++ b/app.go @@ -704,7 +704,6 @@ func (app *App) start(ctx context.Context) error { if err := app.lifecycle.Start(ctx); err != nil { return err } - app.receivers.Start(ctx) return nil }) } @@ -742,6 +741,7 @@ func (app *App) Stop(ctx context.Context) (err error) { // Alternatively, a signal can be broadcast to all done channels manually by // using the Shutdown functionality (see the [Shutdowner] documentation for details). func (app *App) Done() <-chan os.Signal { + app.receivers.Start() // No-op if running return app.receivers.Done() } @@ -752,6 +752,7 @@ func (app *App) Done() <-chan os.Signal { // in the [ShutdownSignal] struct. // Otherwise, the signal that was received will be set. func (app *App) Wait() <-chan ShutdownSignal { + app.receivers.Start() // No-op if running return app.receivers.Wait() } diff --git a/app_internal_test.go b/app_internal_test.go index efdc2f33a..c9b49f8ab 100644 --- a/app_internal_test.go +++ b/app_internal_test.go @@ -21,8 +21,10 @@ package fx import ( + "context" "errors" "fmt" + "os" "sync" "testing" @@ -115,3 +117,24 @@ func TestAnnotationError(t *testing.T) { assert.ErrorIs(t, err, wantErr) assert.Contains(t, err.Error(), wantErr.Error()) } + +// TestStartDoesNotRegisterSignals verifies that signal.Notify is not called +// when a user starts an app. signal.Notify should only be called when the +// .Wait/.Done are called. Note that app.Run calls .Wait() implicitly. +func TestStartDoesNotRegisterSignals(t *testing.T) { + app := New() + calledNotify := false + + // Mock notify function to spy when this is called. + app.receivers.notify = func(c chan<- os.Signal, sig ...os.Signal) { + calledNotify = true + } + app.receivers.stopNotify = func(c chan<- os.Signal) {} + + app.Start(context.Background()) + defer app.Stop(context.Background()) + assert.False(t, calledNotify, "notify should not be called when app starts") + + _ = app.Wait() // User signals intent have fx listen for signals. This should call notify + assert.True(t, calledNotify, "notify should be called after Wait") +} diff --git a/app_test.go b/app_test.go index 0a46d915c..198000cdc 100644 --- a/app_test.go +++ b/app_test.go @@ -2331,7 +2331,9 @@ func TestHookConstructors(t *testing.T) { func TestDone(t *testing.T) { t.Parallel() - done := fxtest.New(t).Done() + app := fxtest.New(t) + defer app.RequireStop() + done := app.Done() require.NotNil(t, done, "Got a nil channel.") select { case sig := <-done: @@ -2340,6 +2342,38 @@ func TestDone(t *testing.T) { } } +// TestShutdownThenWait tests that if we call .Shutdown before waiting, the wait +// will still return the last shutdown signal. +func TestShutdownThenWait(t *testing.T) { + t.Parallel() + + var ( + s Shutdowner + stopped bool + ) + app := fxtest.New( + t, + Populate(&s), + Invoke(func(lc Lifecycle) { + lc.Append(StopHook(func() { + stopped = true + })) + }), + ).RequireStart() + require.NotNil(t, s) + + err := s.Shutdown(ExitCode(1337)) + assert.NoError(t, err) + assert.False(t, stopped) + + shutdownSig := <-app.Wait() + assert.Equal(t, 1337, shutdownSig.ExitCode) + assert.False(t, stopped) + + app.RequireStop() + assert.True(t, stopped) +} + func TestReplaceLogger(t *testing.T) { t.Parallel() diff --git a/shutdown_test.go b/shutdown_test.go index 49956196a..c4971ffc2 100644 --- a/shutdown_test.go +++ b/shutdown_test.go @@ -116,6 +116,7 @@ func TestShutdown(t *testing.T) { ) require.NoError(t, app.Start(context.Background()), "error starting app") + t.Cleanup(func() { app.Stop(context.Background()) }) // in t.Cleanup so this happens after all subtests return (not just this function) defer require.NoError(t, app.Stop(context.Background())) for i := 0; i < 10; i++ { diff --git a/signal.go b/signal.go index 1b8456899..595a847bc 100644 --- a/signal.go +++ b/signal.go @@ -102,7 +102,7 @@ func (recv *signalReceivers) running() bool { return recv.shutdown != nil && recv.finished != nil } -func (recv *signalReceivers) Start(ctx context.Context) { +func (recv *signalReceivers) Start() { recv.m.Lock() defer recv.m.Unlock() diff --git a/signal_test.go b/signal_test.go index 95d6fe458..18d96f479 100644 --- a/signal_test.go +++ b/signal_test.go @@ -74,9 +74,7 @@ func TestSignal(t *testing.T) { t.Parallel() t.Run("timeout", func(t *testing.T) { recv := newSignalReceivers() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - recv.Start(ctx) + recv.Start() timeoutCtx, cancel := context.WithTimeout(context.Background(), 0) defer cancel() err := recv.Stop(timeoutCtx) @@ -86,8 +84,8 @@ func TestSignal(t *testing.T) { recv := newSignalReceivers() ctx, cancel := context.WithCancel(context.Background()) defer cancel() - recv.Start(ctx) - recv.Start(ctx) // should be a no-op if already running + recv.Start() + recv.Start() // should be a no-op if already running require.NoError(t, recv.Stop(ctx)) }) t.Run("notify", func(t *testing.T) { @@ -106,7 +104,7 @@ func TestSignal(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - recv.Start(ctx) + recv.Start() stub <- syscall.SIGTERM stub <- syscall.SIGTERM require.Equal(t, syscall.SIGTERM, <-recv.Done()) From 45af511c27eebb3b9e02abe4a35e1f978ad61bdc Mon Sep 17 00:00:00 2001 From: Jacob Oaks Date: Tue, 25 Jun 2024 16:10:21 -0400 Subject: [PATCH 2/4] Prepare Release v1.22.1 (#1217) Update changelog and `version.go` for a patch release that includes #1215. --- CHANGELOG.md | 7 +++++-- version.go | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e15773d18..08642a59a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,8 +10,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased -- No changes yet. +## [1.22.1](https://github.com/uber-go/fx/compare/v1.22.0...v1.22.1) - 2024-06-25 + +### Fixed +- Fx apps will only listen to signals when `.Run()`, `.Wait()`, or `.Done()` + are called, fixing a regression introduced in v1.19.0. ## [1.22.0](https://github.com/uber-go/fx/compare/v1.21.1...v1.22.0) - 2024-05-30 diff --git a/version.go b/version.go index 8bb2b5b12..9ca711601 100644 --- a/version.go +++ b/version.go @@ -21,4 +21,4 @@ package fx // Version is exported for runtime compatibility checks. -const Version = "1.23.0-dev" +const Version = "1.22.1" From 74d9643901f29e17920d4c4324110deab06affb3 Mon Sep 17 00:00:00 2001 From: Jacob Oaks Date: Tue, 25 Jun 2024 16:21:39 -0400 Subject: [PATCH 3/4] Back to development (#1218) Modify changelog and version to reflect back to development. --- CHANGELOG.md | 3 +++ version.go | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 08642a59a..462877fe4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased +- No changes yet. + ## [1.22.1](https://github.com/uber-go/fx/compare/v1.22.0...v1.22.1) - 2024-06-25 ### Fixed diff --git a/version.go b/version.go index 9ca711601..8bb2b5b12 100644 --- a/version.go +++ b/version.go @@ -21,4 +21,4 @@ package fx // Version is exported for runtime compatibility checks. -const Version = "1.22.1" +const Version = "1.23.0-dev" From 6fde730b36d2518f9ee885f08e734acde77f10b7 Mon Sep 17 00:00:00 2001 From: Jacob Oaks Date: Tue, 2 Jul 2024 10:06:25 -0400 Subject: [PATCH 4/4] Fix deadlock caused by race while signal receivers are stopping (#1220) A user reported a possible deadlock within the signal receivers (#1219). This happens by: * `(*signalReceivers).Stop()` is called, by Shutdowner for instance. * `(*signalReceivers).Stop()` [acquires the lock](https://github.com/uber-go/fx/blob/master/signal.go#L121). * Separately, an OS signal is sent to the program. * There is a chance that `relayer()` is still running at this point if `(*signalReceivers).Stop()` has not yet sent along the `shutdown` channel. * The relayer [attempts to broadcast the signal](https://github.com/uber-go/fx/blob/master/signal.go#L93) received via the `signals` channel. * `Broadcast()` blocks on [trying to acquire the lock](https://github.com/uber-go/fx/blob/master/signal.go#L178). * `(*signalReceivers).Stop()` blocks on [waiting for the `relayer()` to finish](https://github.com/uber-go/fx/blob/master/signal.go#L132) by blocking on the `finished` channel. * Deadlock. Luckily, this is not a hard deadlock, as `Stop` will return if the context times out, but we should still fix it. This PR fixes this deadlock. The idea behind how it does it is based on the observation that the broadcasting logic does not necessarily seem to need to share a mutex with the rest of `signalReceivers`. Specifically, it seems like we can separate protection around the registered `wait` and `done` channels, `last`, and the rest of the fields, since the references to those fields are easily isolated. To avoid overcomplicating `signalReceivers` with multiple locks for different uses, this PR creates a separate `broadcaster` type in charge of keeping track of and broadcasting to `Wait` and `Done` channels. Most of the implementation of `broadcaster` is simply moved over from `signalReceivers`. Having a separate broadcaster type seems actually quite natural, so I opted for this to fix the deadlock. Absolutely open to feedback or taking other routes if folks have thoughts. Since broadcasting is protected separately, this deadlock no longer happens since `relayer()` is free to finish its broadcast and then exit. In addition to running the example provided in the original post to verify, I added a test and ran it before/after this change. Before: ``` $ go test -v -count=10 -run "TestSignal/stop_deadlock" . === RUN TestSignal/stop_deadlock signal_test.go:141: Error Trace: /home/user/go/src/github.com/uber-go/fx/signal_test.go:141 Error: Received unexpected error: context deadline exceeded Test: TestSignal/stop_deadlock ``` (the failure appeared roughly 1/3 of the time) After: ``` $ go test -v -count=100 -run "TestSignal/stop_deadlock" . --- PASS: TestSignal (0.00s) --- PASS: TestSignal/stop_deadlock (0.00s) ``` (no failures appeared) --- broadcast.go | 179 +++++++++++++++++++++++++++++++++++++++++++++++++ shutdown.go | 2 +- signal.go | 130 +++-------------------------------- signal_test.go | 27 +++++++- 4 files changed, 215 insertions(+), 123 deletions(-) create mode 100644 broadcast.go diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 000000000..da2a207ae --- /dev/null +++ b/broadcast.go @@ -0,0 +1,179 @@ +// Copyright (c) 2024 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fx + +import ( + "fmt" + "os" + "sync" +) + +// broadcaster broadcasts signals to registered signal listeners. +// All methods on the broadcaster are concurrency-safe. +type broadcaster struct { + // This lock is used to protect all fields of broadcaster. + // Methods on broadcaster should protect all concurrent access + // by taking this lock when accessing its fields. + // Conversely, this lock should NOT be taken outside of broadcaster. + m sync.Mutex + + // last will contain a pointer to the last ShutdownSignal received, or + // nil if none, if a new channel is created by Wait or Done, this last + // signal will be immediately written to, this allows Wait or Done state + // to be read after application stop + last *ShutdownSignal + + // contains channels created by Done + done []chan os.Signal + + // contains channels created by Wait + wait []chan ShutdownSignal +} + +func (b *broadcaster) reset() { + b.m.Lock() + defer b.m.Unlock() + b.last = nil +} + +// Done creates a new channel that will receive signals being broadcast +// via the broadcaster. +// +// If a signal has been received prior to the call of Done, +// the signal will be sent to the new channel. +func (b *broadcaster) Done() <-chan os.Signal { + b.m.Lock() + defer b.m.Unlock() + + ch := make(chan os.Signal, 1) + // If we had received a signal prior to the call of done, send it's + // os.Signal to the new channel. + // However we still want to have the operating system notify signals to this + // channel should the application receive another. + if b.last != nil { + ch <- b.last.Signal + } + b.done = append(b.done, ch) + return ch +} + +// Wait creates a new channel that will receive signals being broadcast +// via the broadcaster. +// +// If a signal has been received prior to the call of Wait, +// the signal will be sent to the new channel. +func (b *broadcaster) Wait() <-chan ShutdownSignal { + b.m.Lock() + defer b.m.Unlock() + + ch := make(chan ShutdownSignal, 1) + + if b.last != nil { + ch <- *b.last + } + + b.wait = append(b.wait, ch) + return ch +} + +// Broadcast sends the given signal to all channels that have been created +// via Done or Wait. It does not block on sending, and returns an unsentSignalError +// if any send did not go through. +func (b *broadcaster) Broadcast(signal ShutdownSignal) error { + b.m.Lock() + defer b.m.Unlock() + + b.last = &signal + + channels, unsent := b.broadcast( + signal, + b.broadcastDone, + b.broadcastWait, + ) + + if unsent != 0 { + return &unsentSignalError{ + Signal: signal, + Total: channels, + Unsent: unsent, + } + } + + return nil +} + +func (b *broadcaster) broadcast( + signal ShutdownSignal, + anchors ...func(ShutdownSignal) (int, int), +) (int, int) { + var channels, unsent int + + for _, anchor := range anchors { + c, u := anchor(signal) + channels += c + unsent += u + } + + return channels, unsent +} + +func (b *broadcaster) broadcastDone(signal ShutdownSignal) (int, int) { + var unsent int + + for _, reader := range b.done { + select { + case reader <- signal.Signal: + default: + unsent++ + } + } + + return len(b.done), unsent +} + +func (b *broadcaster) broadcastWait(signal ShutdownSignal) (int, int) { + var unsent int + + for _, reader := range b.wait { + select { + case reader <- signal: + default: + unsent++ + } + } + + return len(b.wait), unsent +} + +type unsentSignalError struct { + Signal ShutdownSignal + Unsent int + Total int +} + +func (err *unsentSignalError) Error() string { + return fmt.Sprintf( + "send %v signal: %v/%v channels are blocked", + err.Signal, + err.Unsent, + err.Total, + ) +} diff --git a/shutdown.go b/shutdown.go index b5fda5cbd..525d2c78b 100644 --- a/shutdown.go +++ b/shutdown.go @@ -83,7 +83,7 @@ func (s *shutdowner) Shutdown(opts ...ShutdownOption) error { opt.apply(s) } - return s.app.receivers.Broadcast(ShutdownSignal{ + return s.app.receivers.b.Broadcast(ShutdownSignal{ Signal: _sigTERM, ExitCode: s.exitCode, }) diff --git a/signal.go b/signal.go index 595a847bc..249a35810 100644 --- a/signal.go +++ b/signal.go @@ -49,9 +49,12 @@ func newSignalReceivers() signalReceivers { notify: signal.Notify, stopNotify: signal.Stop, signals: make(chan os.Signal, 1), + b: &broadcaster{}, } } +// signalReceivers listens to OS signals and shutdown signals, +// and relays them to registered listeners when started. type signalReceivers struct { // this mutex protects writes and reads of this struct to prevent // race conditions in a parallel execution pattern @@ -68,17 +71,9 @@ type signalReceivers struct { notify func(c chan<- os.Signal, sig ...os.Signal) stopNotify func(c chan<- os.Signal) - // last will contain a pointer to the last ShutdownSignal received, or - // nil if none, if a new channel is created by Wait or Done, this last - // signal will be immediately written to, this allows Wait or Done state - // to be read after application stop - last *ShutdownSignal - - // contains channels created by Done - done []chan os.Signal - - // contains channels created by Wait - wait []chan ShutdownSignal + // used to register and broadcast to signal listeners + // created via Done and Wait + b *broadcaster } func (recv *signalReceivers) relayer() { @@ -90,7 +85,7 @@ func (recv *signalReceivers) relayer() { case <-recv.shutdown: return case signal := <-recv.signals: - recv.Broadcast(ShutdownSignal{ + recv.b.Broadcast(ShutdownSignal{ Signal: signal, }) } @@ -137,120 +132,15 @@ func (recv *signalReceivers) Stop(ctx context.Context) error { close(recv.finished) recv.shutdown = nil recv.finished = nil - recv.last = nil + recv.b.reset() return nil } } func (recv *signalReceivers) Done() <-chan os.Signal { - recv.m.Lock() - defer recv.m.Unlock() - - ch := make(chan os.Signal, 1) - - // If we had received a signal prior to the call of done, send it's - // os.Signal to the new channel. - // However we still want to have the operating system notify signals to this - // channel should the application receive another. - if recv.last != nil { - ch <- recv.last.Signal - } - - recv.done = append(recv.done, ch) - return ch + return recv.b.Done() } func (recv *signalReceivers) Wait() <-chan ShutdownSignal { - recv.m.Lock() - defer recv.m.Unlock() - - ch := make(chan ShutdownSignal, 1) - - if recv.last != nil { - ch <- *recv.last - } - - recv.wait = append(recv.wait, ch) - return ch -} - -func (recv *signalReceivers) Broadcast(signal ShutdownSignal) error { - recv.m.Lock() - defer recv.m.Unlock() - - recv.last = &signal - - channels, unsent := recv.broadcast( - signal, - recv.broadcastDone, - recv.broadcastWait, - ) - - if unsent != 0 { - return &unsentSignalError{ - Signal: signal, - Total: channels, - Unsent: unsent, - } - } - - return nil -} - -func (recv *signalReceivers) broadcast( - signal ShutdownSignal, - anchors ...func(ShutdownSignal) (int, int), -) (int, int) { - var channels, unsent int - - for _, anchor := range anchors { - c, u := anchor(signal) - channels += c - unsent += u - } - - return channels, unsent -} - -func (recv *signalReceivers) broadcastDone(signal ShutdownSignal) (int, int) { - var unsent int - - for _, reader := range recv.done { - select { - case reader <- signal.Signal: - default: - unsent++ - } - } - - return len(recv.done), unsent -} - -func (recv *signalReceivers) broadcastWait(signal ShutdownSignal) (int, int) { - var unsent int - - for _, reader := range recv.wait { - select { - case reader <- signal: - default: - unsent++ - } - } - - return len(recv.wait), unsent -} - -type unsentSignalError struct { - Signal ShutdownSignal - Unsent int - Total int -} - -func (err *unsentSignalError) Error() string { - return fmt.Sprintf( - "send %v signal: %v/%v channels are blocked", - err.Signal, - err.Unsent, - err.Total, - ) + return recv.b.Wait() } diff --git a/signal_test.go b/signal_test.go index 18d96f479..cd85b54e0 100644 --- a/signal_test.go +++ b/signal_test.go @@ -25,6 +25,7 @@ import ( "os" "syscall" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -56,9 +57,9 @@ func TestSignal(t *testing.T) { Signal: syscall.SIGTERM, } - require.NoError(t, recv.Broadcast(expected), "first broadcast should succeed") + require.NoError(t, recv.b.Broadcast(expected), "first broadcast should succeed") - assertUnsentSignalError(t, recv.Broadcast(expected), &unsentSignalError{ + assertUnsentSignalError(t, recv.b.Broadcast(expected), &unsentSignalError{ Signal: expected, Total: 2, Unsent: 2, @@ -117,4 +118,26 @@ func TestSignal(t *testing.T) { }) }) }) + + t.Run("stop deadlock", func(t *testing.T) { + recv := newSignalReceivers() + + var notify chan<- os.Signal + recv.notify = func(ch chan<- os.Signal, _ ...os.Signal) { + notify = ch + } + recv.Start() + + // Artificially create a race where the relayer receives an OS signal + // while Stop() holds the lock. If this leads to deadlock, + // we will receive a context timeout error. + gotErr := make(chan error, 1) + notify <- syscall.SIGTERM + go func() { + stopCtx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + gotErr <- recv.Stop(stopCtx) + }() + assert.NoError(t, <-gotErr) + }) }