Skip to content

Commit

Permalink
cherry pick pingcap#1196 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
amyangfei authored and ti-srebot committed Dec 11, 2020
1 parent 402379a commit 78c0e74
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,20 @@ func (c *CDCClient) newStream(ctx context.Context, addr string, storeID uint64)
}
err = version.CheckStoreVersion(ctx, c.pd, storeID)
if err != nil {
conn.Close()
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
log.Error("check tikv version failed", zap.Error(err), zap.Uint64("storeID", storeID))
return errors.Trace(err)
}
client := cdcpb.NewChangeDataClient(conn)
stream, err = client.EventFeed(ctx)
if err != nil {
conn.Close()
// TODO: we don't close gPRC conn here, let it goes into TransientFailure
// state. If the store recovers, the gPRC conn can be reused. But if
// store goes away forever, the conn will be leaked, we need a better
// connection pool.
err = cerror.WrapError(cerror.ErrTiKVEventFeed, err)
log.Info("establish stream to store failed, retry later", zap.String("addr", addr), zap.Error(err))
return err
Expand Down

0 comments on commit 78c0e74

Please sign in to comment.