Skip to content

Commit

Permalink
status: fix/improve status handling
Browse files Browse the repository at this point in the history
API-layer:

1. If the user sets a status code outside the bounds defined in the `codes`
   package 0-16 (inclusive), set the status code to `codes.Unknown`.  This
   impacts statuses created locally as well as statuses received in RPC
   response trailers.  See grpc/grpc-java#10568 for
   evidence this may be happening in the wild.

Client-side:

1. When receiving a `grpc-status-details-bin` trailer:

   - If there is 1 value and it deserializes into a `google.rpc.Status`, ensure
     the code field matches the `grpc-status` header's code.  If it does not
     match, convert the code to `codes.Internal` and set a message indicating
     the mismatch.  If it does, the status will contain the full details of the
     `grpc-status-details-bin` proto.  (Note that `grpc-message` will not be
     checked against the proto's message field, and will be silently discarded
     if there is a mismatch.)

   - Otherwise, the status returned to the application will use the
     `grpc-status` and `grpc-message` values only.

   - In all cases, the raw `grpc-status-details-bin` trailer will be visible to
     the application via
     `metadata.FromIncomingContext(ctx)["grpc-status-details-bin"]`.

Server-side:

1. If the user manually sets `grpc-status-details-bin` in the trailers:

   - If the status returned by the method handler _does not_ include details
     (see `status.(*Status).WithDetails`), the transport will send the user's
     `grpc-status-details-bin` trailer(s) directly.

   - If the status returned by the method handler _does_ include details, the
     transport will disregard the user's trailer(s) and replace them with a
     `google.rpc.Status` proto version of the returned status.
  • Loading branch information
dfawley committed Sep 25, 2023
1 parent 4ced601 commit d0a7967
Show file tree
Hide file tree
Showing 9 changed files with 282 additions and 34 deletions.
38 changes: 37 additions & 1 deletion internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,45 @@ type Status struct {
s *spb.Status
}

// NewWithProto returns a new status including details from statusProto. This
// is meant to be used by the gRPC library only.
func NewWithProto(code codes.Code, message string, statusProto []string) *Status {
if len(statusProto) != 1 {
// No grpc-status-details bin header, or multiple; just ignore.
return &Status{s: &spb.Status{Code: normalizeCode(code), Message: message}}
}
st := &spb.Status{}
if err := proto.Unmarshal([]byte(statusProto[0]), st); err != nil {
// Probably not a google.rpc.Status proto; do not provide details.
return &Status{s: &spb.Status{Code: normalizeCode(code), Message: message}}
}
if st.Code == int32(code) {
// The codes match between the grpc-status header and the
// grpc-status-details-bin header; use the full details proto.
st.Code = normalizeCode(codes.Code(st.Code))
return &Status{s: st}
}
return &Status{
s: &spb.Status{
Code: int32(codes.Internal),
Message: fmt.Sprintf(
"grpc-status-details-bin mismatch: grpc-status=%v, grpc-message=%q, grpc-status-details-bin=%+v",
code, message, st,
),
},
}
}

func normalizeCode(c codes.Code) int32 {
if c > 16 {
return int32(codes.Unknown)
}
return int32(c)
}

// New returns a Status representing c and msg.
func New(c codes.Code, msg string) *Status {
return &Status{s: &spb.Status{Code: int32(c), Message: msg}}
return &Status{s: &spb.Status{Code: normalizeCode(c), Message: msg}}
}

// Newf returns New(c, fmt.Sprintf(format, a...)).
Expand Down
53 changes: 53 additions & 0 deletions internal/stubserver/stubserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"testing"
"time"

"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
Expand Down Expand Up @@ -110,7 +111,59 @@ func RegisterServiceServerOption(f func(*grpc.Server)) grpc.ServerOption {
return &registerServiceServerOption{f: f}
}

// StartHandlerServer only starts an HTTP server with a gRPC server as the
// handler. It does not create a client to it. Cannot be used in a StubServer
// that also used StartServer.
func (ss *StubServer) StartHandlerServer(sopts ...grpc.ServerOption) error {
if ss.Network == "" {
ss.Network = "tcp"
}
if ss.Address == "" {
ss.Address = "localhost:0"
}
if ss.Target == "" {
ss.R = manual.NewBuilderWithScheme("whatever")
}

lis := ss.Listener
if lis == nil {
var err error
lis, err = net.Listen(ss.Network, ss.Address)
if err != nil {
return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
}
}
ss.Address = lis.Addr().String()

s := grpc.NewServer(sopts...)
for _, so := range sopts {
switch x := so.(type) {
case *registerServiceServerOption:
x.f(s)
}
}

testgrpc.RegisterTestServiceServer(s, ss)

go func() {
hs := &http2.Server{}
opts := &http2.ServeConnOpts{Handler: s}
for {
conn, err := lis.Accept()
if err != nil {
return
}
hs.ServeConn(conn, opts)
}
}()

ss.cleanups = append(ss.cleanups, s.Stop, func() { lis.Close() })
ss.S = s
return nil
}

// StartServer only starts the server. It does not create a client to it.
// Cannot be used in a StubServer that also used StartHandlerServer.
func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
if ss.Network == "" {
ss.Network = "tcp"
Expand Down
11 changes: 7 additions & 4 deletions internal/transport/handler_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,20 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
h.Set("Grpc-Message", encodeGrpcMessage(m))
}

s.hdrMu.Lock()
if p := st.Proto(); p != nil && len(p.Details) > 0 {
delete(s.trailer, grpcStatusDetailsBinHeader)
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
panic(err)
}

h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes))
h.Set(grpcStatusDetailsBinHeader, encodeBinHeader(stBytes))
}

if md := s.Trailer(); len(md) > 0 {
for k, vv := range md {
if len(s.trailer) > 0 {
for k, vv := range s.trailer {
// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
if isReservedHeader(k) {
continue
Expand All @@ -243,6 +245,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
}
s.hdrMu.Unlock()
})

if err == nil { // transport has not been closed
Expand Down Expand Up @@ -287,7 +290,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
}

// writeCustomHeaders sets custom headers set on the stream via SetHeader
// on the first write call (Write, WriteHeader, or WriteStatus).
// on the first write call (Write, WriteHeader, or WriteStatus)
func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
h := ht.rw.Header()

Expand Down
13 changes: 2 additions & 11 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
mdata = make(map[string][]string)
contentTypeErr = "malformed header: missing HTTP content-type"
grpcMessage string
statusGen *status.Status
recvCompress string
httpStatusCode *int
httpStatusErr string
Expand Down Expand Up @@ -1434,12 +1433,6 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
rawStatusCode = codes.Code(uint32(code))
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case "grpc-status-details-bin":
var err error
statusGen, err = decodeGRPCStatusDetails(hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed grpc-status-details-bin: %v", err)
}
case ":status":
if hf.Value == "200" {
httpStatusErr = ""
Expand Down Expand Up @@ -1548,14 +1541,12 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

if statusGen == nil {
statusGen = status.New(rawStatusCode, grpcMessage)
}
status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])

// If client received END_STREAM from server while stream was still active,
// send RST_STREAM.
rstStream := s.getState() == streamActive
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status, mdata, true)
}

// readServerPreface reads and handles the initial settings frame from the
Expand Down
5 changes: 4 additions & 1 deletion internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1057,12 +1057,15 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

if p := st.Proto(); p != nil && len(p.Details) > 0 {
// Do not use the user's grpc-status-details-bin (if present) if we are
// even attempting to set our own.
delete(s.trailer, grpcStatusDetailsBinHeader)
stBytes, err := proto.Marshal(p)
if err != nil {
// TODO: return error instead, when callers are able to handle it.
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
} else {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
}
}

Expand Down
18 changes: 2 additions & 16 deletions internal/transport/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ import (
"time"
"unicode/utf8"

"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
Expand Down Expand Up @@ -88,6 +85,8 @@ var (
}
)

var grpcStatusDetailsBinHeader = "grpc-status-details-bin"

// isReservedHeader checks whether hdr belongs to HTTP2 headers
// reserved by gRPC protocol. Any other headers are classified as the
// user-specified metadata.
Expand All @@ -103,7 +102,6 @@ func isReservedHeader(hdr string) bool {
"grpc-message",
"grpc-status",
"grpc-timeout",
"grpc-status-details-bin",
// Intentionally exclude grpc-previous-rpc-attempts and
// grpc-retry-pushback-ms, which are "reserved", but their API
// intentionally works via metadata.
Expand Down Expand Up @@ -154,18 +152,6 @@ func decodeMetadataHeader(k, v string) (string, error) {
return v, nil
}

func decodeGRPCStatusDetails(rawDetails string) (*status.Status, error) {
v, err := decodeBinHeader(rawDetails)
if err != nil {
return nil, err
}
st := &spb.Status{}
if err = proto.Unmarshal(v, st); err != nil {
return nil, err
}
return status.FromProto(st), nil
}

type timeoutUnit uint8

const (
Expand Down
Loading

0 comments on commit d0a7967

Please sign in to comment.