Skip to content

Commit

Permalink
feat: merge optimize from etcd etcd-io/etcd#10892 & etcd-io/etcd#10920
Browse files Browse the repository at this point in the history
  • Loading branch information
absolute8511 committed Jul 8, 2021
1 parent a3bf044 commit 79f1c52
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 294 deletions.
6 changes: 5 additions & 1 deletion node/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,11 @@ func (rc *raftNode) startRaft(ds DataStorage, standalone bool) error {
if rc.join {
startPeers = nil
}
rc.node = raft.StartNode(c, startPeers, isLearner)
if len(startPeers) == 0 {
rc.node = raft.RestartNode(c)
} else {
rc.node = raft.StartNode(c, startPeers, isLearner)
}
}
rc.initForTransport()
rc.wgServe.Add(1)
Expand Down
97 changes: 97 additions & 0 deletions raft/bootstrap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
"errors"

pb "github.com/youzan/ZanRedisDB/raft/raftpb"
)

// Bootstrap initializes the RawNode for first use by appending configuration
// changes for the supplied peers. This method returns an error if the Storage
// is nonempty.
//
// It is recommended that instead of calling this method, applications bootstrap
// their state manually by setting up a Storage that has a first index > 1 and
// which stores the desired ConfState as its InitialState.
func (rn *RawNode) Bootstrap(peers []Peer) error {
if len(peers) == 0 {
return errors.New("must provide at least one peer to Bootstrap")
}
lastIndex, err := rn.raft.raftLog.storage.LastIndex()
if err != nil {
return err
}

if lastIndex != 0 {
return errors.New("can't bootstrap a nonempty Storage")
}

// We've faked out initial entries above, but nothing has been
// persisted. Start with an empty HardState (thus the first Ready will
// emit a HardState update for the app to persist).
rn.prevHardSt = emptyState

// TODO(tbg): remove StartNode and give the application the right tools to
// bootstrap the initial membership in a cleaner way.
rn.raft.becomeFollower(1, None)
ents := make([]pb.Entry, len(peers))
for i, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, ReplicaID: peer.ReplicaID,
NodeGroup: pb.Group{NodeId: peer.NodeID, Name: rn.raft.group.Name, GroupId: rn.raft.group.GroupId,
RaftReplicaId: peer.ReplicaID},
Context: peer.Context}
data, err := cc.Marshal()
if err != nil {
return err
}
// TODO(tbg): this should append the ConfChange for the own node first
// and also call applyConfChange below for that node first. Otherwise
// we have a Raft group (for a little while) that doesn't have itself
// in its config, which is bad.
// This whole way of setting things up is rickety. The app should just
// populate the initial ConfState appropriately and then all of this
// goes away.
e := pb.Entry{
Type: pb.EntryConfChange,
Term: 1,
Index: uint64(i + 1),
Data: data,
}
ents[i] = e
}
rn.raft.raftLog.append(ents...)

// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
//
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
rn.raft.raftLog.committed = uint64(len(ents))
for _, peer := range peers {
grp := pb.Group{NodeId: peer.NodeID, Name: rn.raft.group.Name, GroupId: rn.raft.group.GroupId,
RaftReplicaId: peer.ReplicaID}
rn.raft.addNode(peer.ReplicaID, grp)
}
return nil
}
137 changes: 41 additions & 96 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,68 +211,37 @@ type Peer struct {
}

type prevState struct {
havePrevLastUnstablei bool
prevLastUnstablei uint64
prevLastUnstablet uint64
prevSnapi uint64
prevSoftSt *SoftState
prevHardSt pb.HardState
prevLead uint64
prevLead uint64
}

func newPrevState(r *raft) *prevState {
func newPrevState(r *RawNode) *prevState {
return &prevState{
prevSoftSt: r.softState(),
prevHardSt: emptyState,
prevLead: None,
prevLead: None,
}
}

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
func StartNode(c *Config, peers []Peer, isLearner bool) Node {
if isLearner {
c.learners = append(c.learners, c.Group)
}
r := newRaft(c)
// become the follower at term 1 and apply initial configuration
// entries of term 1
r.becomeFollower(1, None)
for _, peer := range peers {
cc := pb.ConfChange{Type: pb.ConfChangeAddNode, ReplicaID: peer.ReplicaID,
NodeGroup: pb.Group{NodeId: peer.NodeID, Name: r.group.Name, GroupId: r.group.GroupId,
RaftReplicaId: peer.ReplicaID},
Context: peer.Context}
d, err := cc.Marshal()
if err != nil {
panic("unexpected marshal error")
}
e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
r.raftLog.append(e)
}
// Mark these initial entries as committed.
// TODO(bdarnell): These entries are still unstable; do we need to preserve
// the invariant that committed < unstable?
r.raftLog.committed = r.raftLog.lastIndex()
// Now apply them, mainly so that the application can call Campaign
// immediately after StartNode in tests. Note that these nodes will
// be added to raft twice: here and when the application's Ready
// loop calls ApplyConfChange. The calls to addNode must come after
// all calls to raftLog.append so progress.next is set after these
// bootstrapping entries (it is an error if we try to append these
// entries since they have already been committed).
// We do not set raftLog.applied so the application will be able
// to observe all conf changes via Ready.CommittedEntries.
for _, peer := range peers {
r.addNode(peer.ReplicaID, pb.Group{NodeId: peer.NodeID, Name: r.group.Name, GroupId: r.group.GroupId,
RaftReplicaId: peer.ReplicaID})
//if isLearner {
// c.learners = append(c.learners, c.Group)
//}
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
//r := newRaft(c)

rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
rn.Bootstrap(peers)

n := newNode()
n.logger = c.Logger
n.r = r
n.prevS = newPrevState(r)
off := max(r.raftLog.applied+1, r.raftLog.firstIndex())
n.r = rn
n.prevS = newPrevState(rn)
off := max(rn.raft.raftLog.applied+1, rn.raft.raftLog.firstIndex())
n.lastSteppedIndex = off
n.NotifyEventCh()
return &n
Expand All @@ -283,13 +252,16 @@ func StartNode(c *Config, peers []Peer, isLearner bool) Node {
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
r := newRaft(c)
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}

n := newNode()
n.logger = c.Logger
n.r = r
n.prevS = newPrevState(r)
off := max(r.raftLog.applied+1, r.raftLog.firstIndex())
n.r = rn
n.prevS = newPrevState(rn)
off := max(rn.raft.raftLog.applied+1, rn.raft.raftLog.firstIndex())
n.lastSteppedIndex = off
n.NotifyEventCh()
return &n
Expand All @@ -306,7 +278,7 @@ type node struct {
stop chan struct{}
status chan chan Status
eventNotifyCh chan bool
r *raft
r *RawNode
prevS *prevState
newReadyFunc func(*raft, *SoftState, pb.HardState, bool) Ready
needAdvance bool
Expand Down Expand Up @@ -358,29 +330,29 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) {
if busySnap && m.Type == pb.MsgApp {
// ignore msg app while busy snapshot
} else {
n.handleReceivedMessage(n.r, m)
n.handleReceivedMessage(n.r.raft, m)
}
msgs[i].Entries = nil
}
if n.handleTicks(n.r) {
if n.handleTicks(n.r.raft) {
hasEvent = true
}
needHandleProposal := n.handleLeaderUpdate(n.r)
needHandleProposal := n.handleLeaderUpdate(n.r.raft)
var ev bool
ev, needHandleProposal = n.handleConfChanged(n.r, needHandleProposal)
ev, needHandleProposal = n.handleConfChanged(n.r.raft, needHandleProposal)
if ev {
hasEvent = ev
}
if needHandleProposal {
props := n.propQ.Get()
for _, p := range props {
hasEvent = true
n.handleProposal(n.r, p)
n.handleProposal(n.r.raft, p)
}
}
n.handleStatus(n.r)
n.handleStatus(n.r.raft)
_ = hasEvent
rd := n.newReadyFunc(n.r, n.prevS.prevSoftSt, n.prevS.prevHardSt, moreEntriesToApply)
rd := n.r.readyWithoutAccept(moreEntriesToApply)
if rd.containsUpdates() {
n.needAdvance = true
var stepIndex uint64
Expand All @@ -391,8 +363,8 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) {
fi := rd.CommittedEntries[0].Index
if n.lastSteppedIndex != 0 && fi > n.lastSteppedIndex+1 {
e := fmt.Sprintf("raft.node: %x(%v) index not continued: %v, %v, %v, snap:%v, prev: %v, logs: %v ",
n.r.id, n.r.group, fi, n.lastSteppedIndex, stepIndex, rd.Snapshot.Metadata.String(), n.prevS,
n.r.raftLog.String())
n.r.raft.id, n.r.raft.group, fi, n.lastSteppedIndex, stepIndex, rd.Snapshot.Metadata.String(), n.prevS,
n.r.raft.raftLog.String())
n.logger.Error(e)
}
stepIndex = rd.CommittedEntries[len(rd.CommittedEntries)-1].Index
Expand All @@ -404,10 +376,10 @@ func (n *node) StepNode(moreEntriesToApply bool, busySnap bool) (Ready, bool) {
}

func (n *node) DebugString() string {
ents := n.r.raftLog.allEntries()
ents := n.r.raft.raftLog.allEntries()
e := fmt.Sprintf("raft.node: %x(%v) index not continued: %v, prev: %v, logs: %v, %v ",
n.r.id, n.r.group, n.lastSteppedIndex, n.prevS, len(ents),
n.r.raftLog.String())
n.r.raft.id, n.r.raft.group, n.lastSteppedIndex, n.prevS, len(ents),
n.r.raft.raftLog.String())
return e
}

Expand Down Expand Up @@ -475,35 +447,8 @@ func (n *node) addReqMessageToQueue(req pb.Message) {
}

func (n *node) Advance(rd Ready) {
if rd.SoftState != nil {
n.prevS.prevSoftSt = rd.SoftState
}
if len(rd.Entries) > 0 {
n.prevS.prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
n.prevS.prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
n.prevS.havePrevLastUnstablei = true
}
if !IsEmptyHardState(rd.HardState) {
n.prevS.prevHardSt = rd.HardState
}
if !IsEmptySnap(rd.Snapshot) {
n.prevS.prevSnapi = rd.Snapshot.Metadata.Index
}

n.r.msgs = nil
n.r.readStates = nil

appliedI := rd.appliedCursor()
if appliedI != 0 {
// since the committed entries may less than the hard commit index due to the
// limit for buffer len, we should not use the hard state commit index.
n.r.raftLog.appliedTo(appliedI)
}
if n.prevS.havePrevLastUnstablei {
n.r.raftLog.stableTo(n.prevS.prevLastUnstablei, n.prevS.prevLastUnstablet)
n.prevS.havePrevLastUnstablei = false
}
n.r.raftLog.stableSnapTo(n.prevS.prevSnapi)
n.r.acceptReady(rd)
n.r.Advance(rd)
n.needAdvance = false
}

Expand All @@ -512,7 +457,7 @@ func (n *node) ConfChangedCh() <-chan pb.ConfChange {
}

func (n *node) HandleConfChanged(cc pb.ConfChange) {
n.processConfChanged(n.r, cc, true)
n.processConfChanged(n.r.raft, cc, true)
}

func (n *node) handleConfChanged(r *raft, needHandleProposal bool) (bool, bool) {
Expand Down
2 changes: 1 addition & 1 deletion raft/node_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func BenchmarkOneNode(b *testing.B) {
n := newNode()
s := NewMemoryStorage()
defer s.Close()
r := newTestRaft(1, []uint64{1}, 10, 1, s)
r := newTestRawNode(1, []uint64{1}, 10, 1, s)
n.r = r
n.prevS = newPrevState(r)
defer n.Stop()
Expand Down
Loading

0 comments on commit 79f1c52

Please sign in to comment.