Skip to content

Commit

Permalink
Rename "packetsNotCommitted" to "packetsNotLogged" in Learner.syncWit…
Browse files Browse the repository at this point in the history
…hLeader(..) to make it more understandable
  • Loading branch information
AlphaCanisMajoris committed Apr 2, 2024
1 parent cdd13ca commit f95a261
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
boolean syncSnapshot = false;
readPacket(qp);
Deque<Long> packetsCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotCommitted = new ArrayDeque<>();
Deque<PacketInFlight> packetsNotLogged = new ArrayDeque<>();

synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
Expand Down Expand Up @@ -644,11 +644,11 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
self.setLastSeenQuorumVerifier(qv, true);
}

packetsNotCommitted.add(pif);
packetsNotLogged.add(pif);
break;
case Leader.COMMIT:
case Leader.COMMITANDACTIVATE:
pif = packetsNotCommitted.peekFirst();
pif = packetsNotLogged.peekFirst();
if (pif.hdr.getZxid() == qp.getZxid() && qp.getType() == Leader.COMMITANDACTIVATE) {
QuorumVerifier qv = self.configFromString(new String(((SetDataTxn) pif.rec).getData(), UTF_8));
boolean majorChange = self.processReconfig(
Expand All @@ -667,7 +667,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
Long.toHexString(pif.hdr.getZxid()));
} else {
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
packetsNotLogged.remove();
}
} else {
packetsCommitted.add(qp.getZxid());
Expand Down Expand Up @@ -709,7 +709,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Apply to db directly if we haven't taken the snapshot
zk.processTxn(packet.hdr, packet.rec);
} else {
packetsNotCommitted.add(packet);
packetsNotLogged.add(packet);
packetsCommitted.add(qp.getZxid());
}

Expand Down Expand Up @@ -758,19 +758,19 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {

/*
* @see https://github.com/apache/zookeeper/pull/1848
* Persist and process the committed txns in "packetsNotCommitted"
* Persist and process the committed txns in "packetsNotLogged"
* according to "packetsCommitted", which have been committed by
* the leader. For these committed proposals, there is no need to
* reply ack.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-4394
* Keep the outstanding proposals in "packetsNotCommitted" to avoid
* Keep the outstanding proposals in "packetsNotLogged" to avoid
* NullPointerException when the follower receives COMMIT packet(s)
* right after replying NEWLEADER ack.
*/
while (!packetsCommitted.isEmpty()) {
long zxid = packetsCommitted.removeFirst();
pif = packetsNotCommitted.peekFirst();
pif = packetsNotLogged.peekFirst();
if (pif == null) {
LOG.warn("Committing 0x{}, but got no proposal", Long.toHexString(zxid));
continue;
Expand All @@ -779,7 +779,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
Long.toHexString(zxid), Long.toHexString(pif.hdr.getZxid()));
continue;
}
packetsNotCommitted.removeFirst();
packetsNotLogged.removeFirst();
fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
fzk.processTxn(pif.hdr, pif.rec);
}
Expand All @@ -788,8 +788,8 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Make sure to persist the txns to disk before replying NEWLEADER ack.
fzk.getZKDatabase().commit();
LOG.info("It took {}ms to persist and commit txns in packetsCommitted. "
+ "{} outstanding txns left in packetsNotCommitted",
Time.currentElapsedTime() - startTime, packetsNotCommitted.size());
+ "{} outstanding txns left in packetsNotLogged",
Time.currentElapsedTime() - startTime, packetsNotLogged.size());
}

// @see https://issues.apache.org/jira/browse/ZOOKEEPER-4643
Expand All @@ -799,7 +799,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
LOG.info("Set the current epoch to {}", newEpoch);

// Now we almost complete the synchronization phase. Start RequestProcessors
// to asynchronously process the pending txns in "packetsNotCommitted" and
// to asynchronously process the pending txns in "packetsNotLogged" and
// "packetsCommitted" later.
sock.setSoTimeout(self.tickTime * self.syncLimit);
self.setSyncMode(QuorumPeer.SyncMode.NONE);
Expand Down Expand Up @@ -827,10 +827,10 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// We need to log the stuff that came in between the snapshot and the uptodate
if (zk instanceof FollowerZooKeeperServer) {
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
fzk.logRequest(p.hdr, p.rec, p.digest);
}
LOG.info("{} txns have been logged asynchronously", packetsNotCommitted.size());
LOG.info("{} txns have been logged asynchronously", packetsNotLogged.size());

for (Long zxid : packetsCommitted) {
fzk.commit(zxid);
Expand All @@ -840,7 +840,7 @@ protected void syncWithLeader(long newLeaderZxid) throws Exception {
// Similar to follower, we need to log requests between the snapshot
// and UPTODATE
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
for (PacketInFlight p : packetsNotLogged) {
Long zxid = packetsCommitted.peekFirst();
if (p.hdr.getZxid() != zxid) {
// log warning message if there is no matching commit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,7 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)

qp.setType(Leader.NEWLEADER);
qp.setZxid(ZxidUtils.makeZxid(1, 0));
qp.setData(null);
oa.writeRecord(qp, null);

// Get the ack of the new leader
Expand All @@ -897,10 +898,12 @@ public void converseWithFollower(InputArchive ia, OutputArchive oa, Follower f)
// Leader commits proposalZxid right after it sends NEWLEADER to follower
qp.setType(Leader.COMMIT);
qp.setZxid(proposalZxid);
qp.setData(null);
oa.writeRecord(qp, null);

qp.setType(Leader.UPTODATE);
qp.setZxid(0);
qp.setData(null);
oa.writeRecord(qp, null);

// Read the uptodate ack
Expand Down

0 comments on commit f95a261

Please sign in to comment.