Skip to content

Commit

Permalink
coordinator: fix too much cursor causes by ensemble breaking
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Jun 13, 2024
1 parent 9696873 commit f6e7382
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ func (s *shardController) electLeader() error {

func (s *shardController) deletingRemovedNodes() error {
for _, ds := range s.shardMetadata.RemovedNodes {
if _, err := s.newTerm(s.ctx, ds); err != nil {
return err
}
if _, err := s.rpc.DeleteShard(s.ctx, ds, &proto.DeleteShardRequest{
Namespace: s.namespace,
ShardId: s.shard,
Expand Down Expand Up @@ -497,8 +500,8 @@ func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, nod
func (s *shardController) newTermQuorum() (map[model.ServerAddress]*proto.EntryId, error) { //nolint:revive
timer := s.newTermQuorumLatency.Timer()

fencingQuorum := mergeLists(s.shardMetadata.Ensemble, s.shardMetadata.RemovedNodes)
fencingQuorumSize := len(fencingQuorum)
ensembleQuorum := s.shardMetadata.Ensemble
fencingQuorumSize := len(ensembleQuorum)
majority := fencingQuorumSize/2 + 1

// Use a new context, so we can cancel the pending requests
Expand All @@ -512,7 +515,7 @@ func (s *shardController) newTermQuorum() (map[model.ServerAddress]*proto.EntryI
error
}, fencingQuorumSize)

for _, sa := range fencingQuorum {
for _, sa := range ensembleQuorum {
// We need to save the address because it gets modified in the loop
serverAddress := sa
go common.DoWithLabels(
Expand Down Expand Up @@ -559,11 +562,8 @@ func (s *shardController) newTermQuorum() (map[model.ServerAddress]*proto.EntryI
totalResponses++
if r.error == nil {
successResponses++

// We don't consider the removed nodes as candidates for leader/followers
if listContains(s.shardMetadata.Ensemble, r.ServerAddress) {
res[r.ServerAddress] = r.EntryId
}
res[r.ServerAddress] = r.EntryId
} else {
err = multierr.Append(err, r.error)
}
Expand Down

0 comments on commit f6e7382

Please sign in to comment.