kv-client: use worker pool in region worker #1481

Merged
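For orientation before the diff: this PR routes region events through a shared worker pool, choosing a pool worker per event by region ID (see the regionID comment in client_v2.go below). A minimal, hypothetical sketch of that dispatch idea; the names event, pool, and dispatch are illustrative, not the PR's API:

package main

import (
	"fmt"
	"sync"
)

// event carries the region ID used to pick a worker (illustrative type).
type event struct {
	regionID uint64
	payload  string
}

// pool fans events out to a fixed set of workers. The same region always
// maps to the same worker, so per-region ordering is preserved.
type pool struct {
	inputs []chan event
	wg     sync.WaitGroup
}

func newPool(n int) *pool {
	p := &pool{inputs: make([]chan event, n)}
	for i := range p.inputs {
		ch := make(chan event, 16)
		p.inputs[i] = ch
		p.wg.Add(1)
		go func(id int, in <-chan event) {
			defer p.wg.Done()
			for ev := range in {
				fmt.Printf("worker %d handles region %d: %s\n", id, ev.regionID, ev.payload)
			}
		}(i, ch)
	}
	return p
}

// dispatch hashes the region ID onto a worker input channel.
func (p *pool) dispatch(ev event) {
	p.inputs[ev.regionID%uint64(len(p.inputs))] <- ev
}

func (p *pool) close() {
	for _, in := range p.inputs {
		close(in)
	}
	p.wg.Wait()
}

func main() {
	p := newPool(4)
	p.dispatch(event{regionID: 3, payload: "resolved ts 160"})
	p.dispatch(event{regionID: 3, payload: "resolved ts 161"})
	p.close()
}

Hashing by region ID spreads regions across workers while keeping each region's events on one worker; the regionWorkerHighWatermark and regionWorkerLowWatermark variables overridden in the benchmarks below presumably tune when that hand-off kicks in.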
48 changes: 46 additions & 2 deletions cdc/kv/client_bench_test.go
@@ -390,12 +390,34 @@ func benchmarkSingleWorkerResolvedTs(b *testing.B, clientV2 bool) {
}
}

func benchmarkResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
InitWorkerPool()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
}

func BenchmarkResolvedTsClientV1(b *testing.B) {
benchmarkSingleWorkerResolvedTs(b, false /* clientV1 */)
}

func BenchmarkResolvedTsClientV2(b *testing.B) {
benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */)
benchmarkResolvedTsClientV2(b)
}

func BenchmarkResolvedTsClientV2WorkerPool(b *testing.B) {
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 10000
regionWorkerLowWatermark = 2000
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
benchmarkResolvedTsClientV2(b)
}

func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
@@ -490,10 +512,32 @@ func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) {
}
}

func benchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
InitWorkerPool()
go func() {
RunWorkerPool(ctx) //nolint:errcheck
}()
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
}

func BenchmarkMultiStoreResolvedTsClientV1(b *testing.B) {
benchmarkMultipleStoreResolvedTs(b, false /* clientV1 */)
}

func BenchmarkMultiStoreResolvedTsClientV2(b *testing.B) {
benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */)
benchmarkMultiStoreResolvedTsClientV2(b)
}

func BenchmarkMultiStoreResolvedTsClientV2WorkerPool(b *testing.B) {
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 1000
regionWorkerLowWatermark = 200
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
benchmarkMultiStoreResolvedTsClientV2(b)
}
60 changes: 57 additions & 3 deletions cdc/kv/client_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/regionspan"
"github.com/pingcap/ticdc/pkg/retry"
@@ -46,7 +47,15 @@
"google.golang.org/grpc"
)

func Test(t *testing.T) { check.TestingT(t) }
func Test(t *testing.T) {
conf := config.GetDefaultServerConfig()
config.StoreGlobalServerConfig(conf)
[review comment, Contributor] should we set a default config in init function? (a brief sketch of this idea follows below)
InitWorkerPool()
go func() {
RunWorkerPool(context.Background()) //nolint:errcheck
}()
check.TestingT(t)
}

type clientSuite struct {
}
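On the review question above: a hedged sketch of the init-function alternative, assuming the config import added in this diff; whether the suite should do this in init rather than in Test is exactly the judgment call the reviewer raises.

// hypothetical alternative for client_test.go, per the review comment
func init() {
	// install the global default config once, before any test runs
	config.StoreGlobalServerConfig(config.GetDefaultServerConfig())
}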
@@ -688,8 +697,7 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) {
cancel()
}

func (s *etcdSuite) TestHandleFeedEvent(c *check.C) {
defer testleak.AfterTest(c)()
func (s *etcdSuite) testHandleFeedEvent(c *check.C) {
defer s.TearDownTest(c)
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
@@ -966,6 +974,17 @@ func (s *etcdSuite) TestHandleFeedEvent(c *check.C) {
Ts: 145,
},
}
multiSize := 100
regions := make([]uint64, multiSize)
for i := range regions {
regions[i] = 3
}
multipleResolved := &cdcpb.ChangeDataEvent{
ResolvedTs: &cdcpb.ResolvedTs{
Regions: regions,
Ts: 160,
},
}

expected := []*model.RegionFeedEvent{
{
@@ -1055,6 +1074,13 @@ func (s *etcdSuite) TestHandleFeedEvent(c *check.C) {
RegionID: 3,
},
}
multipleExpected := &model.RegionFeedEvent{
Resolved: &model.ResolvedSpan{
Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")},
ResolvedTs: 160,
},
RegionID: 3,
}

ch1 <- eventsBeforeInit
ch1 <- initialized
@@ -1071,9 +1097,37 @@
}
}

ch1 <- multipleResolved
for i := 0; i < multiSize; i++ {
select {
case event := <-eventCh:
c.Assert(event, check.DeepEquals, multipleExpected)
case <-time.After(time.Second):
c.Errorf("expected event %v not received", multipleExpected)
}
}

cancel()
}

func (s *etcdSuite) TestHandleFeedEvent(c *check.C) {
defer testleak.AfterTest(c)()
s.testHandleFeedEvent(c)
}

func (s *etcdSuite) TestHandleFeedEventWithWorkerPool(c *check.C) {
defer testleak.AfterTest(c)()
hwm := regionWorkerHighWatermark
lwm := regionWorkerLowWatermark
regionWorkerHighWatermark = 8
regionWorkerLowWatermark = 2
defer func() {
regionWorkerHighWatermark = hwm
regionWorkerLowWatermark = lwm
}()
s.testHandleFeedEvent(c)
}

// TestStreamSendWithError mainly tests the scenario that the `Send` call of a gRPC
// stream of the kv client meets an error, and the kv client can clean up the broken
// stream, establish a new one and recover normal event feed processing.
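As an aside on the recovery behavior that comment describes, a generic, hypothetical sketch (sendStream, flakyStream, and sendWithRecovery are illustrative names, not the kv client's types): on a failed Send, drop the broken stream, back off briefly, and retry on a fresh one.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendStream abstracts the Send side of a gRPC stream (hypothetical).
type sendStream interface {
	Send(req string) error
}

// flakyStream fails once and then succeeds, to exercise the retry path.
type flakyStream struct{ failures int }

func (s *flakyStream) Send(req string) error {
	if s.failures > 0 {
		s.failures--
		return errors.New("broken pipe")
	}
	fmt.Println("sent:", req)
	return nil
}

// sendWithRecovery re-establishes the stream whenever Send fails.
func sendWithRecovery(ctx context.Context, newStream func() sendStream, req string) error {
	for {
		stream := newStream() // establish (or re-establish) the stream
		if err := stream.Send(req); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(10 * time.Millisecond): // back off before retrying
		}
	}
}

func main() {
	s := &flakyStream{failures: 1}
	_ = sendWithRecovery(context.Background(), func() sendStream { return s }, "resolved-ts")
}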
9 changes: 9 additions & 0 deletions cdc/kv/client_v2.go
@@ -37,6 +37,13 @@ type regionStatefulEvent struct {
changeEvent *cdcpb.Event
resolvedTs *cdcpb.ResolvedTs
state *regionFeedState

// regionID is used by the load balancer; we avoid reading fields from state to reduce lock usage
regionID uint64

// finishedCounter is used to mark when all events sent from a given region
// worker to this worker (one of the workers in the worker pool) have been processed.
finishedCounter *int32
}

func (s *eventFeedSession) sendRegionChangeEventV2(
@@ -116,6 +123,7 @@ func (s *eventFeedSession) sendRegionChangeEventV2(
return ctx.Err()
case worker.inputCh <- &regionStatefulEvent{
changeEvent: event,
regionID: event.RegionId,
state: state,
}:
}
@@ -142,6 +150,7 @@
select {
case worker.inputCh <- &regionStatefulEvent{
resolvedTs: resolvedTs,
regionID: regionID,
state: state,
}:
case <-ctx.Done():
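On the finishedCounter field added above: a plausible reading (an assumption, not confirmed by this diff) is that the region worker sets the counter to the number of events it hands to the pool, and each pool worker atomically decrements it, letting the sender observe when the whole batch is processed. A minimal sketch of that pattern:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	const batch = 100
	var finishedCounter int32 = batch // one slot per handed-off event

	done := make(chan struct{})
	for i := 0; i < batch; i++ {
		go func() {
			// each worker marks its event as processed
			if atomic.AddInt32(&finishedCounter, -1) == 0 {
				close(done) // the last decrement completes the batch
			}
		}()
	}
	<-done
	fmt.Println("all events in the batch processed")
}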