-
Notifications
You must be signed in to change notification settings - Fork 0
/
zk_pr_2152.tla
295 lines (274 loc) · 15.5 KB
/
zk_pr_2152.tla
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
----------------------------- MODULE zk_pr_2152 -----------------------------
(*
Specification for verifying the code fix in
https://github.com/apache/zookeeper/pull/2152,
which targets ZooKeeper Jira issues ZOOKEEPER-3023, ZOOKEEPER-4394,
ZOOKEEPER-4643, ZOOKEEPER-4646 and ZOOKEEPER-4685.
*)
EXTENDS zk_test_thread
-----------------------------------------------------------------------------
\* VARIABLES
\* Every variable of this spec: the core protocol variables together with the
\* thread variables (coreVars and threadVars come from the extended modules).
varsFix == << coreVars, threadVars >>
-----------------------------------------------------------------------------
\* Initial predicate: core initial state plus the initial thread variables.
InitFix == CoreInit /\ InitThreadVars
-----------------------------------------------------------------------------
\* Utils
RECURSIVE packetsToCommitInSync(_,_,_)
(* Select which sync-phase packets get committed.
   packets       : packets received during sync that are not yet committed
   ZxidsToCommit : zxids committed by the leader during sync
   selected      : accumulator of the packets chosen so far
   The committed zxids are consumed one at a time. When the current zxid
   matches (TxnZxidEqual) the head packet, that packet moves to `selected`;
   otherwise the zxid is dropped and the head packet is kept. Once the
   zxids are exhausted the result is << leftover packets, selected >>. *)
packetsToCommitInSync(packets, ZxidsToCommit, selected) ==
        IF ZxidsToCommit = << >>
        THEN << packets, selected >>
        ELSE LET nextZxid   == Head(ZxidsToCommit)
                 laterZxids == Tail(ZxidsToCommit)
             IN  IF packets = << >>
                 THEN packetsToCommitInSync(<< >>, laterZxids, selected)
                 ELSE IF TxnZxidEqual(Head(packets), nextZxid)
                      THEN packetsToCommitInSync(Tail(packets), laterZxids,
                                                 Append(selected, Head(packets)))
                      ELSE packetsToCommitInSync(packets, laterZxids, selected)
\* See lastProposed in Leader for details.
\* Index and zxid of the last entry of server i's NEXT-state history
\* (history'[i]); index 0 and zxid <<0, 0>> when that history is empty.
\* Only meaningful inside an action that has already fixed history'.
LastNewProposed(i) ==
        LET newLog == history'[i]
            n      == Len(newLog)
        IN  IF n = 0
            THEN [ index |-> 0, zxid |-> <<0, 0>> ]
            ELSE [ index |-> n, zxid |-> newLog[n].zxid ]
(* Follower i persists and commits the sync-phase packets selected by
   packetsToCommitInSync against packetsSync[i].committed:
   - the selected packets are appended to history[i];
   - initialHistory[i] becomes the new history;
   - lastCommitted[i] and lastProcessed[i] both point at the new last entry;
   - unselected packets stay in packetsSync[i].notCommitted and the committed
     zxid list is emptied.
   Request queues are left unchanged. *)
FollowerSyncAndCommitInitialLogEntries(i) ==
        LET split     == packetsToCommitInSync(packetsSync[i].notCommitted,
                                               packetsSync[i].committed,
                                               << >>)
            leftover  == split[1]
            toPersist == split[2]
        IN  /\ history'        = [history EXCEPT ![i] = @ \o toPersist]
            /\ initialHistory' = [initialHistory EXCEPT ![i] = history'[i]]
            \* The whole (new) log now counts as committed and processed.
            /\ lastCommitted'  = [lastCommitted EXCEPT ![i] = LastNewProposed(i)]
            /\ lastProcessed'  = [lastProcessed EXCEPT ![i] = lastCommitted'[i]]
            /\ packetsSync'    = [packetsSync EXCEPT ![i] = [ notCommitted |-> leftover,
                                                              committed    |-> << >> ]]
            /\ UNCHANGED <<queuedRequests, committedRequests>>
-----------------------------------------------------------------------------
\* Fix in https://github.com/apache/zookeeper/pull/2152
(* Persist and commit requests according to "packetsCommitted". *)
(* Step 1 of 3 of follower i processing NEWLEADER from leader j under the fix.
   It takes a snapshot when SnapshotNeeded(i) holds, then persists and commits
   the committed sync-phase packets (FollowerSyncAndCommitInitialLogEntries).
   Enabled only while:
   - currentEpoch[i] /= acceptedEpoch[i], i.e. the epoch is not yet updated
     (step 2 does that);
   - connectInfo[i].nlRcv = FALSE, i.e. this step has not run yet.
   The NEWLEADER message is left in rcvBuffer, so it is still pending for
   steps 2 and 3. *)
FollowerProcessNEWLEADER_pr2152_1(i, j) ==
        /\ IsON(i)
        /\ IsFollower(i)
        /\ PendingNEWLEADER(i, j)
        /\ currentEpoch[i] /= acceptedEpoch[i]   \* epoch not yet updated (step 2)
        /\ connectInfo[i].nlRcv = FALSE          \* step 1 not yet done
        /\ LET infoOk == IsMyLeader(i, j)
           IN /\ infoOk
              /\ \/ /\ SnapshotNeeded(i)
                    /\ TakeSnapshot(i)
                 \/ /\ ~SnapshotNeeded(i)
                    /\ UNCHANGED <<lastSnapshot, daInv>>
              /\ FollowerSyncAndCommitInitialLogEntries(i)
              \* Record that step 1 ran; this enables step 2.
              /\ connectInfo' = [ connectInfo EXCEPT ![i].nlRcv = TRUE ]
              /\ UNCHANGED currentEpoch
        /\ UNCHANGED <<servingState>>
        \* NEWLEADER is not consumed here; steps 2 and 3 still need it.
        /\ UNCHANGED <<rcvBuffer>>
        /\ UNCHANGED <<state, zabState, acceptedEpoch, hzxid, leaderVars, electionVars, envVars, verifyVars>>
        /\ LET msg == rcvBuffer[j][i][1]
           IN UpdateRecorder(<<"FollowerProcessNEWLEADER_pr2152_1", i, j, msg.mzxid>>)
        \* NOTE(review): NextFix also conjoins UpdateAfterAction after the
        \* chosen action; confirm this extra conjunct is intended.
        /\ UpdateAfterAction
(* Update currentEpoch after persisting committed history.
   Step 2 of 3 of follower i processing NEWLEADER from leader j. It runs only
   after step 1 (connectInfo[i].nlRcv = TRUE) and while the epoch is still
   stale. Effects:
   - acceptedEpoch[i] is adopted as currentEpoch[i];
   - servingState[i] becomes INITIAL;
   - connectInfo[i].syncMode is reset to NONE.
   The NEWLEADER message stays in rcvBuffer for step 3. *)
FollowerProcessNEWLEADER_pr2152_2(i, j) ==
        LET newLeaderMsg == rcvBuffer[j][i][1]
        IN  /\ IsON(i)
            /\ IsFollower(i)
            /\ PendingNEWLEADER(i, j)
            /\ currentEpoch[i] /= acceptedEpoch[i]
            /\ connectInfo[i].nlRcv = TRUE
            /\ currentEpoch' = [currentEpoch EXCEPT ![i] = acceptedEpoch[i]]
            /\ servingState' = [servingState EXCEPT ![i] = INITIAL]
            /\ connectInfo'  = [connectInfo EXCEPT ![i].syncMode = NONE]
            /\ UNCHANGED <<lastSnapshot, daInv,
                           history, initialHistory, lastCommitted, lastProcessed,
                           packetsSync, queuedRequests, committedRequests,
                           rcvBuffer,
                           state, zabState, acceptedEpoch, hzxid,
                           leaderVars, electionVars, envVars, verifyVars>>
            /\ UpdateRecorder(<<"FollowerProcessNEWLEADER_pr2152_2", i, j, newLeaderMsg.mzxid>>)
            /\ UpdateAfterAction
(* Reply ACK-NEWLEADER at the end of processing NEWLEADER.
   Step 3 of 3 of follower i processing NEWLEADER from leader j. Enabled once
   the epoch has been updated (currentEpoch = acceptedEpoch) and step 1 has
   run (nlRcv = TRUE). Sends j an ACK carrying the NEWLEADER's zxid through
   Reply. Presumably Reply also consumes the NEWLEADER message; it is defined
   elsewhere, so confirm. Nothing else changes. *)
FollowerProcessNEWLEADER_pr2152_3(i, j) ==
        LET newLeaderMsg == rcvBuffer[j][i][1]
            ackNewLeader == [ mtype |-> ACK,
                              mzxid |-> newLeaderMsg.mzxid ]
        IN  /\ IsON(i)
            /\ IsFollower(i)
            /\ PendingNEWLEADER(i, j)
            /\ currentEpoch[i] = acceptedEpoch[i]
            /\ connectInfo[i].nlRcv = TRUE
            /\ IsMyLeader(i, j)
            /\ Reply(i, j, ackNewLeader)
            /\ UNCHANGED <<state, zabState, servingState, acceptedEpoch, currentEpoch,
                           history, initialHistory, lastCommitted, lastProcessed, lastSnapshot, hzxid,
                           connectInfo, packetsSync, queuedRequests, committedRequests,
                           leaderVars, electionVars, envVars, verifyVars, daInv>>
            /\ UpdateRecorder(<<"FollowerProcessNEWLEADER_pr2152_3", i, j, newLeaderMsg.mzxid>>)
            /\ UpdateAfterAction
-----------------------------------------------------------------------------
(* Shut down every server in S at once; LeaderShutdown_fix uses this for a
   leader together with its learners. Servers outside S are left untouched.
   Members of crashSet count as crashed and never flush pending txns. *)
Shutdown_fix(S, crashSet) ==
        /\ state'        = [s \in Server |-> IF s \notin S THEN state[s] ELSE LOOKING]
        /\ zabState'     = [s \in Server |-> IF s \notin S THEN zabState[s] ELSE ELECTION]
        /\ servingState' = [s \in Server |-> IF s \notin S THEN servingState[s] ELSE SHUTDOWN]
        /\ connectInfo'  = [s \in Server |-> IF s \notin S THEN connectInfo[s] ELSE InitialConnectInfo]
        /\ CleanInputBuffer(S)
        \* A SyncRequestProcessor that has started logs all pending txns,
        \* unless its server crashed.
        \* NOTE(review): the IsLeader(s) conjunct restricts flushing to the
        \* leader. If, as stated on that conjunct, the leader's queues are
        \* always empty, this branch never changes history. It also means the
        \* leader's followers do not flush here, unlike in FollowerShutdown_fix.
        \* Confirm this is intended.
        /\ history' =
             [s \in Server |->
                LET pendingTxns == queuedRequests[s] \o packetsSync[s].notCommitted
                    flushes     == /\ s \in S
                                   /\ s \notin crashSet
                                   /\ servingState[s] /= SHUTDOWN
                                   /\ pendingTxns /= << >>
                                   \* The leader's SyncRequestProcessor is not modeled in
                                   \* this spec, so its queuedRequests[s] and
                                   \* packetsSync[s].notCommitted are empty.
                                   /\ IsLeader(s)
                IN  IF flushes THEN history[s] \o pendingTxns ELSE history[s] ]
        /\ initialHistory' = [s \in Server |-> IF s \notin S THEN initialHistory[s] ELSE history'[s]]
        /\ lastCommitted'  = [s \in Server |-> IF s \notin S THEN lastCommitted[s]
                                               ELSE [ index |-> 0, zxid |-> <<0, 0>> ]]
        \* Clear volatile data.
        \* TODO:
        /\ queuedRequests'    = [s \in Server |-> IF s \notin S THEN queuedRequests[s] ELSE << >>]
        /\ committedRequests' = [s \in Server |-> IF s \notin S THEN committedRequests[s] ELSE << >>]
        /\ packetsSync'       = [s \in Server |-> IF s \notin S THEN packetsSync[s]
                                                  ELSE [ notCommitted |-> << >>, committed |-> << >> ]]
        /\ lastProcessed'     = [s \in Server |-> IF s \notin S THEN lastProcessed[s] ELSE InitLastProcessed(s)]
        \* See ZooKeeperServer.loadData(): hzxid restarts from the last processed zxid.
        /\ hzxid'             = [s \in Server |-> IF s \notin S THEN hzxid[s] ELSE lastProcessed'[s].zxid]
(* Shut down the single follower i and send it back to election.
   When isCrash = FALSE and its SyncRequestProcessor is running
   (servingState[i] /= SHUTDOWN), i first appends its pending txns
   (queuedRequests, then packetsSync.notCommitted) to its history.
   When isCrash = TRUE nothing is flushed. All volatile state is then reset. *)
FollowerShutdown_fix(i, isCrash) ==
        /\ state'         = [state         EXCEPT ![i] = LOOKING]
        /\ zabState'      = [zabState      EXCEPT ![i] = ELECTION]
        /\ servingState'  = [servingState  EXCEPT ![i] = SHUTDOWN]
        /\ connectInfo'   = [connectInfo   EXCEPT ![i] = InitialConnectInfo]
        /\ lastCommitted' = [lastCommitted EXCEPT ![i] = [ index |-> 0,
                                                           zxid  |-> <<0, 0>> ]]
        \* A running SyncRequestProcessor logs all pending txns unless i crashed.
        /\ LET pendingTxns == queuedRequests[i] \o packetsSync[i].notCommitted
               flushes     == /\ ~isCrash
                              /\ servingState[i] /= SHUTDOWN
                              /\ pendingTxns /= << >>
           IN  history' = [history EXCEPT ![i] = IF flushes THEN @ \o pendingTxns ELSE @]
        /\ initialHistory' = [initialHistory EXCEPT ![i] = history'[i]]
        \* SyncRequestProcessor will process this synchronously
        /\ queuedRequests'    = [queuedRequests EXCEPT ![i] = << >>]
        \* CommitProcessor will process this synchronously
        /\ committedRequests' = [committedRequests EXCEPT ![i] = << >>]
        /\ packetsSync' = [packetsSync EXCEPT ![i].notCommitted = << >>,
                                              ![i].committed    = << >>]
        \* In version 3.7+, lastProcessed is modified when turning to LOOKING.
        /\ lastProcessed' = [lastProcessed EXCEPT ![i] = InitLastProcessed(i)]
        /\ hzxid'         = [hzxid EXCEPT ![i] = lastProcessed'[i].zxid]
(* Leader i steps down:
   - i and all of its learners are shut down together by Shutdown_fix, and
     members of crashSet do not flush pending txns;
   - i's learner and forwarding sets are emptied;
   - the leader oracle is reset to NullPoint. *)
LeaderShutdown_fix(i, crashSet) ==
        /\ Shutdown_fix({i} \cup learners[i], crashSet)
        /\ learners'     = [learners   EXCEPT ![i] = {}]
        /\ forwarding'   = [forwarding EXCEPT ![i] = {}]
        /\ leaderOracle' = NullPoint
(* Environment event: a network partition starts between servers i and j.
   Enabled only if all of the following hold:
   - CheckExternalEventExecute and CheckPartition allow it (both defined
     elsewhere);
   - i /= j, and both servers are ON;
   - the pair is not already partitioned.
   Two situations are modeled:
   - i is j's leader and j is i's follower. If IsQuorum(learners[i] \ {j})
     still holds, only j is dropped: it is shut down as NOT crashed, so it may
     flush pending txns. Otherwise leader i shuts down its whole cluster,
     with an empty crash set.
   - Both are LOOKING: only the partition flag changes.
   In either case partition[i][j] and partition[j][i] become TRUE. *)
PartitionStart_fix(i, j) ==
        /\ CheckExternalEventExecute(<<"PartitionStart", i, j>>)
        /\ CheckPartition
        /\ i /= j
        /\ IsON(i)
        /\ IsON(j)
        /\ \lnot HasPartitioned(i, j)
        /\ \/ /\ IsLeader(i) /\ IsMyLearner(i, j)
              /\ IsFollower(j) /\ IsMyLeader(j, i)
              /\ LET newCluster == learners[i] \ {j}
                 IN \/ /\ IsQuorum(newCluster) \* just remove this learner
                       /\ RemoveLearner(i, j)
                       /\ FollowerShutdown_fix(j, FALSE)
                       /\ Clean(i, j)
                       /\ UNCHANGED electionVars
                    \/ /\ ~IsQuorum(newCluster) \* leader switches to looking
                       /\ LeaderShutdown_fix(i, {})
                       /\ UNCHANGED <<electing, connecting, ackldRecv>>
           \/ /\ IsLooking(i)
              /\ IsLooking(j)
              /\ IdComparePredicate(i, j) \* to compress state space
              /\ UNCHANGED <<threadVars>>
              /\ UNCHANGED <<state, zabState, servingState, lastProcessed, hzxid, connecting, noDisVars,
                             history, initialHistory, lastCommitted, connectInfo, packetsSync, netVars, electionVars>>
        \* The partition is symmetric.
        /\ partition' = [partition EXCEPT ![i][j] = TRUE, ![j][i] = TRUE ]
        /\ UNCHANGED <<acceptedEpoch, currentEpoch, lastSnapshot, tempMaxEpoch, status, verifyVars, daInv>>
        /\ UpdateRecorder(<<"PartitionStart", i, j>>)
(* Environment event: server i crashes and goes OFFLINE.
   Enabled only if CheckExternalEventExecute and CheckCrash(i) allow it (both
   defined elsewhere) and i is ON. The effect depends on i's role:
   - IsLooking(i): only status changes.
   - IsFollower(i) and registered with the current leader (leaderOracle is
     set and i is among its learners):
       * if IsQuorum(learners[leader] \ {i}), the leader just drops i, which
         shuts down as crashed (no flush);
       * otherwise the leader shuts down its whole cluster with crash set {i}.
   - IsFollower(i) but not registered with a leader: i shuts down as crashed
     and its input buffer is cleared.
   - IsLeader(i): i shuts down its whole cluster, with itself as the only
     crashed member. *)
NodeCrash_fix(i) ==
        /\ CheckExternalEventExecute(<<"NodeCrash", i>>)
        /\ CheckCrash(i)
        /\ IsON(i)
        /\ status' = [status EXCEPT ![i] = OFFLINE ]
        /\ \/ /\ IsLooking(i)
              /\ UNCHANGED <<threadVars>>
              /\ UNCHANGED <<state, zabState, servingState, lastProcessed, hzxid, connecting, noDisVars,
                             history, initialHistory, lastCommitted, connectInfo, packetsSync, netVars, electionVars>>
           \/ /\ IsFollower(i)
              /\ LET connectedWithLeader == leaderOracle /= NullPoint /\ i \in learners[leaderOracle]
                     \* (alternative definition, kept commented out)
                     \* connectedWithLeader == HasLeader(i)
                 IN \/ /\ connectedWithLeader
                       /\ LET leader == leaderOracle
                              \* (alternative definition, kept commented out)
                              \* leader == connectInfo[i].sid
                              newCluster == learners[leader] \ {i}
                          IN
                             \* The remaining learners still form a quorum: drop only i.
                             \/ /\ IsQuorum(newCluster)
                                /\ RemoveLearner(leader, i)
                                /\ FollowerShutdown_fix(i, TRUE)
                                /\ Clean(leader, i)
                                /\ UNCHANGED electionVars
                             \* Quorum lost: the leader shuts down its whole cluster.
                             \/ /\ ~IsQuorum(newCluster)
                                /\ LeaderShutdown_fix(leader, {i})
                                /\ UNCHANGED <<electing, connecting, ackldRecv>>
                    \/ /\ ~connectedWithLeader
                       /\ FollowerShutdown_fix(i, TRUE)
                       /\ CleanInputBuffer({i})
                       /\ UNCHANGED <<connecting, noDisVars, electionVars>>
           \/ /\ IsLeader(i)
              /\ LeaderShutdown_fix(i, {i})
              /\ UNCHANGED <<connecting, electing, ackldRecv>>
        /\ UNCHANGED <<acceptedEpoch, currentEpoch, lastSnapshot, tempMaxEpoch, partition, verifyVars, daInv>>
        /\ UpdateRecorder(<<"NodeCrash", i>>)
-----------------------------------------------------------------------------
\* Next
(* Next-state relation of this spec.
   - First step (recorder["step"] = 0): only SetInitState, then
     UpdateAfterAction.
   - Every later step: AfterActionCheck and DuringActionCheck must hold, one
     of the actions below is taken, and UpdateAfterAction is conjoined.
   In this module, NEWLEADER is handled by the three
   FollowerProcessNEWLEADER_pr2152_* steps. Partitions and crashes use
   PartitionStart_fix and NodeCrash_fix. *)
NextFix ==
        (* Set initial state *)
        \/ /\ recorder["step"] = 0
           /\ SetInitState
           /\ UpdateAfterAction
        \/ /\ recorder["step"] > 0
           /\ AfterActionCheck
           /\ DuringActionCheck
           /\
              (* FLE and Discovery *)
              \/ \E i \in Server, S \in Quorums: ElectionAndDiscovery(i, S)
              (* Zab - Synchronization *)
              \/ \E i, j \in Server: LeaderSyncFollower(i, j)
              \/ \E i, j \in Server: FollowerProcessSyncMessage(i, j)
              \/ \E i, j \in Server: FollowerProcessPROPOSALInSync(i, j)
              \/ \E i, j \in Server: FollowerProcessCOMMITInSync(i, j)
              \/ \E i, j \in Server: FollowerProcessNEWLEADER_pr2152_1(i, j)
              \/ \E i, j \in Server: FollowerProcessNEWLEADER_pr2152_2(i, j)
              \/ \E i, j \in Server: FollowerProcessNEWLEADER_pr2152_3(i, j)
              \/ \E i, j \in Server: FollowerProcessUPTODATE(i, j)
              (* Zab - Broadcast *)
              \/ \E i \in Server: LeaderProcessRequest(i)
              \/ \E i, j \in Server: FollowerProcessPROPOSAL(i, j)
              \/ \E i, j \in Server: LeaderProcessACK(i, j) \* Sync + Broadcast
              \/ \E i, j \in Server: FollowerProcessCOMMIT(i, j)
              (* Internal event: save a request to disk and reply ack, commit a request *)
              \/ \E i, j \in Server: FollowerSyncProcessorLogRequest(i, j)
              \/ \E i, j \in Server: FollowerCommitProcessorCommit(i, j)
              (* Filter redundant messages in network *)
              \/ \E i \in Server: FilterNonexistentMessage(i)
              (* Failures like node crash and network partition *)
              \/ \E i, j \in Server: PartitionStart_fix(i, j)
              \/ \E i, j \in Server: PartitionRecover(i, j)
              \/ \E i \in Server: NodeCrash_fix(i)
              \/ \E i \in Server: NodeStart(i)
           /\ UpdateAfterAction
\* Behavior spec: start in InitFix; every step is a NextFix step or a
\* stuttering step that leaves varsFix unchanged.
SpecFix == InitFix /\ [][NextFix]_varsFix
=============================================================================