Skip to content

Commit

Permalink
ringhash: more e2e tests from c-core
Browse files Browse the repository at this point in the history
Follow up to grpc#7271 to fix
grpc#6072.

This adds a dozen more end to end tests.

There are tests that I did not port, specifically:

- TestRingHash_TransientFailureSkipToAvailableReady was flaky when I ported it,
so I removed it while investigating.

- TestRingHash_SwitchToLowerPriorityAndThenBack was also flaky, I also removed
it while investigating.

- TestRingHash_ContinuesConnectingWithoutPicksOneSubchannelAtATime, I'm not sure
we implement this behavior, and if we do, it's not working the same way as in
c-core, where the order of subchannel connection attempts is based on the
resolver address order rather than the ring order.

I will follow up with fixes for each one of the remaining tests.
  • Loading branch information
atollena committed Jun 19, 2024
1 parent c04b085 commit 8dafccc
Show file tree
Hide file tree
Showing 3 changed files with 1,222 additions and 120 deletions.
89 changes: 79 additions & 10 deletions internal/testutils/blocking_context_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,105 @@ package testutils
import (
"context"
"net"
"sync"

"google.golang.org/grpc/grpclog"
)

var logger = grpclog.Component("testutils")

// BlockingDialer is a dialer that waits for Resume() to be called before
// dialing.
type BlockingDialer struct {
dialer *net.Dialer
blockCh chan struct{}
mu sync.Mutex // protects holds
holds map[string][]*Hold

dialer *net.Dialer
}

// NewBlockingDialer returns a dialer that waits for Resume() to be called
// before dialing.
func NewBlockingDialer() *BlockingDialer {
return &BlockingDialer{
dialer: &net.Dialer{},
blockCh: make(chan struct{}),
dialer: &net.Dialer{},
holds: make(map[string][]*Hold),
}
}

// DialContext implements a context dialer for use with grpc.WithContextDialer
// dial option for a BlockingDialer.
func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
d.mu.Lock()
holds := d.holds[addr]
if len(holds) > 0 {
logger.Info("Intercepted connection attempt to addr %s", addr)
hold := holds[0]
d.holds[addr] = holds[1:]
d.mu.Unlock()

close(hold.waitCh)
select {
case <-hold.blockCh:
if hold.err != nil {
return nil, hold.err
}
return d.dialer.DialContext(ctx, "tcp", addr)
case <-ctx.Done():
return nil, ctx.Err()
}
}
// No hold for this addr.
d.mu.Unlock()
return d.dialer.DialContext(ctx, "tcp", addr)
}

// Hold is a connection hold that blocks the dialer when a connection attempt is
// made to the given addr.
type Hold struct {
dialer *BlockingDialer
blockCh chan error
waitCh chan struct{}
err error
addr string
}

// Hold blocks the dialer when a connection attempt is made to the given addr.
// A hold is valid for exactly one connection attempt. Multiple holds for an
// addr can be added, and they will apply in the order that the connection are
// attempted.
func (d *BlockingDialer) Hold(addr string) *Hold {
d.mu.Lock()
defer d.mu.Unlock()

h := Hold{dialer: d, blockCh: make(chan error), waitCh: make(chan struct{}), addr: addr}
d.holds[addr] = append(d.holds[addr], &h)
return &h
}

// Wait returns a channel that blocks until there is a connection attempt on
// this Hold. Return false if the context has expired, true otherwise.
func (h *Hold) Wait(ctx context.Context) bool {
logger.Infof("Waiting for a connection attempt to addr %s", h.addr)
select {
case <-d.blockCh:
case <-ctx.Done():
return nil, ctx.Err()
return false
case <-h.waitCh:
}
return d.dialer.DialContext(ctx, "tcp", addr)
logger.Infof("Connection attempt started to addr %s", h.addr)
return true
}

// Resume unblocks the dialer for the given addr. If called multiple times on
// the same hold, Resume panics.
func (h *Hold) Resume() {
logger.Infof("Resuming connection attempt to addr %s", h.addr)
close(h.blockCh)
}

// Resume unblocks the dialer. It panics if called more than once.
func (d *BlockingDialer) Resume() {
close(d.blockCh)
// Fail fails the connection attempt. If called multiple times on the same hold,
// Fail panics.
func (h *Hold) Fail(err error) {
logger.Infof("Failing connection attempt to addr %s", h.addr)
h.err = err
close(h.blockCh)
}
194 changes: 194 additions & 0 deletions internal/testutils/blocking_context_dialer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package testutils

import (
"context"
"errors"
"testing"
"time"
)

const testTimeout = 5 * time.Second

func (s) TestBlockingDialer_NoHold(t *testing.T) {
lis, err := LocalTCPListener()
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer lis.Close()

d := NewBlockingDialer()

// This should not block.
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
conn, err := d.DialContext(ctx, lis.Addr().String())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
conn.Close()
}

func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) {
lis, err := LocalTCPListener()
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer lis.Close()

d := NewBlockingDialer()
h := d.Hold(lis.Addr().String())

done := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
go func() {
conn, err := d.DialContext(ctx, lis.Addr().String())
if err != nil {
t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err)
}
conn.Close()
done <- struct{}{}
}()

// This should block until the goroutine above is scheduled.
if !h.Wait(ctx) {
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
}
select {
case <-done:
t.Errorf("Expected dialer to be blocked.")
default:
}

h.Resume() // Unblock the above goroutine.

select {
case <-done:
case <-ctx.Done():
t.Errorf("Timeout waiting for connection attempt to resume.")
}
}

func (s) TestBlockingDialer_HoldWaitFail(t *testing.T) {
lis, err := LocalTCPListener()
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer lis.Close()

d := NewBlockingDialer()
h := d.Hold(lis.Addr().String())

wantErr := errors.New("test error")

done := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
go func() {
_, err := d.DialContext(ctx, lis.Addr().String())
if !errors.Is(err, wantErr) {
t.Errorf("BlockingDialer.DialContext() after Fail(): got error %v, want %v", err, wantErr)
}
done <- struct{}{}
}()

if !h.Wait(ctx) {
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
}
select {
case <-done:
t.Errorf("Expected dialer to still be blocked after Wait()")
default:
}

h.Fail(wantErr)

select {
case <-done:
case <-ctx.Done():
t.Errorf("Timeout waiting for connection attempt to fail.")
}
}

func (s) TestBlockingDialer_ContextCanceled(t *testing.T) {
lis, err := LocalTCPListener()
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer lis.Close()

d := NewBlockingDialer()
h := d.Hold(lis.Addr().String())

done := make(chan struct{})
testCtx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

ctx, cancel := context.WithCancel(testCtx)
defer cancel()
go func() {
_, err := d.DialContext(ctx, lis.Addr().String())
if !errors.Is(err, context.Canceled) {
t.Errorf("BlockingDialer.DialContext() after context cancel: got error %v, want %v", err, context.Canceled)
}
done <- struct{}{}
}()
if !h.Wait(ctx) {
t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr)
}
cancel()

select {
case <-done:
case <-testCtx.Done():
t.Errorf("Timeout while waiting for Wait to return.")
}
}

func (s) TestBlockingDialer_CancelWait(t *testing.T) {
lis, err := LocalTCPListener()
if err != nil {
t.Fatalf("failed to listen: %v", err)
}
defer lis.Close()

d := NewBlockingDialer()
h := d.Hold(lis.Addr().String())

testCtx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()

ctx, cancel := context.WithTimeout(testCtx, 0)
defer cancel()
done := make(chan struct{})
go func() {
if h.Wait(ctx) {
t.Errorf("Expected cancel to return false when context expires")
}
done <- struct{}{}
}()

select {
case <-done:
case <-testCtx.Done():
t.Errorf("Timeout while waiting for Wait to return.")
}
}
Loading

0 comments on commit 8dafccc

Please sign in to comment.