Skip to content

Commit

Permalink
Update flow control test to have multiple concurrent streams. (#2170)
Browse files Browse the repository at this point in the history
MakMukhi authored Jun 22, 2018

Unverified

This commit is not signed, but one or more authors requires that any commit attributed to them is signed.
1 parent 92d38b0 commit 8e18752
Showing 1 changed file with 71 additions and 43 deletions.
114 changes: 71 additions & 43 deletions transport/transport_test.go
Original file line number Diff line number Diff line change
@@ -1812,47 +1812,69 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
}
server.mu.Unlock()
ct := client.(*http2Client)
cstream, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create stream. Err: %v", err)
}
msg := make([]byte, msgSize)
buf := make([]byte, msgSize+5)
buf[0] = byte(0)
binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
copy(buf[5:], msg)
opts := Options{}
header := make([]byte, 5)
for i := 1; i <= 10; i++ {
if err := ct.Write(cstream, nil, buf, &opts); err != nil {
t.Fatalf("Error on client while writing message: %v", err)
}
if _, err := cstream.Read(header); err != nil {
t.Fatalf("Error on client while reading data frame header: %v", err)
}
sz := binary.BigEndian.Uint32(header[1:])
recvMsg := make([]byte, int(sz))
if _, err := cstream.Read(recvMsg); err != nil {
t.Fatalf("Error on client while reading data: %v", err)
}
if len(recvMsg) != len(msg) {
t.Fatalf("Length of message received by client: %v, want: %v", len(recvMsg), len(msg))
const numStreams = 10
clientStreams := make([]*Stream, numStreams)
for i := 0; i < numStreams; i++ {
var err error
clientStreams[i], err = client.NewStream(context.Background(), &CallHdr{})
if err != nil {
t.Fatalf("Failed to create stream. Err: %v", err)
}
}
var sstream *Stream
var wg sync.WaitGroup
// For each stream send pingpong messages to the server.
for _, stream := range clientStreams {
wg.Add(1)
go func(stream *Stream) {
defer wg.Done()
buf := make([]byte, msgSize+5)
buf[0] = byte(0)
binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
opts := Options{}
header := make([]byte, 5)
for i := 1; i <= 10; i++ {
if err := ct.Write(stream, nil, buf, &opts); err != nil {
t.Errorf("Error on client while writing message: %v", err)
return
}
if _, err := stream.Read(header); err != nil {
t.Errorf("Error on client while reading data frame header: %v", err)
return
}
sz := binary.BigEndian.Uint32(header[1:])
recvMsg := make([]byte, int(sz))
if _, err := stream.Read(recvMsg); err != nil {
t.Errorf("Error on client while reading data: %v", err)
return
}
if len(recvMsg) != msgSize {
t.Errorf("Length of message received by client: %v, want: %v", len(recvMsg), msgSize)
return
}
}
}(stream)
}
wg.Wait()
serverStreams := map[uint32]*Stream{}
loopyClientStreams := map[uint32]*outStream{}
loopyServerStreams := map[uint32]*outStream{}
// Get all the streams from server reader and writer and client writer.
st.mu.Lock()
for _, v := range st.activeStreams {
sstream = v
for _, stream := range clientStreams {
id := stream.id
serverStreams[id] = st.activeStreams[id]
loopyServerStreams[id] = st.loopy.estdStreams[id]
loopyClientStreams[id] = ct.loopy.estdStreams[id]

}
st.mu.Unlock()
loopyServerStream := st.loopy.estdStreams[sstream.id]
loopyClientStream := ct.loopy.estdStreams[cstream.id]
ct.Write(cstream, nil, nil, &Options{Last: true}) // Close the stream.
if _, err := cstream.Read(header); err != io.EOF {
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
}
// Sleep for a little to make sure both sides flush out their buffers.
time.Sleep(time.Millisecond * 500)
// Close all streams
for _, stream := range clientStreams {
ct.Write(stream, nil, nil, &Options{Last: true})
if _, err := stream.Read(make([]byte, 5)); err != io.EOF {
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
}
}
// Close down both server and client so that their internals can be read without data
// races.
ct.Close()
@@ -1861,20 +1883,26 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
<-st.writerDone
<-ct.readerDone
<-ct.writerDone
for _, cstream := range clientStreams {
id := cstream.id
sstream := serverStreams[id]
loopyServerStream := loopyServerStreams[id]
loopyClientStream := loopyClientStreams[id]
// Check stream flow control.
if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding {
t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding)
}
if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding {
t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding)
}
}
// Check transport flow control.
if ct.fc.limit != ct.fc.unacked+st.loopy.sendQuota {
t.Fatalf("Account mismatch: client transport inflow(%d) != client unacked(%d) + server sendQuota(%d)", ct.fc.limit, ct.fc.unacked, st.loopy.sendQuota)
}
if st.fc.limit != st.fc.unacked+ct.loopy.sendQuota {
t.Fatalf("Account mismatch: server transport inflow(%d) != server unacked(%d) + client sendQuota(%d)", st.fc.limit, st.fc.unacked, ct.loopy.sendQuota)
}
// Check stream flow control.
if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding {
t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding)
}
if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding {
t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding)
}
}

func waitWhileTrue(t *testing.T, condition func() (bool, error)) {

0 comments on commit 8e18752

Please sign in to comment.