Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel container startup with deferred values #315

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 55 additions & 26 deletions constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.uber.org/dig/internal/digerror"
"go.uber.org/dig/internal/digreflect"
"go.uber.org/dig/internal/dot"
"go.uber.org/dig/internal/promise"
)

// constructorNode is a node in the dependency graph that represents
Expand All @@ -45,12 +46,15 @@ type constructorNode struct {
// id uniquely identifies the constructor that produces a node.
id dot.CtorID

// Whether the constructor owned by this node was already called.
called bool
// State of the underlying constructor function
state functionState

// Type information about constructor parameters.
paramList paramList

// The result of calling the constructor
deferred promise.Deferred

// Type information about constructor results.
resultList resultList

Expand Down Expand Up @@ -123,47 +127,72 @@ func (n *constructorNode) ID() dot.CtorID { return n.id }
func (n *constructorNode) CType() reflect.Type { return n.ctype }
func (n *constructorNode) Order(s *Scope) int { return n.orders[s] }
func (n *constructorNode) OrigScope() *Scope { return n.origS }
func (n *constructorNode) State() functionState { return n.state }

func (n *constructorNode) String() string {
return fmt.Sprintf("deps: %v, ctor: %v", n.paramList, n.ctype)
}

// Call calls this constructor if it hasn't already been called and
// injects any values produced by it into the provided container.
func (n *constructorNode) Call(c containerStore) error {
if n.called {
return nil
// Call calls this constructor if it hasn't already been called and injects any values produced by it into the container
// passed to newConstructorNode.
//
// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has
// already been called, it will return an already-resolved deferred. errMissingDependencies is non-fatal; any other
// errors means this node is permanently in an error state.
//
// Don't store the returned pointer; it points into a field that may be reused on non-fatal errors.
func (n *constructorNode) Call(c containerStore) *promise.Deferred {
if n.State() == functionCalled || n.State() == functionOnStack {
return &n.deferred
}

n.state = functionVisited
n.deferred = promise.Deferred{}

if err := shallowCheckDependencies(c, n.paramList); err != nil {
return errMissingDependencies{
n.deferred.Resolve(errMissingDependencies{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably need to return here and in other error cases?
Or rather, not set calling and deferred until after the check,
with the check itself returning promise.Fail.

Func: n.location,
Reason: err,
}
})
return &n.deferred
}

args, err := n.paramList.BuildList(c)
if err != nil {
var args []reflect.Value
var results []reflect.Value

d := n.paramList.BuildList(c, &args)

n.state = functionOnStack

d.Catch(func(err error) error {
return errArgumentsFailed{
Func: n.location,
Reason: err,
}
}

receiver := newStagingContainerWriter()
results := c.invoker()(reflect.ValueOf(n.ctor), args)
if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil {
return errConstructorFailed{Func: n.location, Reason: err}
}

// Commit the result to the original container that this constructor
// was supplied to. The provided constructor is only used for a view of
// the rest of the graph to instantiate the dependencies of this
// container.
receiver.Commit(n.s)
n.called = true
}).Then(func() *promise.Deferred {
return c.scheduler().Schedule(func() {
results = c.invoker()(reflect.ValueOf(n.ctor), args)
})
}).Then(func() *promise.Deferred {
receiver := newStagingContainerWriter()
if err := n.resultList.ExtractList(receiver, false /* decorating */, results); err != nil {
return promise.Fail(errConstructorFailed{Func: n.location, Reason: err})
}

return nil
// Commit the result to the original container that this constructor
// was supplied to. The provided container is only used for a view of
// the rest of the graph to instantiate the dependencies of this
// container.
receiver.Commit(n.s)
n.state = functionCalled
n.deferred.Resolve(nil)
return promise.Done
}).Catch(func(err error) error {
n.state = functionCalled
n.deferred.Resolve(err)
return nil
})
return &n.deferred
}

// stagingContainerWriter is a containerWriter that records the changes that
Expand Down
19 changes: 15 additions & 4 deletions constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,21 @@ func TestNodeAlreadyCalled(t *testing.T) {
s := newScope()
n, err := newConstructorNode(f, s, s, constructorOptions{})
require.NoError(t, err, "failed to build node")
require.False(t, n.called, "node must not have been called")
require.False(t, n.State() == functionCalled, "node must not have been called")

c := New()
require.NoError(t, n.Call(c.scope), "invoke failed")
require.True(t, n.called, "node must be called")
require.NoError(t, n.Call(c.scope), "calling again should be okay")
d := n.Call(c.scope)
c.scope.sched.Flush()

ok, err := d.Resolved()
require.True(t, ok, "deferred must be resolved")
require.NoError(t, err, "invoke failed")

require.True(t, n.State() == functionCalled, "node must be called")
d = n.Call(c.scope)
c.scope.sched.Flush()

ok, err = d.Resolved()
require.True(t, ok, "deferred must be resolved")
require.NoError(t, err, "calling again should be okay")
}
33 changes: 33 additions & 0 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"reflect"

"go.uber.org/dig/internal/dot"
"go.uber.org/dig/internal/scheduler"
)

const (
Expand Down Expand Up @@ -142,6 +143,9 @@ type containerStore interface {

// Returns invokerFn function to use when calling arguments.
invoker() invokerFn

// Returns the scheduler to use for this scope.
scheduler() scheduler.Scheduler
}

// New constructs a Container.
Expand Down Expand Up @@ -231,6 +235,35 @@ func dryInvoker(fn reflect.Value, _ []reflect.Value) []reflect.Value {
return results
}

type maxConcurrencyOption int

// MaxConcurrency run constructors in this container with the given level of
// concurrency:
//
// - max = 0 or 1: run one constructor at a time (this is the default)
//
// - max > 1: run at most 'max' constructors at a time
//
// - max < 0: run an unlimited number of constructors at a time
//
// Concurrency is limited by how many constructors' dependencies are satisfied at
// once and Go's own allocation of OS threads to Goroutines. This is useful for
// applications that have many slow, independent constructors.
func MaxConcurrency(max int) Option {
return maxConcurrencyOption(max)
}

func (m maxConcurrencyOption) applyOption(container *Container) {
switch {
case m == 0, m == 1:
container.scope.sched = scheduler.Synchronous
case m > 1:
container.scope.sched = scheduler.NewParallel(int(m))
case m < 0:
container.scope.sched = new(scheduler.Unbounded)
}
}

// String representation of the entire Container
func (c *Container) String() string {
return c.scope.String()
Expand Down
88 changes: 62 additions & 26 deletions decorate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@ import (

"go.uber.org/dig/internal/digreflect"
"go.uber.org/dig/internal/dot"
"go.uber.org/dig/internal/promise"
)

type decoratorState int
type functionState int

const (
decoratorReady decoratorState = iota
decoratorOnStack
decoratorCalled
functionReady functionState = iota
functionVisited // For avoiding cycles
functionOnStack // For telling that this function is already scheduled
functionCalled
)

type decorator interface {
Call(c containerStore) error
Call(c containerStore) *promise.Deferred
ID() dot.CtorID
State() decoratorState
State() functionState
OrigScope() *Scope
}

type decoratorNode struct {
Expand All @@ -53,11 +56,14 @@ type decoratorNode struct {
location *digreflect.Func

// Current state of this decorator
state decoratorState
state functionState

// Parameters of the decorator.
params paramList

// The result of calling the constructor
deferred promise.Deferred

// Results of the decorator.
results resultList

Expand Down Expand Up @@ -96,39 +102,69 @@ func newDecoratorNode(dcor interface{}, s *Scope) (*decoratorNode, error) {
return n, nil
}

func (n *decoratorNode) Call(s containerStore) error {
if n.state == decoratorCalled {
return nil
// Call calls this decorator if it hasn't already been called and injects any values produced by it into the container
// passed to newConstructorNode.
//
// If constructorNode has a unresolved deferred already in the process of building, it will return that one. If it has
// already been successfully called, it will return an already-resolved deferred. Together these mean it will try the
// call again if it failed last time.
//
// On failure, the returned pointer is not guaranteed to stay in a failed state; another call will reset it back to its
// zero value; don't store the returned pointer. (It will still call each observer only once.)
func (n *decoratorNode) Call(s containerStore) *promise.Deferred {
if n.state == functionOnStack || n.state == functionCalled {
return &n.deferred
}

n.state = decoratorOnStack
// We mark it as "visited" to avoid cycles
n.state = functionVisited
n.deferred = promise.Deferred{}

if err := shallowCheckDependencies(s, n.params); err != nil {
return errMissingDependencies{
n.deferred.Resolve(errMissingDependencies{
Func: n.location,
Reason: err,
}
})
}

args, err := n.params.BuildList(n.s)
if err != nil {
return errArgumentsFailed{
Func: n.location,
Reason: err,
var args []reflect.Value
d := n.params.BuildList(s, &args)

n.state = functionOnStack

d.Observe(func(err error) {
if err != nil {
n.state = functionCalled
n.deferred.Resolve(errArgumentsFailed{
Func: n.location,
Reason: err,
})
return
}
}

results := reflect.ValueOf(n.dcor).Call(args)
if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil {
return err
}
n.state = decoratorCalled
return nil
var results []reflect.Value

s.scheduler().Schedule(func() {
results = s.invoker()(reflect.ValueOf(n.dcor), args)
}).Observe(func(_ error) {
if err := n.results.ExtractList(n.s, true /* decorated */, results); err != nil {
n.deferred.Resolve(err)
return
}

n.state = functionCalled
n.deferred.Resolve(nil)
})
})

return &n.deferred
}

func (n *decoratorNode) ID() dot.CtorID { return n.id }

func (n *decoratorNode) State() decoratorState { return n.state }
func (n *decoratorNode) State() functionState { return n.state }

func (n *decoratorNode) OrigScope() *Scope { return n.s }

// DecorateOption modifies the default behavior of Decorate.
type DecorateOption interface {
Expand Down
Loading