Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/npm_and_yarn/docs/ws-6.2.3
Browse files Browse the repository at this point in the history
  • Loading branch information
sywhang authored Jul 2, 2024
2 parents 4dd03a8 + 6fde730 commit ad7c1ef
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 132 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased
- No changes yet.

## [1.22.1](https://github.com/uber-go/fx/compare/v1.22.0...v1.22.1) - 2024-06-25

### Fixed
- Fx apps will only listen to signals when `.Run()`, `.Wait()`, or `.Done()`
are called, fixing a regression introduced in v1.19.0.

## [1.22.0](https://github.com/uber-go/fx/compare/v1.21.1...v1.22.0) - 2024-05-30

### Added
Expand Down
3 changes: 2 additions & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,6 @@ func (app *App) start(ctx context.Context) error {
if err := app.lifecycle.Start(ctx); err != nil {
return err
}
app.receivers.Start(ctx)
return nil
})
}
Expand Down Expand Up @@ -742,6 +741,7 @@ func (app *App) Stop(ctx context.Context) (err error) {
// Alternatively, a signal can be broadcast to all done channels manually by
// using the Shutdown functionality (see the [Shutdowner] documentation for details).
func (app *App) Done() <-chan os.Signal {
app.receivers.Start() // No-op if running
return app.receivers.Done()
}

Expand All @@ -752,6 +752,7 @@ func (app *App) Done() <-chan os.Signal {
// in the [ShutdownSignal] struct.
// Otherwise, the signal that was received will be set.
func (app *App) Wait() <-chan ShutdownSignal {
app.receivers.Start() // No-op if running
return app.receivers.Wait()
}

Expand Down
23 changes: 23 additions & 0 deletions app_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
package fx

import (
"context"
"errors"
"fmt"
"os"
"sync"
"testing"

Expand Down Expand Up @@ -115,3 +117,24 @@ func TestAnnotationError(t *testing.T) {
assert.ErrorIs(t, err, wantErr)
assert.Contains(t, err.Error(), wantErr.Error())
}

// TestStartDoesNotRegisterSignals verifies that signal.Notify is not called
// when a user starts an app. signal.Notify should only be called when the
// .Wait/.Done are called. Note that app.Run calls .Wait() implicitly.
func TestStartDoesNotRegisterSignals(t *testing.T) {
app := New()
calledNotify := false

// Mock notify function to spy when this is called.
app.receivers.notify = func(c chan<- os.Signal, sig ...os.Signal) {
calledNotify = true
}
app.receivers.stopNotify = func(c chan<- os.Signal) {}

app.Start(context.Background())
defer app.Stop(context.Background())
assert.False(t, calledNotify, "notify should not be called when app starts")

_ = app.Wait() // User signals intent have fx listen for signals. This should call notify
assert.True(t, calledNotify, "notify should be called after Wait")
}
36 changes: 35 additions & 1 deletion app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2331,7 +2331,9 @@ func TestHookConstructors(t *testing.T) {
func TestDone(t *testing.T) {
t.Parallel()

done := fxtest.New(t).Done()
app := fxtest.New(t)
defer app.RequireStop()
done := app.Done()
require.NotNil(t, done, "Got a nil channel.")
select {
case sig := <-done:
Expand All @@ -2340,6 +2342,38 @@ func TestDone(t *testing.T) {
}
}

// TestShutdownThenWait tests that if we call .Shutdown before waiting, the wait
// will still return the last shutdown signal.
func TestShutdownThenWait(t *testing.T) {
t.Parallel()

var (
s Shutdowner
stopped bool
)
app := fxtest.New(
t,
Populate(&s),
Invoke(func(lc Lifecycle) {
lc.Append(StopHook(func() {
stopped = true
}))
}),
).RequireStart()
require.NotNil(t, s)

err := s.Shutdown(ExitCode(1337))
assert.NoError(t, err)
assert.False(t, stopped)

shutdownSig := <-app.Wait()
assert.Equal(t, 1337, shutdownSig.ExitCode)
assert.False(t, stopped)

app.RequireStop()
assert.True(t, stopped)
}

func TestReplaceLogger(t *testing.T) {
t.Parallel()

Expand Down
179 changes: 179 additions & 0 deletions broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright (c) 2024 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package fx

import (
"fmt"
"os"
"sync"
)

// broadcaster broadcasts signals to registered signal listeners.
// All methods on the broadcaster are concurrency-safe.
type broadcaster struct {
// This lock is used to protect all fields of broadcaster.
// Methods on broadcaster should protect all concurrent access
// by taking this lock when accessing its fields.
// Conversely, this lock should NOT be taken outside of broadcaster.
m sync.Mutex

// last will contain a pointer to the last ShutdownSignal received, or
// nil if none, if a new channel is created by Wait or Done, this last
// signal will be immediately written to, this allows Wait or Done state
// to be read after application stop
last *ShutdownSignal

// contains channels created by Done
done []chan os.Signal

// contains channels created by Wait
wait []chan ShutdownSignal
}

func (b *broadcaster) reset() {
b.m.Lock()
defer b.m.Unlock()
b.last = nil
}

// Done creates a new channel that will receive signals being broadcast
// via the broadcaster.
//
// If a signal has been received prior to the call of Done,
// the signal will be sent to the new channel.
func (b *broadcaster) Done() <-chan os.Signal {
b.m.Lock()
defer b.m.Unlock()

ch := make(chan os.Signal, 1)
// If we had received a signal prior to the call of done, send it's
// os.Signal to the new channel.
// However we still want to have the operating system notify signals to this
// channel should the application receive another.
if b.last != nil {
ch <- b.last.Signal
}
b.done = append(b.done, ch)
return ch
}

// Wait creates a new channel that will receive signals being broadcast
// via the broadcaster.
//
// If a signal has been received prior to the call of Wait,
// the signal will be sent to the new channel.
func (b *broadcaster) Wait() <-chan ShutdownSignal {
b.m.Lock()
defer b.m.Unlock()

ch := make(chan ShutdownSignal, 1)

if b.last != nil {
ch <- *b.last
}

b.wait = append(b.wait, ch)
return ch
}

// Broadcast sends the given signal to all channels that have been created
// via Done or Wait. It does not block on sending, and returns an unsentSignalError
// if any send did not go through.
func (b *broadcaster) Broadcast(signal ShutdownSignal) error {
b.m.Lock()
defer b.m.Unlock()

b.last = &signal

channels, unsent := b.broadcast(
signal,
b.broadcastDone,
b.broadcastWait,
)

if unsent != 0 {
return &unsentSignalError{
Signal: signal,
Total: channels,
Unsent: unsent,
}
}

return nil
}

func (b *broadcaster) broadcast(
signal ShutdownSignal,
anchors ...func(ShutdownSignal) (int, int),
) (int, int) {
var channels, unsent int

for _, anchor := range anchors {
c, u := anchor(signal)
channels += c
unsent += u
}

return channels, unsent
}

func (b *broadcaster) broadcastDone(signal ShutdownSignal) (int, int) {
var unsent int

for _, reader := range b.done {
select {
case reader <- signal.Signal:
default:
unsent++
}
}

return len(b.done), unsent
}

func (b *broadcaster) broadcastWait(signal ShutdownSignal) (int, int) {
var unsent int

for _, reader := range b.wait {
select {
case reader <- signal:
default:
unsent++
}
}

return len(b.wait), unsent
}

type unsentSignalError struct {
Signal ShutdownSignal
Unsent int
Total int
}

func (err *unsentSignalError) Error() string {
return fmt.Sprintf(
"send %v signal: %v/%v channels are blocked",
err.Signal,
err.Unsent,
err.Total,
)
}
2 changes: 1 addition & 1 deletion shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (s *shutdowner) Shutdown(opts ...ShutdownOption) error {
opt.apply(s)
}

return s.app.receivers.Broadcast(ShutdownSignal{
return s.app.receivers.b.Broadcast(ShutdownSignal{
Signal: _sigTERM,
ExitCode: s.exitCode,
})
Expand Down
1 change: 1 addition & 0 deletions shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func TestShutdown(t *testing.T) {
)

require.NoError(t, app.Start(context.Background()), "error starting app")
t.Cleanup(func() { app.Stop(context.Background()) }) // in t.Cleanup so this happens after all subtests return (not just this function)
defer require.NoError(t, app.Stop(context.Background()))

for i := 0; i < 10; i++ {
Expand Down
Loading

0 comments on commit ad7c1ef

Please sign in to comment.