From 39c68200779a8ef1021f72d7bae83d53d9db9b69 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 22 Jun 2018 12:53:21 -0700 Subject: [PATCH] Update flow control test to have multiple concurrent streams. --- transport/transport_test.go | 114 ++++++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 43 deletions(-) diff --git a/transport/transport_test.go b/transport/transport_test.go index df695cbcf7e4..a7654d9faa42 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1800,47 +1800,69 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) } server.mu.Unlock() ct := client.(*http2Client) - cstream, err := client.NewStream(context.Background(), &CallHdr{}) - if err != nil { - t.Fatalf("Failed to create stream. Err: %v", err) - } - msg := make([]byte, msgSize) - buf := make([]byte, msgSize+5) - buf[0] = byte(0) - binary.BigEndian.PutUint32(buf[1:], uint32(msgSize)) - copy(buf[5:], msg) - opts := Options{} - header := make([]byte, 5) - for i := 1; i <= 10; i++ { - if err := ct.Write(cstream, nil, buf, &opts); err != nil { - t.Fatalf("Error on client while writing message: %v", err) - } - if _, err := cstream.Read(header); err != nil { - t.Fatalf("Error on client while reading data frame header: %v", err) - } - sz := binary.BigEndian.Uint32(header[1:]) - recvMsg := make([]byte, int(sz)) - if _, err := cstream.Read(recvMsg); err != nil { - t.Fatalf("Error on client while reading data: %v", err) - } - if len(recvMsg) != len(msg) { - t.Fatalf("Length of message received by client: %v, want: %v", len(recvMsg), len(msg)) + const numStreams = 10 + clientStreams := make([]*Stream, numStreams) + for i := 0; i < numStreams; i++ { + var err error + clientStreams[i], err = client.NewStream(context.Background(), &CallHdr{}) + if err != nil { + t.Fatalf("Failed to create stream. Err: %v", err) } } - var sstream *Stream + var wg sync.WaitGroup + // For each stream send pingpong messages to the server. + for _, stream := range clientStreams { + wg.Add(1) + go func(stream *Stream) { + defer wg.Done() + buf := make([]byte, msgSize+5) + buf[0] = byte(0) + binary.BigEndian.PutUint32(buf[1:], uint32(msgSize)) + opts := Options{} + header := make([]byte, 5) + for i := 1; i <= 10; i++ { + if err := ct.Write(stream, nil, buf, &opts); err != nil { + t.Errorf("Error on client while writing message: %v", err) + return + } + if _, err := stream.Read(header); err != nil { + t.Errorf("Error on client while reading data frame header: %v", err) + return + } + sz := binary.BigEndian.Uint32(header[1:]) + recvMsg := make([]byte, int(sz)) + if _, err := stream.Read(recvMsg); err != nil { + t.Errorf("Error on client while reading data: %v", err) + return + } + if len(recvMsg) != msgSize { + t.Errorf("Length of message received by client: %v, want: %v", len(recvMsg), msgSize) + return + } + } + }(stream) + } + wg.Wait() + serverStreams := map[uint32]*Stream{} + loopyClientStreams := map[uint32]*outStream{} + loopyServerStreams := map[uint32]*outStream{} + // Get all the streams from server reader and writer and client writer. st.mu.Lock() - for _, v := range st.activeStreams { - sstream = v + for _, stream := range clientStreams { + id := stream.id + serverStreams[id] = st.activeStreams[id] + loopyServerStreams[id] = st.loopy.estdStreams[id] + loopyClientStreams[id] = ct.loopy.estdStreams[id] + } st.mu.Unlock() - loopyServerStream := st.loopy.estdStreams[sstream.id] - loopyClientStream := ct.loopy.estdStreams[cstream.id] - ct.Write(cstream, nil, nil, &Options{Last: true}) // Close the stream. - if _, err := cstream.Read(header); err != io.EOF { - t.Fatalf("Client expected an EOF from the server. Got: %v", err) - } - // Sleep for a little to make sure both sides flush out their buffers. - time.Sleep(time.Millisecond * 500) + // Close all streams + for _, stream := range clientStreams { + ct.Write(stream, nil, nil, &Options{Last: true}) + if _, err := stream.Read(make([]byte, 5)); err != io.EOF { + t.Fatalf("Client expected an EOF from the server. Got: %v", err) + } + } // Close down both server and client so that their internals can be read without data // races. ct.Close() @@ -1849,6 +1871,19 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) <-st.writerDone <-ct.readerDone <-ct.writerDone + for _, cstream := range clientStreams { + id := cstream.id + sstream := serverStreams[id] + loopyServerStream := loopyServerStreams[id] + loopyClientStream := loopyClientStreams[id] + // Check stream flow control. + if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding { + t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding) + } + if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding { + t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding) + } + } // Check transport flow control. if ct.fc.limit != ct.fc.unacked+st.loopy.sendQuota { t.Fatalf("Account mismatch: client transport inflow(%d) != client unacked(%d) + server sendQuota(%d)", ct.fc.limit, ct.fc.unacked, st.loopy.sendQuota) @@ -1856,13 +1891,6 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) if st.fc.limit != st.fc.unacked+ct.loopy.sendQuota { t.Fatalf("Account mismatch: server transport inflow(%d) != server unacked(%d) + client sendQuota(%d)", st.fc.limit, st.fc.unacked, ct.loopy.sendQuota) } - // Check stream flow control. - if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding { - t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding) - } - if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding { - t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding) - } } func waitWhileTrue(t *testing.T, condition func() (bool, error)) {