Merge branch 'release-4.0' into cherry-pick-2115-to-release-4.0
asddongmen authored Jul 8, 2021
2 parents c9b0879 + 4e1190a commit 723051d
Showing 16 changed files with 810 additions and 526 deletions.
95 changes: 52 additions & 43 deletions cdc/kv/client.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
@@ -53,13 +54,15 @@ import (
)

const (
dialTimeout = 10 * time.Second
maxRetry = 100
tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms)
grpcInitialWindowSize = 1 << 27 // 128 MB The value for initial window size on a stream
dialTimeout = 10 * time.Second
maxRetry = 100
tikvRequestMaxBackoff = 20000 // Maximum total sleep time(in ms)
// TODO find optimal values and test extensively before releasing
// The old values cause the gRPC stream to stall for some unknown reason.
grpcInitialWindowSize = 1 << 26 // 64 MB The value for initial window size on a stream
grpcInitialConnWindowSize = 1 << 27 // 128 MB The value for initial window size on a connection
grpcMaxCallRecvMsgSize = 1 << 30 // 1024 MB The maximum message size the client can receive
grpcConnCount = 10
grpcMaxCallRecvMsgSize = 1 << 28 // 256 MB The maximum message size the client can receive
grpcConnCount = 2

// The threshold of warning a message is too large. TiKV split events into 6MB per-message.
warnRecvMsgSizeThreshold = 12 * 1024 * 1024
@@ -69,12 +72,16 @@ const (
// don't need to force reload region any more.
regionScheduleReload = false

// defines the scan region limit for each table
regionScanLimitPerTable = 6
// defaultRegionChanSize is the default channel size for region channel, including
// range request, region request and region error.
// Note the producer of region error channel, and the consumer of range request
// channel work in an asynchronous way, the larger channel can decrease the
// frequency of creating new goroutine.
defaultRegionChanSize = 128
)

// time interval to force kv client to terminate gRPC stream and reconnect
var reconnectInterval = 15 * time.Minute
var reconnectInterval = 60 * time.Minute

// hard code switch
// true: use kv client v2, which has a region worker for each stream
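
Editor's note: constants like these only matter where the kv client dials TiKV. Below is a minimal sketch, assuming a hypothetical dialTiKV helper, of how such values are typically wired into grpc-go dial options; it is not the TiCDC dial path itself, which also handles TLS credentials and connection pooling.

package kvclientexample // illustrative only

import (
	"context"
	"time"

	"google.golang.org/grpc"
)

const (
	dialTimeout               = 10 * time.Second
	grpcInitialWindowSize     = 1 << 26 // 64 MB per stream, as in the diff above
	grpcInitialConnWindowSize = 1 << 27 // 128 MB per connection
	grpcMaxCallRecvMsgSize    = 1 << 28 // 256 MB per received message
)

// dialTiKV is a made-up helper showing where the tuned constants land.
func dialTiKV(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	ctx, cancel := context.WithTimeout(ctx, dialTimeout)
	defer cancel()
	return grpc.DialContext(ctx, addr,
		grpc.WithInsecure(), // the real client can also dial with TLS
		grpc.WithInitialWindowSize(grpcInitialWindowSize),
		grpc.WithInitialConnWindowSize(grpcInitialConnWindowSize),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(grpcMaxCallRecvMsgSize)),
	)
}

The commit lowers both window sizes, the maximum receive message size, and grpcConnCount, trading peak per-stream throughput for less memory pinned per TiKV store and, per the TODO above, working around stream stalls seen with the old values.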
@@ -177,12 +184,6 @@ func (s *regionFeedState) isStopped() bool {
return atomic.LoadInt32(&s.stopped) > 0
}

func (s *regionFeedState) isInitialized() bool {
s.lock.RLock()
defer s.lock.RUnlock()
return s.initialized
}

func (s *regionFeedState) getLastResolvedTs() uint64 {
s.lock.RLock()
defer s.lock.RUnlock()
@@ -536,9 +537,6 @@ type eventFeedSession struct {
streams map[string]cdcpb.ChangeData_EventFeedClient
streamsLock sync.RWMutex
streamsCanceller map[string]context.CancelFunc

workers map[string]*regionWorker
workersLock sync.RWMutex
}

type rangeRequestTask struct {
Expand All @@ -559,16 +557,17 @@ func newEventFeedSession(
eventCh chan<- *model.RegionFeedEvent,
) *eventFeedSession {
id := strconv.FormatUint(allocID(), 10)
kvClientCfg := config.GetGlobalServerConfig().KVClient
return &eventFeedSession{
client: client,
regionCache: regionCache,
kvStorage: kvStorage,
totalSpan: totalSpan,
eventCh: eventCh,
regionRouter: NewSizedRegionRouter(ctx, regionScanLimitPerTable),
regionCh: make(chan singleRegionInfo, 16),
errCh: make(chan regionErrorInfo, 16),
requestRangeCh: make(chan rangeRequestTask, 16),
regionRouter: NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit),
regionCh: make(chan singleRegionInfo, defaultRegionChanSize),
errCh: make(chan regionErrorInfo, defaultRegionChanSize),
requestRangeCh: make(chan rangeRequestTask, defaultRegionChanSize),
rangeLock: regionspan.NewRegionRangeLock(totalSpan.Start, totalSpan.End, startTs),
enableOldValue: enableOldValue,
enableKVClientV2: enableKVClientV2,
@@ -580,7 +579,6 @@
rangeChSizeGauge: clientChannelSize.WithLabelValues(id, "range"),
streams: make(map[string]cdcpb.ChangeData_EventFeedClient),
streamsCanceller: make(map[string]context.CancelFunc),
workers: make(map[string]*regionWorker),
}
}
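
Editor's note: the hunk above replaces the hard-coded regionScanLimitPerTable constant with config.GetGlobalServerConfig().KVClient.RegionScanLimit, so the per-table region scan limit becomes a server-level setting rather than a compile-time constant. A rough sketch of that pattern follows; the struct shapes and the default value are assumptions, only the KVClient.RegionScanLimit field name comes from the diff.

package kvclientexample // illustrative only

// KVClientConfig and ServerConfig are stand-ins for the real pkg/config types.
type KVClientConfig struct {
	RegionScanLimit int
}

type ServerConfig struct {
	KVClient *KVClientConfig
}

var globalServerConfig = &ServerConfig{
	KVClient: &KVClientConfig{RegionScanLimit: 6}, // default here is illustrative
}

func GetGlobalServerConfig() *ServerConfig { return globalServerConfig }

// At session creation the limit is read once, mirroring newEventFeedSession:
//
//	kvClientCfg := GetGlobalServerConfig().KVClient
//	regionRouter := NewSizedRegionRouter(ctx, kvClientCfg.RegionScanLimit)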

@@ -887,6 +885,9 @@ func (s *eventFeedSession) requestRegionToStore(
// `receiveFromStream`, so no need to retry here.
_, ok := pendingRegions.take(requestID)
if !ok {
// since this pending region has been removed, the token has been
// released in advance, re-add one token here.
s.regionRouter.Acquire(rpcCtx.Addr)
continue
}
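
Editor's note: the added Acquire keeps the region router's per-store token accounting balanced on this early-continue path. As the comment in the hunk says, the token for this pending region was already released elsewhere, so one token is re-added before the request is skipped. A toy sketch of that bookkeeping with made-up names; the real sizedRegionRouter also buffers and schedules regions.

package kvclientexample // illustrative only

import "sync"

// tokenCounter mimics only the Acquire/Release balance, nothing else.
type tokenCounter struct {
	mu    sync.Mutex
	inUse map[string]int // in-flight region requests per store address
}

func newTokenCounter() *tokenCounter {
	return &tokenCounter{inUse: make(map[string]int)}
}

func (t *tokenCounter) Acquire(addr string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inUse[addr]++
}

func (t *tokenCounter) Release(addr string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.inUse[addr]--
}

Every code path that drops a region after its token was released early has to Acquire again, as the hunk does; otherwise the count drifts and the router over- or under-schedules that store.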

@@ -928,6 +929,29 @@ func (s *eventFeedSession) dispatchRequest(

log.Debug("dispatching region", zap.Uint64("regionID", sri.verID.GetID()))

// Send a resolved ts to event channel first, for two reasons:
// 1. Since we have locked the region range, and have maintained correct
// checkpoint ts for the range, it is safe to report the resolved ts
// to puller at this moment.
// 2. Before the kv client gets region rpcCtx, sends request to TiKV and
// receives the first kv event from TiKV, the region could split or
// merge in advance, which should cause the change of resolved ts
// distribution in puller, so this resolved ts event is needed.
// After this resolved ts event is sent, we don't need to send one more
// resolved ts event when the region starts to work.
resolvedEv := &model.RegionFeedEvent{
RegionID: sri.verID.GetID(),
Resolved: &model.ResolvedSpan{
Span: sri.span,
ResolvedTs: sri.ts,
},
}
select {
case s.eventCh <- resolvedEv:
case <-ctx.Done():
return errors.Trace(ctx.Err())
}

rpcCtx, err := s.getRPCContextForRegion(ctx, sri.verID)
if err != nil {
return errors.Trace(err)
@@ -1384,7 +1408,7 @@ func (s *eventFeedSession) sendResolvedTs(
state, ok := regionStates[regionID]
if ok {
if state.isStopped() {
log.Warn("drop resolved ts due to region feed stopped",
log.Debug("drop resolved ts due to region feed stopped",
zap.Uint64("regionID", regionID),
zap.Uint64("requestID", state.requestID),
zap.String("addr", addr))
@@ -1464,32 +1488,17 @@ func (s *eventFeedSession) singleEventFeed(
return nil
}

select {
case s.eventCh <- &model.RegionFeedEvent{
RegionID: regionID,
Resolved: &model.ResolvedSpan{
Span: span,
ResolvedTs: startTs,
},
}:
case <-ctx.Done():
err = errors.Trace(ctx.Err())
return
}
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
})
failpoint.Inject("kvClientReconnectInterval", func(val failpoint.Value) {
reconnectInterval = time.Duration(val.(int)) * time.Second
})

for {
var event *regionEvent
var ok bool
select {
case <-ctx.Done():
err = errors.Trace(err)
err = errors.Trace(ctx.Err())
return
case <-advanceCheckTicker.C:
if time.Since(startFeedTime) < resolveLockInterval {
@@ -1504,7 +1513,7 @@ func (s *eventFeedSession) singleEventFeed(
log.Warn("region not receiving event from tikv for too long time",
zap.Uint64("regionID", regionID), zap.Stringer("span", span), zap.Duration("duration", sinceLastEvent))
}
if sinceLastEvent > reconnectInterval {
if sinceLastEvent > reconnectInterval && initialized {
log.Warn("kv client reconnect triggered", zap.Duration("duration", sinceLastEvent))
err = errReconnect
return
@@ -1520,6 +1529,7 @@ func (s *eventFeedSession) singleEventFeed(
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", regionID), zap.Stringer("span", span),
zap.Duration("duration", sinceLastResolvedTs),
zap.Duration("lastEvent", sinceLastEvent),
zap.Uint64("resolvedTs", lastResolvedTs))
maxVersion := oracle.ComposeTS(oracle.GetPhysical(currentTimeFromPD.Add(-10*time.Second)), 0)
err = s.lockResolver.Resolve(ctx, regionID, maxVersion)
@@ -1567,14 +1577,13 @@ func (s *eventFeedSession) singleEventFeed(
for _, cachedEvent := range cachedEvents {
revent, err = assembleRowEvent(regionID, cachedEvent, s.enableOldValue)
if err != nil {
err = errors.Trace(err)
return
}
select {
case s.eventCh <- revent:
metricSendEventCommitCounter.Inc()
case <-ctx.Done():
err = errors.Trace(err)
err = errors.Trace(ctx.Err())
return
}
}
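Editor's note: taken together, the singleEventFeed hunks above change the forced-reconnect behaviour: reconnectInterval grows from 15 to 60 minutes, and the reconnect now only fires for regions that are already initialized, so regions still waiting on their initial scan are not torn down by this check. A small sketch of that check, assuming made-up names apart from reconnectInterval, initialized and errReconnect (whose message here is invented).

package kvclientexample // illustrative only

import (
	"errors"
	"time"
)

var (
	reconnectInterval = 60 * time.Minute
	errReconnect      = errors.New("stream idle too long, reconnect") // message is illustrative
)

// checkIdleRegion mirrors the advance-check ticker branch: an initialized
// region that has received nothing from TiKV for longer than
// reconnectInterval asks the caller to terminate and redial the gRPC stream.
// Uninitialized regions are skipped, since their first scan may take long.
func checkIdleRegion(lastReceivedEventTime time.Time, initialized bool) error {
	sinceLastEvent := time.Since(lastReceivedEventTime)
	if sinceLastEvent > reconnectInterval && initialized {
		return errReconnect
	}
	return nil
}
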
48 changes: 46 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -393,12 +393,34 @@ func benchmarkSingleWorkerResolvedTs(b *testing.B, clientV2 bool) {
}
}

func benchmarkResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
InitWorkerPool()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
}

func BenchmarkResolvedTsClientV1(b *testing.B) {
benchmarkSingleWorkerResolvedTs(b, false /* clientV1 */)
}

func BenchmarkResolvedTsClientV2(b *testing.B) {
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
benchmarkResolvedTsClientV2(b)
}

func BenchmarkResolvedTsClientV2WorkerPool(b *testing.B) {
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 10000
regionWorkerLowWatermark = 2000
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
benchmarkResolvedTsClientV2(b)
}

func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
@@ -493,10 +515,32 @@ func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
}
}

func benchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
InitWorkerPool()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
}

func BenchmarkMultiStoreResolvedTsClientV1(b *testing.B) {
benchmarkMultipleStoreResolvedTs(b, false /* clientV1 */)
}

func BenchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
benchmarkMultiStoreResolvedTsClientV2(b)
}

func BenchmarkMultiStoreResolvedTsClientV2WorkerPool(b *testing.B) {
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 1000
regionWorkerLowWatermark = 200
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
benchmarkMultiStoreResolvedTsClientV2(b)
}
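
Editor's note: the two new *WorkerPool benchmarks temporarily raise the package-level regionWorkerHighWatermark / regionWorkerLowWatermark values and restore them with defer, and both V2 benchmarks now share a helper that starts the worker pool before running the workload. A generic sketch of that save/override/restore pattern, with placeholder names standing in for the real watermarks and workload.

package kvclientexample // illustrative only

import "testing"

// Package-level tunables standing in for the region worker watermarks.
var (
	highWatermark = 100
	lowWatermark  = 20
)

func BenchmarkWithTunedWatermarks(b *testing.B) {
	// Save the defaults and restore them on exit so later benchmarks in the
	// same process are not affected by this benchmark's overrides.
	hwm, lwm := highWatermark, lowWatermark
	highWatermark, lowWatermark = 10000, 2000
	defer func() {
		highWatermark, lowWatermark = hwm, lwm
	}()

	for i := 0; i < b.N; i++ {
		_ = highWatermark - lowWatermark // placeholder for the real workload
	}
}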
(The remaining 14 changed files in this commit are not shown here.)