kvClient (ticdc): revert e5999e3 to remove useless metrics (pingcap#1…
asddongmen authored May 29, 2024
1 parent 54e93ed commit 6277d9a
Showing 14 changed files with 60 additions and 173 deletions.
79 changes: 12 additions & 67 deletions cdc/kv/client.go
@@ -18,7 +18,6 @@ import (
     "fmt"
     "io"
     "math/rand"
-    "strconv"
     "strings"
     "sync"
     "sync/atomic"
@@ -39,7 +38,6 @@ import (
     "github.com/pingcap/tiflow/pkg/retry"
     "github.com/pingcap/tiflow/pkg/txnutil"
     "github.com/pingcap/tiflow/pkg/version"
-    "github.com/prometheus/client_golang/prometheus"
     tidbkv "github.com/tikv/client-go/v2/kv"
     "github.com/tikv/client-go/v2/oracle"
     "github.com/tikv/client-go/v2/tikv"
@@ -75,8 +73,6 @@ const (
     resolveLockMinInterval = 10 * time.Second
 
     scanRegionsConcurrency = 1024
-
-    tableMonitorInterval = 2 * time.Second
 )
 
 // time interval to force kv client to terminate gRPC stream and reconnect
@@ -166,7 +162,6 @@ type CDCKVClient interface {
         ts uint64,
         lockResolver txnutil.LockResolver,
         eventCh chan<- model.RegionFeedEvent,
-        enableTableMonitor bool,
     ) error
 
     // RegionCount returns the number of captured regions.
@@ -310,9 +305,8 @@ func (c *CDCClient) EventFeed(
     ctx context.Context, span regionspan.ComparableSpan, ts uint64,
     lockResolver txnutil.LockResolver,
     eventCh chan<- model.RegionFeedEvent,
-    enableTableMonitor bool,
 ) error {
-    s := newEventFeedSession(c, span, lockResolver, ts, eventCh, enableTableMonitor)
+    s := newEventFeedSession(c, span, lockResolver, ts, eventCh)
     return s.eventFeed(ctx)
 }
 
@@ -396,11 +390,6 @@ type eventFeedSession struct {
 
     rangeLock *regionspan.RegionRangeLock
 
-    enableTableMonitor bool
-    regionChSizeGauge  prometheus.Gauge
-    errChSizeGauge     prometheus.Gauge
-    rangeChSizeGauge   prometheus.Gauge
-
     // storeStreamsCache is used to cache the established gRPC streams to TiKV stores.
     // Note: The cache is not thread-safe, so it should be accessed in the same goroutine.
     // For now, it is only accessed in the `requestRegionToStore` goroutine.
@@ -421,31 +410,23 @@ func newEventFeedSession(
     lockResolver txnutil.LockResolver,
     startTs uint64,
     eventCh chan<- model.RegionFeedEvent,
-    enableTableMonitor bool,
 ) *eventFeedSession {
     id := allocateRequestID()
     rangeLock := regionspan.NewRegionRangeLock(
         id, totalSpan.Start, totalSpan.End, startTs,
         client.changefeed.Namespace+"."+client.changefeed.ID)
 
     return &eventFeedSession{
-        client:             client,
-        startTs:            startTs,
-        changefeed:         client.changefeed,
-        tableID:            client.tableID,
-        tableName:          client.tableName,
-        storeStreamsCache:  make(map[string]*eventFeedStream),
-        totalSpan:          totalSpan,
-        eventCh:            eventCh,
-        rangeLock:          rangeLock,
-        lockResolver:       lockResolver,
-        enableTableMonitor: enableTableMonitor,
-        regionChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
-            client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "region"),
-        errChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
-            client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "err"),
-        rangeChSizeGauge: clientChannelSize.WithLabelValues(client.changefeed.Namespace,
-            client.changefeed.ID, strconv.FormatInt(client.tableID, 10), "range"),
+        client:            client,
+        startTs:           startTs,
+        changefeed:        client.changefeed,
+        tableID:           client.tableID,
+        tableName:         client.tableName,
+        storeStreamsCache: make(map[string]*eventFeedStream),
+        totalSpan:         totalSpan,
+        eventCh:           eventCh,
+        rangeLock:         rangeLock,
+        lockResolver:      lockResolver,
         resolvedTsPool: sync.Pool{
             New: func() any {
                 return &regionStatefulEvent{
@@ -486,7 +467,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
             case <-ctx.Done():
                 return ctx.Err()
             case task := <-s.requestRangeCh.Out():
-                s.rangeChSizeGauge.Dec()
                 // divideAndSendEventFeedToRegions could be blocked for some time,
                 // since it must wait for the region lock available. In order to
                 // consume region range request from `requestRangeCh` as soon as
@@ -508,7 +488,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
             case <-ctx.Done():
                 return ctx.Err()
             case errInfo := <-s.errCh.Out():
-                s.errChSizeGauge.Dec()
                 if err := s.handleError(ctx, errInfo); err != nil {
                     return err
                 }
@@ -518,7 +497,6 @@ func (s *eventFeedSession) eventFeed(ctx context.Context) error {
     })
 
     s.requestRangeCh.In() <- rangeRequestTask{span: s.totalSpan}
-    s.rangeChSizeGauge.Inc()
 
     log.Info("event feed started",
         zap.String("namespace", s.changefeed.Namespace),
@@ -539,7 +517,6 @@ func (s *eventFeedSession) scheduleDivideRegionAndRequest(
     task := rangeRequestTask{span: span}
     select {
     case s.requestRangeCh.In() <- task:
-        s.rangeChSizeGauge.Inc()
     case <-ctx.Done():
     }
 }
@@ -553,7 +530,6 @@ func (s *eventFeedSession) scheduleRegionRequest(ctx context.Context, sri single
         sri.lockedRange = res.LockedRange
         select {
         case s.regionCh.In() <- sri:
-            s.regionChSizeGauge.Inc()
         case <-ctx.Done():
         }
     case regionspan.LockRangeStatusStale:
@@ -615,7 +591,6 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
         zap.Error(errorInfo.err))
     select {
     case s.errCh.In() <- errorInfo:
-        s.errChSizeGauge.Inc()
     case <-ctx.Done():
     }
 }
@@ -791,7 +766,6 @@ func (s *eventFeedSession) dispatchRequest(ctx context.Context) error {
         case <-ctx.Done():
             return errors.Trace(ctx.Err())
         case sri = <-s.regionCh.Out():
-            s.regionChSizeGauge.Dec()
         }
 
         // Send a resolved ts to event channel first, for two reasons:
@@ -1040,11 +1014,6 @@ func (s *eventFeedSession) receiveFromStream(
 
     metricSendEventBatchResolvedSize := batchResolvedEventSize.
         WithLabelValues(s.changefeed.Namespace, s.changefeed.ID)
-    metricReceiveBusyRatio := workerBusyRatio.WithLabelValues(
-        s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-receiver")
-    metricProcessBusyRatio := workerBusyRatio.WithLabelValues(
-        s.changefeed.Namespace, s.changefeed.ID, strconv.FormatInt(s.tableID, 10), stream.addr, "event-processor")
-
     // always create a new region worker, because `receiveFromStream` is ensured
     // to call exactly once from outer code logic
     worker := newRegionWorker(parentCtx, stream, s)
@@ -1063,7 +1032,7 @@
 
     eg, ctx := errgroup.WithContext(ctx)
     eg.Go(func() error {
-        err := handleExit(worker.run(s.enableTableMonitor))
+        err := handleExit(worker.run())
         if err != nil {
             log.Error("region worker exited with error",
                 zap.String("namespace", s.changefeed.Namespace),
@@ -1079,32 +1048,10 @@
     })
 
     receiveEvents := func() error {
-        var receiveTime time.Duration
-        var processTime time.Duration
-        startToWork := time.Now()
-
         maxCommitTs := model.Ts(0)
         for {
-            startToReceive := time.Now()
             cevent, err := stream.client.Recv()
 
-            if s.enableTableMonitor {
-                receiveTime += time.Since(startToReceive)
-                if time.Since(startToWork) >= tableMonitorInterval {
-                    now := time.Now()
-                    // Receive busyRatio indicates the blocking time (receive and decode grpc msg) of the worker.
-                    busyRatio := receiveTime.Seconds() / now.Sub(startToWork).Seconds() * 100
-                    metricReceiveBusyRatio.Set(busyRatio)
-                    receiveTime = 0
-                    // Process busyRatio indicates the working time (dispatch to region worker) of the worker.
-                    busyRatio = processTime.Seconds() / now.Sub(startToWork).Seconds() * 100
-                    metricProcessBusyRatio.Set(busyRatio)
-                    processTime = 0
-
-                    startToWork = now
-                }
-            }
-
             failpoint.Inject("kvClientRegionReentrantError", func(op failpoint.Value) {
                 if op.(string) == "error" {
                     _ = worker.sendEvents(ctx, []*regionStatefulEvent{nil})
@@ -1163,7 +1110,6 @@ func (s *eventFeedSession) receiveFromStream(
                 return nil
             }
 
-            startToProcess := time.Now()
             size := cevent.Size()
             if size > warnRecvMsgSizeThreshold {
                 regionCount := 0
@@ -1208,7 +1154,6 @@ func (s *eventFeedSession) receiveFromStream(
                     tsStat.commitTs.Store(maxCommitTs)
                 }
             }
-            processTime += time.Since(startToProcess)
         }
     }
     eg.Go(func() error {
4 changes: 2 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -202,7 +202,7 @@ func prepareBenchMultiStore(b *testing.B, storeNum, regionNum int) (
     go func() {
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         if errors.Cause(err) != context.Canceled {
             b.Error(err)
         }
@@ -296,7 +296,7 @@ func prepareBench(b *testing.B, regionNum int) (
     go func() {
         err := cdcClient.EventFeed(ctx,
             regionspan.ComparableSpan{Start: []byte("a"), End: []byte("z")},
-            100, lockResolver, eventCh, false)
+            100, lockResolver, eventCh)
         if errors.Cause(err) != context.Canceled {
             b.Error(err)
         }
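
Net effect for callers of the kv client: CDCKVClient.EventFeed no longer takes a trailing enableTableMonitor flag, and newEventFeedSession and worker.run drop the corresponding parameters. Below is a minimal Go sketch of a post-revert call site; the startFeed helper and the import paths for the kv, model, and regionspan packages are assumptions based on the usual tiflow layout, not part of this commit.

package example

import (
    "context"

    "github.com/pingcap/tiflow/cdc/kv"
    "github.com/pingcap/tiflow/cdc/model"
    "github.com/pingcap/tiflow/pkg/regionspan"
    "github.com/pingcap/tiflow/pkg/txnutil"
)

// startFeed is a hypothetical wrapper that shows the post-revert call shape:
// the trailing enableTableMonitor bool argument has been dropped.
func startFeed(
    ctx context.Context,
    client kv.CDCKVClient,
    span regionspan.ComparableSpan,
    startTs uint64,
    lockResolver txnutil.LockResolver,
    eventCh chan<- model.RegionFeedEvent,
) error {
    // Before this commit: client.EventFeed(ctx, span, startTs, lockResolver, eventCh, false)
    return client.EventFeed(ctx, span, startTs, lockResolver, eventCh)
}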

