Skip to content

Commit

Permalink
CID never created if Zero stops early after first init (#5719)
Browse files Browse the repository at this point in the history
* created common function for proposing a new cid

* checking for CID in entries

* checking for CID in entries and proposing a new one if not found

* changed a comment

* proposing CID until there is no CID is assigned to it instead of proposing CID infinitely (useful in the case when a restarted node is joining a cluster)
  • Loading branch information
antblood authored Jul 9, 2020
1 parent 5e7f443 commit 09ef988
Showing 1 changed file with 75 additions and 26 deletions.
101 changes: 75 additions & 26 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,73 @@ func (n *node) triggerLeaderChange() {
n.server.updateZeroLeader()
}

func (n *node) proposeNewCID() {
// Either this is a new cluster or can't find a CID in the entries. So, propose a new ID for the cluster.
// CID check is needed for the case when a leader assigns a CID to the new node and the new node is proposing a CID
for len(n.server.membershipState().Cid) == 0 {
id := uuid.New().String()
err := n.proposeAndWait(context.Background(), &pb.ZeroProposal{Cid: id})
if err == nil {
glog.Infof("CID set for cluster: %v", id)
break
}
if err == errInvalidProposal {
glog.Errorf("invalid proposal error while proposing cluster id")
return
}
glog.Errorf("While proposing CID: %v. Retrying...", err)
time.Sleep(3 * time.Second)
}

// Apply trial license only if not already licensed.
if n.server.license() == nil {
if err := n.proposeTrialLicense(); err != nil {
glog.Errorf("while proposing trial license to cluster: %v", err)
}
}
}

func (n *node) checkForCIDInEntries() (bool, error) {
first, err := n.Store.FirstIndex()
if err != nil {
return false, err
}
last, err := n.Store.LastIndex()
if err != nil {
return false, err
}

for batch := first; batch <= last; {
entries, err := n.Store.Entries(batch, last+1, 64<<20)
if err != nil {
return false, err
}

// Exit early from the loop if no entries were found.
if len(entries) == 0 {
break
}

// increment the iterator to the next batch
batch = entries[len(entries)-1].Index + 1

for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
}
var proposal pb.ZeroProposal
err = proposal.Unmarshal(entry.Data)
if err != nil {
return false, err
}
if len(proposal.Cid) > 0 {
return true, err
}
}
}
return false, err
}

func (n *node) initAndStartNode() error {
_, restart, err := n.PastLife()
x.Check(err)
Expand All @@ -461,6 +528,13 @@ func (n *node) initAndStartNode() error {
}

n.SetRaft(raft.RestartNode(n.Cfg))
foundCID, err := n.checkForCIDInEntries()
if err != nil {
return err
}
if !foundCID {
go n.proposeNewCID()
}

case len(opts.peer) > 0:
p := conn.GetPools().Connect(opts.peer)
Expand Down Expand Up @@ -500,32 +574,7 @@ func (n *node) initAndStartNode() error {
x.Check(err)
peers := []raft.Peer{{ID: n.Id, Context: data}}
n.SetRaft(raft.StartNode(n.Cfg, peers))

go func() {
// This is a new cluster. So, propose a new ID for the cluster.
for {
id := uuid.New().String()
err := n.proposeAndWait(context.Background(), &pb.ZeroProposal{Cid: id})
if err == nil {
glog.Infof("CID set for cluster: %v", id)
x.WriteCidFile(id)
break
}
if err == errInvalidProposal {
glog.Errorf("invalid proposal error while proposing cluster id")
return
}
glog.Errorf("While proposing CID: %v. Retrying...", err)
time.Sleep(3 * time.Second)
}

// Apply trial license only if not already licensed.
if n.server.license() == nil {
if err := n.proposeTrialLicense(); err != nil {
glog.Errorf("while proposing trial license to cluster: %v", err)
}
}
}()
go n.proposeNewCID()
}

go n.Run()
Expand Down

0 comments on commit 09ef988

Please sign in to comment.