-
Notifications
You must be signed in to change notification settings - Fork 493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow propagation of errors from Subscriptions channels into Request.… #317
Changes from all commits
f09489e
28e4eac
fabaffb
131dc53
d60098a
25614ea
a501eb0
316d2ec
c8f6320
ae75827
f6ce3eb
848e84a
96db37f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,7 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"reflect" | ||
"time" | ||
|
||
"github.com/graph-gophers/graphql-go/errors" | ||
"github.com/graph-gophers/graphql-go/internal/common" | ||
|
@@ -41,7 +42,7 @@ func ParseSchema(schemaString string, resolver interface{}, opts ...SchemaOpt) ( | |
return nil, err | ||
} | ||
|
||
r, err := resolvable.ApplyResolver(s.schema, resolver) | ||
r, err := resolvable.ApplyResolver(s.schema, resolver, s.prefixRootFunctions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -64,13 +65,15 @@ type Schema struct { | |
schema *schema.Schema | ||
res *resolvable.Schema | ||
|
||
maxDepth int | ||
maxParallelism int | ||
tracer trace.Tracer | ||
validationTracer trace.ValidationTracer | ||
logger log.Logger | ||
useStringDescriptions bool | ||
disableIntrospection bool | ||
maxDepth int | ||
maxParallelism int | ||
tracer trace.Tracer | ||
validationTracer trace.ValidationTracer | ||
logger log.Logger | ||
useStringDescriptions bool | ||
disableIntrospection bool | ||
prefixRootFunctions bool | ||
subscribeResolverTimeout time.Duration | ||
} | ||
|
||
// SchemaOpt is an option to pass to ParseSchema or MustParseSchema. | ||
|
@@ -100,6 +103,13 @@ func MaxDepth(n int) SchemaOpt { | |
} | ||
} | ||
|
||
// Add the Query, Subscription and Mutation prefixes to the root resolver function when doing reflection from schema to Go code. | ||
func PrefixRootFunctions() SchemaOpt { | ||
return func(s *Schema) { | ||
s.prefixRootFunctions = true | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this change needed in order to propagate subscription errors? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need this? |
||
|
||
// MaxParallelism specifies the maximum number of resolvers per request allowed to run in parallel. The default is 10. | ||
func MaxParallelism(n int) SchemaOpt { | ||
return func(s *Schema) { | ||
|
@@ -135,6 +145,15 @@ func DisableIntrospection() SchemaOpt { | |
} | ||
} | ||
|
||
// SubscribeResolverTimeout is an option to control the amount of time | ||
// we allow for a single subscribe message resolver to complete it's job | ||
// before it times out and returns an error to the subscriber. | ||
func SubscribeResolverTimeout(timeout time.Duration) SchemaOpt { | ||
return func(s *Schema) { | ||
s.subscribeResolverTimeout = timeout | ||
} | ||
} | ||
|
||
// Response represents a typical response of a GraphQL server. It may be encoded to JSON directly or | ||
// it may be further processed to a custom response type, for example to include custom error data. | ||
// Errors are intentionally serialized first based on the advice in https://github.com/facebook/graphql/commit/7b40390d48680b15cb93e02d46ac5eb249689876#diff-757cea6edf0288677a9eea4cfc801d87R107 | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -225,8 +225,9 @@ func (p *StructPacker) Pack(value interface{}) (reflect.Value, error) { | |
for _, f := range p.fields { | ||
if value, ok := values[f.field.Name.Name]; ok { | ||
packed, err := f.fieldPacker.Pack(value) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unnecessary empty line, Please, remove it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. of course :) I will.. |
||
if err != nil { | ||
return reflect.Value{}, err | ||
return reflect.Value{}, fmt.Errorf("field [%s]: %s", f.field.Name.Name, err) | ||
} | ||
v.Elem().FieldByIndex(f.fieldIndex).Set(packed) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,17 +23,22 @@ type Response struct { | |
func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query.Operation) <-chan *Response { | ||
var result reflect.Value | ||
var f *fieldToExec | ||
var err *errors.QueryError | ||
var errs []*errors.QueryError | ||
func() { | ||
defer r.handlePanic(ctx) | ||
|
||
sels := selected.ApplyOperation(&r.Request, s, op) | ||
var fields []*fieldToExec | ||
collectFieldsToResolve(sels, s, s.Resolver, &fields, make(map[string]*fieldToExec)) | ||
|
||
if len(r.Errs) > 0 { | ||
errs = r.Errs | ||
return | ||
} | ||
|
||
// TODO: move this check into validation.Validate | ||
if len(fields) != 1 { | ||
err = errors.Errorf("%s", "can subscribe to at most one subscription at a time") | ||
errs = []*errors.QueryError{errors.Errorf("%s", "can subscribe to at most one subscription at a time")} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Under what circumstances can we have more than one error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It was tweaked to accomodate line 34 up here.. it can hold multiple errors, so it was easier to pass down a list of errors.. instead of assuming there would only be one in there. |
||
return | ||
} | ||
f = fields[0] | ||
|
@@ -49,21 +54,34 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query | |
result = callOut[0] | ||
|
||
if f.field.HasError && !callOut[1].IsNil() { | ||
resolverErr := callOut[1].Interface().(error) | ||
err = errors.Errorf("%s", resolverErr) | ||
err.ResolverError = resolverErr | ||
errIface := callOut[1].Interface() | ||
switch resolverErr := errIface.(type) { | ||
case *errors.QueryError: | ||
errs = []*errors.QueryError{resolverErr} | ||
case error: | ||
err := errors.Errorf("%s", resolverErr) | ||
err.ResolverError = resolverErr | ||
errs = []*errors.QueryError{err} | ||
default: | ||
panic("dead code path") | ||
} | ||
} | ||
}() | ||
|
||
// Handles the case where the locally executed func above panicked | ||
if len(r.Request.Errs) > 0 { | ||
return sendAndReturnClosed(&Response{Errors: r.Request.Errs}) | ||
} | ||
|
||
if f == nil { | ||
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}}) | ||
return sendAndReturnClosed(&Response{Errors: errs}) | ||
} | ||
|
||
if err != nil { | ||
if len(errs) > 0 { | ||
if _, nonNullChild := f.field.Type.(*common.NonNull); nonNullChild { | ||
return sendAndReturnClosed(&Response{Errors: []*errors.QueryError{err}}) | ||
return sendAndReturnClosed(&Response{Errors: errs}) | ||
} | ||
return sendAndReturnClosed(&Response{Data: []byte(fmt.Sprintf(`{"%s":null}`, f.field.Alias)), Errors: []*errors.QueryError{err}}) | ||
return sendAndReturnClosed(&Response{Data: []byte(fmt.Sprintf(`{"%s":null}`, f.field.Alias)), Errors: errs}) | ||
} | ||
|
||
if ctxErr := ctx.Err(); ctxErr != nil { | ||
|
@@ -103,6 +121,17 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query | |
return | ||
} | ||
|
||
if subErr, ok := resp.Interface().(errors.SubscriptionError); ok { | ||
if err := subErr.SubscriptionError(); err != nil { | ||
if gqlError, ok := err.(*errors.QueryError); ok { | ||
c <- &Response{Errors: []*errors.QueryError{gqlError}} | ||
} else { | ||
c <- &Response{Errors: []*errors.QueryError{errors.Errorf("%s", err)}} | ||
} | ||
return | ||
} | ||
} | ||
|
||
subR := &Request{ | ||
Request: selected.Request{ | ||
Doc: r.Request.Doc, | ||
|
@@ -115,8 +144,12 @@ func (r *Request) Subscribe(ctx context.Context, s *resolvable.Schema, op *query | |
} | ||
var out bytes.Buffer | ||
func() { | ||
// TODO: configurable timeout | ||
subCtx, cancel := context.WithTimeout(ctx, time.Second) | ||
timeout := r.SubscribeResolverTimeout | ||
if timeout == 0 { | ||
timeout = time.Second | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like this one and I'd be happy to merge it. Not sure how I've missed the hard-coded There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Extracted into it's own PR #418 |
||
|
||
subCtx, cancel := context.WithTimeout(ctx, timeout) | ||
defer cancel() | ||
|
||
// resolve response | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's used below with streaming responses.. so the object you stream back can actually transform the response into a proper
error
(GraphQL-style).. otherwise, it was impossible to return an object with anerror
.. you were forced to make thedata
contain someerror
field or whatnot.