diff --git a/src/Lean/Server/Watchdog.lean b/src/Lean/Server/Watchdog.lean index f49cd50ffc8b..cb1eef0d58c3 100644 --- a/src/Lean/Server/Watchdog.lean +++ b/src/Lean/Server/Watchdog.lean @@ -90,6 +90,10 @@ section Utils | crashed (e : IO.Error) | ioError (e : IO.Error) + inductive CrashOrigin + | fileWorkerToClientForwarding + | clientToFileWorkerForwarding + inductive WorkerState where /-- The watchdog can detect a crashed file worker in two places: When trying to send a message to the file worker and when reading a request reply. @@ -98,7 +102,7 @@ section Utils that are in-flight are errored. Upon receiving the next packet for that file worker, the file worker is restarted and the packet is forwarded to it. If the crash was detected while writing a packet, we queue that packet until the next packet for the file worker arrives. -/ - | crashed (queuedMsgs : Array JsonRpc.Message) + | crashed (queuedMsgs : Array JsonRpc.Message) (origin : CrashOrigin) | running abbrev PendingRequestMap := RBMap RequestID JsonRpc.Message compare @@ -136,6 +140,11 @@ section FileWorker for ⟨id, _⟩ in pendingRequests do hError.writeLspResponseError { id := id, code := code, message := msg } + def queuedMsgs (fw : FileWorker) : Array JsonRpc.Message := + match fw.state with + | .running => #[] + | .crashed queuedMsgs _ => queuedMsgs + end FileWorker end FileWorker @@ -404,10 +413,23 @@ section ServerM return eraseFileWorker uri - def handleCrash (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := do + def handleCrash (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) (origin: CrashOrigin) : ServerM Unit := do let some fw ← findFileWorker? uri | return - updateFileWorkers { fw with state := WorkerState.crashed queuedMsgs } + updateFileWorkers { fw with state := WorkerState.crashed queuedMsgs origin } + + def tryDischargeQueuedMessages (uri : DocumentUri) (queuedMsgs : Array JsonRpc.Message) : ServerM Unit := do + let some fw ← findFileWorker? uri + | throwServerError "Cannot find file worker for '{uri}'." + let mut crashedMsgs := #[] + -- Try to discharge all queued msgs, tracking the ones that we can't discharge + for msg in queuedMsgs do + try + fw.stdin.writeLspMessage msg + catch _ => + crashedMsgs := crashedMsgs.push msg + if ¬ crashedMsgs.isEmpty then + handleCrash uri crashedMsgs .clientToFileWorkerForwarding /-- Tries to write a message, sets the state of the FileWorker to `crashed` if it does not succeed and restarts the file worker if the `crashed` flag was already set. Just logs an error if @@ -423,7 +445,7 @@ section ServerM let some fw ← findFileWorker? uri | return match fw.state with - | WorkerState.crashed queuedMsgs => + | WorkerState.crashed queuedMsgs _ => let mut queuedMsgs := queuedMsgs if queueFailedMessage then queuedMsgs := queuedMsgs.push msg @@ -432,17 +454,7 @@ section ServerM -- restart the crashed FileWorker eraseFileWorker uri startFileWorker fw.doc - let some newFw ← findFileWorker? uri - | throwServerError "Cannot find file worker for '{uri}'." - let mut crashedMsgs := #[] - -- try to discharge all queued msgs, tracking the ones that we can't discharge - for msg in queuedMsgs do - try - newFw.stdin.writeLspMessage msg - catch _ => - crashedMsgs := crashedMsgs.push msg - if ¬ crashedMsgs.isEmpty then - handleCrash uri crashedMsgs + tryDischargeQueuedMessages uri queuedMsgs | WorkerState.running => let initialQueuedMsgs := if queueFailedMessage then @@ -452,7 +464,7 @@ section ServerM try fw.stdin.writeLspMessage msg catch _ => - handleCrash uri initialQueuedMsgs + handleCrash uri initialQueuedMsgs .clientToFileWorkerForwarding /-- Sends a notification to the file worker identified by `uri` that its dependency `staleDependency` @@ -955,7 +967,16 @@ section MainLoop let workers ← st.fileWorkersRef.get let mut workerTasks := #[] for (_, fw) in workers do - if let WorkerState.running := fw.state then + -- When the forwarding task crashes, its return value will be stuck at + -- `WorkerEvent.crashed _`. + -- We want to handle this event only once, not over and over again, + -- so once the state becomes `WorkerState.crashed _ .fileWorkerToClientForwarding` + -- as a result of `WorkerEvent.crashed _`, we stop handling this event until + -- eventually the file worker is restarted by a notification from the client. + -- We do not want to filter the forwarding task in case of + -- `WorkerState.crashed _ .clientToFileWorkerForwarding`, since the forwarding task + -- exit code may still contain valuable information in this case (e.g. that the imports changed). + if !(fw.state matches WorkerState.crashed _ .fileWorkerToClientForwarding) then workerTasks := workerTasks.push <| fw.commTask.map (ServerEvent.workerEvent fw) let ev ← IO.waitAny (clientTask :: workerTasks.toList) @@ -984,13 +1005,16 @@ section MainLoop | WorkerEvent.ioError e => throwServerError s!"IO error while processing events for {fw.doc.uri}: {e}" | WorkerEvent.crashed _ => - handleCrash fw.doc.uri #[] + handleCrash fw.doc.uri fw.queuedMsgs .fileWorkerToClientForwarding mainLoop clientTask | WorkerEvent.terminated => throwServerError <| "Internal server error: got termination event for worker that " ++ "should have been removed" | .importsChanged => + let uri := fw.doc.uri + let queuedMsgs := fw.queuedMsgs startFileWorker fw.doc + tryDischargeQueuedMessages uri queuedMsgs mainLoop clientTask end MainLoop diff --git a/src/lake/Lake/Util/Proc.lean b/src/lake/Lake/Util/Proc.lean index ec4dd70bc44f..f40f70d92345 100644 --- a/src/lake/Lake/Util/Proc.lean +++ b/src/lake/Lake/Util/Proc.lean @@ -57,6 +57,7 @@ def testProc (args : IO.Process.SpawnArgs) : BaseIO Bool := EIO.catchExceptions (h := fun _ => pure false) do let child ← IO.Process.spawn { args with + stdin := IO.Process.Stdio.null stdout := IO.Process.Stdio.null stderr := IO.Process.Stdio.null }