mcs: fix prepare checker panic (#7736)
close #7735

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Jan 19, 2024
1 parent 1a79fb0 commit 649cc8a
Showing 4 changed files with 118 additions and 56 deletions.
12 changes: 4 additions & 8 deletions server/cluster/cluster.go
@@ -350,23 +350,19 @@ func (c *RaftCluster) Start(s Server) error {
    return nil
}

-var once sync.Once
-
func (c *RaftCluster) checkServices() {
    if c.isAPIServiceMode {
        servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
        if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
            c.startSchedulingJobs(c, c.hbstreams)
            c.independentServices.Delete(mcsutils.SchedulingServiceName)
        } else {
-           if c.stopSchedulingJobs() {
+           if c.stopSchedulingJobs() || c.coordinator == nil {
                c.initCoordinator(c.ctx, c, c.hbstreams)
-           } else {
-               once.Do(func() {
-                   c.initCoordinator(c.ctx, c, c.hbstreams)
-               })
            }
-           c.independentServices.Store(mcsutils.SchedulingServiceName, true)
+           if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
+               c.independentServices.Store(mcsutils.SchedulingServiceName, true)
+           }
        }
    } else {
        c.startSchedulingJobs(c, c.hbstreams)
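Why the panic could happen, as far as this hunk suggests: the removed code guarded initCoordinator with a package-level sync.Once, so once the guarded initializer has fired, a cluster that is started again later in the same process never re-creates its coordinator, and anything that dereferences it — such as the prepare checker — can hit a nil value and panic. The snippet below is a minimal, self-contained sketch of that pitfall; the cluster and coordinator types and the isPrepared helper are hypothetical stand-ins, not the actual PD code path.

package main

import (
    "fmt"
    "sync"
)

// coordinator is a hypothetical stand-in for the scheduling coordinator.
type coordinator struct{}

// cluster is a hypothetical stand-in for a restartable cluster object.
type cluster struct {
    co *coordinator
}

// once lives at package scope, mirroring the removed `var once sync.Once`.
var once sync.Once

// start initializes the coordinator, but only the first time in the whole process.
func (c *cluster) start() {
    once.Do(func() { c.co = &coordinator{} })
}

// isPrepared stands in for the prepare checker, which needs a live coordinator.
func (c *cluster) isPrepared() bool {
    return c.co != nil // the real checker would dereference the coordinator and panic on nil
}

func main() {
    first := &cluster{}
    first.start()                    // first term: Once fires, coordinator is created
    second := &cluster{}             // leadership regained later with fresh state
    second.start()                   // Once has already fired, so second.co stays nil
    fmt.Println(second.isPrepared()) // false — a nil-dereferencing checker would panic here
}

Checking c.coordinator == nil directly, as the hunk above does, re-creates the coordinator whenever it is missing and makes the package-level once unnecessary.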
66 changes: 66 additions & 0 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -607,3 +607,69 @@ func waitSyncFinish(re *require.Assertions, tc *tests.TestSchedulingCluster, typ
        return tc.GetPrimaryServer().GetCluster().GetSharedConfig().GetStoreLimitByType(2, typ) == expectedLimit
    })
}
+
+type multipleServerTestSuite struct {
+   suite.Suite
+   ctx              context.Context
+   cancel           context.CancelFunc
+   cluster          *tests.TestCluster
+   pdLeader         *tests.TestServer
+   backendEndpoints string
+}
+
+func TestMultipleServerTestSuite(t *testing.T) {
+   suite.Run(t, new(multipleServerTestSuite))
+}
+
+func (suite *multipleServerTestSuite) SetupSuite() {
+   var err error
+   re := suite.Require()
+   re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`))
+   suite.ctx, suite.cancel = context.WithCancel(context.Background())
+   suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 2)
+   re.NoError(err)
+
+   err = suite.cluster.RunInitialServers()
+   re.NoError(err)
+
+   leaderName := suite.cluster.WaitLeader()
+   suite.pdLeader = suite.cluster.GetServer(leaderName)
+   suite.backendEndpoints = suite.pdLeader.GetAddr()
+   re.NoError(suite.pdLeader.BootstrapCluster())
+}
+
+func (suite *multipleServerTestSuite) TearDownSuite() {
+   re := suite.Require()
+   suite.cluster.Destroy()
+   suite.cancel()
+   re.NoError(failpoint.Disable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs"))
+}
+
+func (suite *multipleServerTestSuite) TestReElectLeader() {
+   re := suite.Require()
+   tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
+   re.NoError(err)
+   defer tc.Destroy()
+   tc.WaitForPrimaryServing(re)
+
+   rc := suite.pdLeader.GetServer().GetRaftCluster()
+   re.NotNil(rc)
+   regionLen := 100
+   regions := tests.InitRegions(regionLen)
+   for _, region := range regions {
+       err = rc.HandleRegionHeartbeat(region)
+       re.NoError(err)
+   }
+
+   originLeaderName := suite.pdLeader.GetLeader().GetName()
+   suite.pdLeader.ResignLeader()
+   newLeaderName := suite.cluster.WaitLeader()
+   re.NotEqual(originLeaderName, newLeaderName)
+
+   suite.pdLeader.ResignLeader()
+   newLeaderName = suite.cluster.WaitLeader()
+   re.Equal(originLeaderName, newLeaderName)
+
+   rc = suite.pdLeader.GetServer().GetRaftCluster()
+   rc.IsPrepared()
+}
52 changes: 4 additions & 48 deletions tests/server/region_syncer/region_syncer_test.go
@@ -20,11 +20,9 @@ import (
    "time"

    "github.com/pingcap/failpoint"
-   "github.com/pingcap/kvproto/pkg/metapb"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "github.com/tikv/pd/pkg/core"
-   "github.com/tikv/pd/pkg/mock/mockid"
    "github.com/tikv/pd/pkg/utils/testutil"
    "github.com/tikv/pd/server/config"
    "github.com/tikv/pd/tests"
@@ -35,15 +33,6 @@ func TestMain(m *testing.M) {
    goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

-type idAllocator struct {
-   allocator *mockid.IDAllocator
-}
-
-func (i *idAllocator) alloc() uint64 {
-   v, _ := i.allocator.Alloc()
-   return v
-}
-
func TestRegionSyncer(t *testing.T) {
    re := require.New(t)
    ctx, cancel := context.WithCancel(context.Background())
@@ -77,7 +66,7 @@ func TestRegionSyncer(t *testing.T) {
    })

    regionLen := 110
-   regions := initRegions(regionLen)
+   regions := tests.InitRegions(regionLen)
    for _, region := range regions {
        err = rc.HandleRegionHeartbeat(region)
        re.NoError(err)
@@ -186,7 +175,7 @@ func TestFullSyncWithAddMember(t *testing.T) {
    rc := leaderServer.GetServer().GetRaftCluster()
    re.NotNil(rc)
    regionLen := 110
-   regions := initRegions(regionLen)
+   regions := tests.InitRegions(regionLen)
    for _, region := range regions {
        err = rc.HandleRegionHeartbeat(region)
        re.NoError(err)
@@ -230,7 +219,7 @@ func TestPrepareChecker(t *testing.T) {
    rc := leaderServer.GetServer().GetRaftCluster()
    re.NotNil(rc)
    regionLen := 110
-   regions := initRegions(regionLen)
+   regions := tests.InitRegions(regionLen)
    for _, region := range regions {
        err = rc.HandleRegionHeartbeat(region)
        re.NoError(err)
@@ -279,7 +268,7 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) {
    rc := leaderServer.GetServer().GetRaftCluster()
    re.NotNil(rc)
    regionLen := 100
-   regions := initRegions(regionLen)
+   regions := tests.InitRegions(regionLen)
    for _, region := range regions {
        err = rc.HandleRegionHeartbeat(region)
        re.NoError(err)
@@ -306,36 +295,3 @@ func TestPrepareCheckerWithTransferLeader(t *testing.T) {
    re.True(rc.IsPrepared())
    re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/changeCoordinatorTicker"))
}
-
-func initRegions(regionLen int) []*core.RegionInfo {
-   allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
-   regions := make([]*core.RegionInfo, 0, regionLen)
-   for i := 0; i < regionLen; i++ {
-       r := &metapb.Region{
-           Id: allocator.alloc(),
-           RegionEpoch: &metapb.RegionEpoch{
-               ConfVer: 1,
-               Version: 1,
-           },
-           StartKey: []byte{byte(i)},
-           EndKey:   []byte{byte(i + 1)},
-           Peers: []*metapb.Peer{
-               {Id: allocator.alloc(), StoreId: uint64(1)},
-               {Id: allocator.alloc(), StoreId: uint64(2)},
-               {Id: allocator.alloc(), StoreId: uint64(3)},
-           },
-       }
-       region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
-       // Here is used to simulate the upgrade process.
-       if i < regionLen/2 {
-           buckets := &metapb.Buckets{
-               RegionId: r.Id,
-               Keys:     [][]byte{r.StartKey, r.EndKey},
-               Version:  1,
-           }
-           region.UpdateBuckets(buckets, region.GetBuckets())
-       }
-       regions = append(regions, region)
-   }
-   return regions
-}
44 changes: 44 additions & 0 deletions tests/testutil.go
@@ -37,6 +37,7 @@ import (
    sc "github.com/tikv/pd/pkg/mcs/scheduling/server/config"
    tso "github.com/tikv/pd/pkg/mcs/tso/server"
    "github.com/tikv/pd/pkg/mcs/utils"
+   "github.com/tikv/pd/pkg/mock/mockid"
    "github.com/tikv/pd/pkg/utils/logutil"
    "github.com/tikv/pd/pkg/utils/testutil"
    "github.com/tikv/pd/pkg/versioninfo"
@@ -365,3 +366,46 @@ func (s *SchedulingTestEnvironment) startCluster(m mode) {
        s.clusters[apiMode] = cluster
    }
}
+
+type idAllocator struct {
+   allocator *mockid.IDAllocator
+}
+
+func (i *idAllocator) alloc() uint64 {
+   v, _ := i.allocator.Alloc()
+   return v
+}
+
+// InitRegions is used for test purpose.
+func InitRegions(regionLen int) []*core.RegionInfo {
+   allocator := &idAllocator{allocator: mockid.NewIDAllocator()}
+   regions := make([]*core.RegionInfo, 0, regionLen)
+   for i := 0; i < regionLen; i++ {
+       r := &metapb.Region{
+           Id: allocator.alloc(),
+           RegionEpoch: &metapb.RegionEpoch{
+               ConfVer: 1,
+               Version: 1,
+           },
+           StartKey: []byte{byte(i)},
+           EndKey:   []byte{byte(i + 1)},
+           Peers: []*metapb.Peer{
+               {Id: allocator.alloc(), StoreId: uint64(1)},
+               {Id: allocator.alloc(), StoreId: uint64(2)},
+               {Id: allocator.alloc(), StoreId: uint64(3)},
+           },
+       }
+       region := core.NewRegionInfo(r, r.Peers[0], core.SetSource(core.Heartbeat))
+       // Here is used to simulate the upgrade process.
+       if i < regionLen/2 {
+           buckets := &metapb.Buckets{
+               RegionId: r.Id,
+               Keys:     [][]byte{r.StartKey, r.EndKey},
+               Version:  1,
+           }
+           region.UpdateBuckets(buckets, region.GetBuckets())
+       }
+       regions = append(regions, region)
+   }
+   return regions
+}
