
client: fix ClientStream.Header() behavior #6557

Merged: 5 commits, Aug 18, 2023
Changes from 3 commits
28 changes: 12 additions & 16 deletions internal/transport/http2_client.go
@@ -1505,30 +1505,25 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

isHeader := false

// If headerChan hasn't been closed yet
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.headerValid = true
if !endStream {
// HEADERS frame block carries a Response-Headers.
Contributor: Optional: keep this comment?

isHeader = true
if !endStream {
Contributor: Optional (use some/all/none): mention this is the headers flow, and that for the trailers flow, the header chan gets closed in closeStream(), which writes the status before unblocking the header chan, letting the client stream read the status after it blocks on the header chan (in csAttempt).

Member (Author): Added a comment.
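
To illustrate the synchronization this part of the diff relies on (the "set fields, then close headerChan" comment in the added code below), here is a generic sketch. It is not the transport code itself; the type and field names are illustrative stand-ins, and the only guarantee assumed is that closing a channel happens before a receive that observes the closed channel.

type headerResult struct {
	header   map[string]string // stands in for s.header / s.recvCompress
	status   error             // stands in for s.status in the trailers-only flow
	headerCh chan struct{}     // stands in for s.headerChan
}

// publish writes the shared fields and only then closes headerCh, so a
// reader that has seen the closed channel also sees those writes.
func publish(r *headerResult, hdr map[string]string, st error) {
	r.header = hdr
	r.status = st
	close(r.headerCh)
}

// await blocks until headerCh is closed; afterwards header and status can be
// read without extra locking, mirroring what waitOnHeader enables in
// transport.go.
func await(r *headerResult) (map[string]string, error) {
	<-r.headerCh
	return r.header, r.status
}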

// If headerChan hasn't been closed yet (expected, given we checked it
// above, but something else could have potentially closed the whole
// stream).
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
s.headerValid = true
// These values can be set without any synchronization because
// stream goroutine will read it only after seeing a closed
// headerChan which we'll close after setting this.
s.recvCompress = recvCompress
if len(mdata) > 0 {
s.header = mdata
}
} else {
// HEADERS frame block carries a Trailers-Only.
s.noHeaders = true
close(s.headerChan)
}
close(s.headerChan)
}

for _, sh := range t.statsHandlers {
if isHeader {
if !endStream {
inHeader := &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
@@ -1554,9 +1549,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
statusGen = status.New(rawStatusCode, grpcMessage)
}

// if client received END_STREAM from server while stream was still active, send RST_STREAM
rst := s.getState() == streamActive
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, statusGen, mdata, true)
// If client received END_STREAM from server while stream was still active,
// send RST_STREAM.
rstStream := s.getState() == streamActive
t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, statusGen, mdata, true)
}

// readServerPreface reads and handles the initial settings frame from the
10 changes: 1 addition & 9 deletions internal/transport/transport.go
@@ -43,10 +43,6 @@ import (
"google.golang.org/grpc/tap"
)

// ErrNoHeaders is used as a signal that a trailers only response was received,
// and is not a real error.
var ErrNoHeaders = errors.New("stream has no headers")

const logLevel = 2

type bufferPool struct {
@@ -390,14 +386,10 @@ func (s *Stream) Header() (metadata.MD, error) {
}
s.waitOnHeader()

if !s.headerValid {
if !s.headerValid || s.noHeaders {
return nil, s.status.Err()
}

if s.noHeaders {
return nil, ErrNoHeaders
}

return s.header.Copy(), nil
}
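
As a reading of the change above (an interpretation of this diff, not official documentation), the internal Stream.Header() now has three outcomes instead of signaling trailers-only responses with ErrNoHeaders. The helper below is purely illustrative and does not exist in the codebase; it assumes the usual google.golang.org/grpc/metadata import.

// interpretHeader names the possible results of Stream.Header() after this
// change.
func interpretHeader(md metadata.MD, err error) string {
	switch {
	case err != nil:
		return "failed or trailers-only stream: err carries the RPC status"
	case md == nil:
		return "trailers-only response with an OK status: no headers, no error"
	default:
		return "headers flow: md holds the server's response headers"
	}
}

The second case is why clientStream.Header() in stream.go below maps m == nil && err == nil to io.EOF.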

60 changes: 29 additions & 31 deletions stream.go
@@ -789,23 +789,23 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())

func (cs *clientStream) Header() (metadata.MD, error) {
var m metadata.MD
noHeader := false
err := cs.withRetry(func(a *csAttempt) error {
var err error
m, err = a.s.Header()
if err == transport.ErrNoHeaders {
noHeader = true
return nil
}
return toRPCErr(err)
}, cs.commitAttemptLocked)

if m == nil && err == nil {
// The stream ended with success. Finish the clientStream.
err = io.EOF
}

if err != nil {
cs.finish(err)
return nil, err
}

if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && !noHeader {
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil {
// Only log if binary log is on and header has not been logged, and
// there is actually headers to log.
logEntry := &binarylog.ServerHeader{
@@ -821,6 +821,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
binlog.Log(cs.ctx, logEntry)
}
}

return m, nil
}
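
A hedged sketch of how an application observes clientStream.Header() as of this commit (not code from the PR): the io.EOF branch mirrors the updated cHdr() helper in test/retry_test.go further down. It assumes the usual io, grpc, and metadata imports; readHeader is a hypothetical helper name.

func readHeader(stream grpc.ClientStream) (metadata.MD, error) {
	md, err := stream.Header()
	if err == io.EOF {
		// The RPC already finished successfully (for example a trailers-only
		// response with an OK status); no headers will arrive.
		return nil, nil
	}
	if err != nil {
		// The RPC failed; err is the status error produced via toRPCErr.
		return nil, err
	}
	return md, nil
}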

@@ -929,24 +930,6 @@ func (cs *clientStream) RecvMsg(m any) error {
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)

if len(cs.binlogs) != 0 {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: cs.Trailer(),
Err: err,
}
if logEntry.Err == io.EOF {
logEntry.Err = nil
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, logEntry)
}
}
}
return err
}
@@ -1002,18 +985,33 @@ func (cs *clientStream) finish(err error) {
}
}
}

cs.mu.Unlock()
// For binary logging. only log cancel in finish (could be caused by RPC ctx
Contributor: Now it logs cancel and server trailers :).

Member (Author): Hmm, I think the logic here is a bit wrong, since the server could return a Canceled status with trailers, which is distinct from the client initiating cancelation. I'll have to work on this and add a test, since I think it worked properly before.
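
For illustration only (not code from this PR, and not necessarily the eventual fix), one way to express that distinction is to decide based on whether the RPC's own context ended rather than on the status code alone; logAsCancel is a hypothetical helper and assumes the standard context import.

// logAsCancel reports whether the termination should be binary-logged as a
// Cancel event. Only a client-side ending (RPC context canceled or deadline
// exceeded) qualifies; a Canceled status returned by the server would still
// be logged as a ServerTrailer.
func logAsCancel(ctx context.Context) bool {
	return ctx.Err() != nil
}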

// canceled or ClientConn closed). Trailer will be logged in RecvMsg.
// canceled or ClientConn closed).
//
// Only one of cancel or trailer needs to be logged. In the cases where
// users don't call RecvMsg, users must have already canceled the RPC.
Contributor: Optional: delete this comment, no longer applies since we don't log both events in different places: "In the cases where users don't call RecvMsg, users must have already canceled the RPC."

Member (Author): Fixed.

if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
c := &binarylog.Cancel{
OnClientSide: true,
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, c)
if len(cs.binlogs) != 0 {
if status.Code(err) == codes.Canceled {
c := &binarylog.Cancel{
OnClientSide: true,
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, c)
}
} else {
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
Trailer: cs.Trailer(),
Err: err,
}
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
for _, binlog := range cs.binlogs {
binlog.Log(cs.ctx, logEntry)
}
}
}
if err == nil {
9 changes: 4 additions & 5 deletions test/end2end_test.go
@@ -6339,12 +6339,11 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
return &testpb.SimpleResponse{}, nil
},
FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
for {
_, err := stream.Recv()
if err == io.EOF {
return nil
}
_, err := stream.Recv()
if err == io.EOF {
return nil
}
return status.Errorf(codes.Unknown, "expected client to CloseSend")
Contributor: Optional: s/CloseSend/call CloseSend().

Member (Author): Done.

},
}

20 changes: 17 additions & 3 deletions test/retry_test.go
@@ -211,6 +211,11 @@ func (s) TestRetryStreaming(t *testing.T) {
return nil
}
}
sHdr := func() serverOp {
return func(stream testgrpc.TestService_FullDuplexCallServer) error {
return stream.SendHeader(metadata.Pairs("test_header", "test_value"))
}
}
sRes := func(b byte) serverOp {
return func(stream testgrpc.TestService_FullDuplexCallServer) error {
msg := res(b)
@@ -222,7 +227,7 @@
}
sErr := func(c codes.Code) serverOp {
return func(stream testgrpc.TestService_FullDuplexCallServer) error {
return status.New(c, "").Err()
return status.New(c, "this is a test error").Err()
}
}
sCloseSend := func() serverOp {
@@ -270,7 +275,7 @@
}
cErr := func(c codes.Code) clientOp {
return func(stream testgrpc.TestService_FullDuplexCallClient) error {
want := status.New(c, "").Err()
want := status.New(c, "this is a test error").Err()
if c == codes.OK {
want = io.EOF
}
@@ -309,6 +314,11 @@
cHdr := func() clientOp {
return func(stream testgrpc.TestService_FullDuplexCallClient) error {
_, err := stream.Header()
if err == io.EOF {
// The stream ended successfully; convert to nil to avoid
// erroring the test case.
err = nil
}
return err
}
}
@@ -362,9 +372,13 @@
sReq(1), sRes(3), sErr(codes.Unavailable),
},
clientOps: []clientOp{cReq(1), cRes(3), cErr(codes.Unavailable)},
}, {
desc: "Retry via ClientStream.Header()",
serverOps: []serverOp{sReq(1), sErr(codes.Unavailable), sReq(1), sAttempts(1)},
clientOps: []clientOp{cReq(1), cHdr() /* this should cause a retry */, cErr(codes.OK)},
}, {
desc: "No retry after header",
serverOps: []serverOp{sReq(1), sErr(codes.Unavailable)},
serverOps: []serverOp{sReq(1), sHdr(), sErr(codes.Unavailable)},
clientOps: []clientOp{cReq(1), cHdr(), cErr(codes.Unavailable)},
}, {
desc: "No retry after context",