Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: fix the pd client could be blocked in some cases #3283

Merged
merged 3 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func WithRetry(retry uint64) RegionsOption {

type tsoRequest struct {
start time.Time
client *client
zier-one marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context
done chan error
physical int64
Expand Down Expand Up @@ -567,6 +568,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
}
req := tsoReqPool.Get().(*tsoRequest)
req.ctx = ctx
req.client = c
req.start = time.Now()
req.dcLocation = dcLocation
c.waitForDispatcher()
Expand Down Expand Up @@ -630,6 +632,8 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) {
return
case <-req.ctx.Done():
return 0, 0, errors.WithStack(req.ctx.Err())
case <-req.client.ctx.Done():
return 0, 0, errors.WithStack(req.client.ctx.Err())
}
}

Expand Down
31 changes: 31 additions & 0 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/pkg/testutil"
"go.uber.org/goleak"
Expand Down Expand Up @@ -109,3 +110,33 @@ func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) {
c.Assert(err, NotNil)
c.Assert(time.Since(start), Greater, 500*time.Millisecond)
}

var _ = Suite(&testTsoRequestSuite{})

type testTsoRequestSuite struct{}

func (s *testTsoRequestSuite) TestTsoRequestWait(c *C) {
ctx, cancel := context.WithCancel(context.Background())
cli := &client{
baseClient: &baseClient{
urls: []string{"localhost:8080"},
checkLeaderCh: make(chan struct{}, 1),
ctx: ctx,
cancel: cancel,
security: SecurityOption{},
gRPCDialOptions: []grpc.DialOption{grpc.WithBlock()},
},
checkTSDeadlineCh: make(chan struct{}),
}

tsoRequest := &tsoRequest{
done: make(chan error, 1),
physical: 0,
logical: 0,
ctx: context.TODO(),
client: cli,
}
cancel()
_, _, err := tsoRequest.Wait()
c.Assert(errors.Cause(err), Equals, context.Canceled)
}