Skip to content

Commit

Permalink
Merge branch 'master' into fix_owner_block_http_api
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Nov 18, 2021
2 parents f0d89cf + fe92b89 commit 932e945
Show file tree
Hide file tree
Showing 57 changed files with 3,546 additions and 156 deletions.
34 changes: 29 additions & 5 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/owner"
"github.com/pingcap/ticdc/cdc/processor"
"github.com/pingcap/ticdc/cdc/processor/pipeline/system"
"github.com/pingcap/ticdc/pkg/config"
cdcContext "github.com/pingcap/ticdc/pkg/context"
cerror "github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -60,6 +61,8 @@ type Capture struct {
etcdClient *etcd.CDCEtcdClient
grpcPool kv.GrpcPool

tableActorSystem *system.System

cancel context.CancelFunc

newProcessorManager func() *processor.Manager
Expand Down Expand Up @@ -103,6 +106,19 @@ func (c *Capture) reset(ctx context.Context) error {
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
if err != nil {
log.Warn("stop table actor system failed", zap.Error(err))
}
}
if conf.Debug.EnableTableActor {
c.tableActorSystem = system.NewSystem()
err = c.tableActorSystem.Start(ctx)
if err != nil {
return errors.Annotate(cerror.WrapError(cerror.ErrNewCaptureFailed, err), "create capture session")
}
}
c.grpcPool = kv.NewGrpcPoolImpl(ctx, conf.Security)
log.Info("init capture", zap.String("capture-id", c.info.ID), zap.String("capture-addr", c.info.AdvertiseAddr))
return nil
Expand Down Expand Up @@ -148,11 +164,12 @@ func (c *Capture) Run(ctx context.Context) error {

func (c *Capture) run(stdCtx context.Context) error {
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
PDClient: c.pdClient,
KVStorage: c.kvStorage,
CaptureInfo: c.info,
EtcdClient: c.etcdClient,
GrpcPool: c.grpcPool,
TableActorSystem: c.tableActorSystem,
})
err := c.register(ctx)
if err != nil {
Expand Down Expand Up @@ -354,6 +371,13 @@ func (c *Capture) AsyncClose() {
if c.grpcPool != nil {
c.grpcPool.Close()
}
if c.tableActorSystem != nil {
err := c.tableActorSystem.Stop()
if err != nil {
log.Warn("stop table actor system failed", zap.Error(err))
}
c.tableActorSystem = nil
}
}

// WriteDebugInfo writes the debug info into writer.
Expand Down
2 changes: 2 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sorter"
"github.com/pingcap/ticdc/cdc/sorter/leveldb"
"github.com/pingcap/ticdc/cdc/sorter/leveldb/system"
"github.com/pingcap/ticdc/cdc/sorter/memory"
"github.com/pingcap/ticdc/cdc/sorter/unified"
"github.com/pingcap/ticdc/pkg/actor"
Expand Down Expand Up @@ -53,4 +54,5 @@ func init() {
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
system.InitMetrics(registry)
}
44 changes: 44 additions & 0 deletions cdc/processor/pipeline/system/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package system

import (
"context"

"github.com/pingcap/ticdc/pkg/actor"
)

// System manages table pipeline global resource.
type System struct {
tableActorSystem *actor.System
tableActorRouter *actor.Router
}

// NewSystem returns a system.
func NewSystem() *System {
return &System{}
}

// Start starts a system.
func (s *System) Start(ctx context.Context) error {
// todo: make the table actor system configurable
s.tableActorSystem, s.tableActorRouter = actor.NewSystemBuilder("table").Build()
s.tableActorSystem.Start(ctx)
return nil
}

// Stop stops a system.
func (s *System) Stop() error {
return s.tableActorSystem.Stop()
}
29 changes: 29 additions & 0 deletions cdc/processor/pipeline/system/system_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package system

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestStartAndStopSystem(t *testing.T) {
t.Parallel()

s := NewSystem()
require.Nil(t, s.Start(context.TODO()))
require.Nil(t, s.Stop())
}
Loading

0 comments on commit 932e945

Please sign in to comment.