Skip to content

Commit

Permalink
refactor: move IO.Channel and IO.Mutex to Std.Sync (#6282)
Browse files Browse the repository at this point in the history
This PR moves `IO.Channel` and `IO.Mutex` from `Init` to `Std.Sync` and
renames them to `Std.Channel` and `Std.Mutex`.

Note that the original files are retained and the deprecation is written
manually as we cannot import `Std` from `Init` so this is the only way
to deprecate without a hard breaking change. In particular we do not yet
move `Std.Queue` from `Init` to `Std` both because it needs to be
retained for this deprecation to work but also because it is already
within the `Std` namespace and as such we cannot maintain two copies of
the file at once. After the deprecation period is finished `Std.Queue`
will find a new home in `Std.Data.Queue`.
  • Loading branch information
hargoniX authored Dec 3, 2024
1 parent cb600ed commit 24b412e
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 17 deletions.
14 changes: 14 additions & 0 deletions src/Init/Data/Channel.lean
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import Init.Data.Queue
import Init.System.Promise
import Init.System.Mutex

set_option linter.deprecated false

namespace IO

/--
Internal state of an `Channel`.
We maintain the invariant that at all times either `consumers` or `values` is empty.
-/
@[deprecated "Use Std.Channel.State from Std.Sync.Channel instead" (since := "2024-12-02")]
structure Channel.State (α : Type) where
values : Std.Queue α := ∅
consumers : Std.Queue (Promise (Option α)) := ∅
Expand All @@ -27,12 +30,14 @@ FIFO channel with unbounded buffer, where `recv?` returns a `Task`.
A channel can be closed. Once it is closed, all `send`s are ignored, and
`recv?` returns `none` once the queue is empty.
-/
@[deprecated "Use Std.Channel from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel (α : Type) : Type := Mutex (Channel.State α)

instance : Nonempty (Channel α) :=
inferInstanceAs (Nonempty (Mutex _))

/-- Creates a new `Channel`. -/
@[deprecated "Use Std.Channel.new from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.new : BaseIO (Channel α) :=
Mutex.new {}

Expand All @@ -41,6 +46,7 @@ Sends a message on an `Channel`.
This function does not block.
-/
@[deprecated "Use Std.Channel.send from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.send (ch : Channel α) (v : α) : BaseIO Unit :=
ch.atomically do
let st ← get
Expand All @@ -54,6 +60,7 @@ def Channel.send (ch : Channel α) (v : α) : BaseIO Unit :=
/--
Closes an `Channel`.
-/
@[deprecated "Use Std.Channel.close from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.close (ch : Channel α) : BaseIO Unit :=
ch.atomically do
let st ← get
Expand All @@ -67,6 +74,7 @@ Every message is only received once.
Returns `none` if the channel is closed and the queue is empty.
-/
@[deprecated "Use Std.Channel.recv? from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) :=
ch.atomically do
let st ← get
Expand All @@ -85,6 +93,7 @@ def Channel.recv? (ch : Channel α) : BaseIO (Task (Option α)) :=
Note that if this function is called twice, each `forAsync` only gets half the messages.
-/
@[deprecated "Use Std.Channel.forAsync from Std.Sync.Channel instead" (since := "2024-12-02")]
partial def Channel.forAsync (f : α → BaseIO Unit) (ch : Channel α)
(prio : Task.Priority := .default) : BaseIO (Task Unit) := do
BaseIO.bindTask (prio := prio) (← ch.recv?) fun
Expand All @@ -96,11 +105,13 @@ Receives all currently queued messages from the channel.
Those messages are dequeued and will not be returned by `recv?`.
-/
@[deprecated "Use Std.Channel.recvAllCurrent from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.recvAllCurrent (ch : Channel α) : BaseIO (Array α) :=
ch.atomically do
modifyGet fun st => (st.values.toArray, { st with values := ∅ })

/-- Type tag for synchronous (blocking) operations on a `Channel`. -/
@[deprecated "Use Std.Channel.Sync from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.Sync := Channel

/--
Expand All @@ -110,6 +121,7 @@ For example, `ch.sync.recv?` blocks until the next message,
and `for msg in ch.sync do ...` iterates synchronously over the channel.
These functions should only be used in dedicated threads.
-/
@[deprecated "Use Std.Channel.sync from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.sync (ch : Channel α) : Channel.Sync α := ch

/--
Expand All @@ -118,9 +130,11 @@ Synchronously receives a message from the channel.
Every message is only received once.
Returns `none` if the channel is closed and the queue is empty.
-/
@[deprecated "Use Std.Channel.Sync.recv? from Std.Sync.Channel instead" (since := "2024-12-02")]
def Channel.Sync.recv? (ch : Channel.Sync α) : BaseIO (Option α) := do
IO.wait (← Channel.recv? ch)

@[deprecated "Use Std.Channel.Sync.forIn from Std.Sync.Channel instead" (since := "2024-12-02")]
private partial def Channel.Sync.forIn [Monad m] [MonadLiftT BaseIO m]
(ch : Channel.Sync α) (f : α → β → m (ForInStep β)) : β → m β := fun b => do
match ← ch.recv? with
Expand Down
25 changes: 18 additions & 7 deletions src/Init/System/Mutex.lean
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ prelude
import Init.System.IO
import Init.Control.StateRef


set_option linter.deprecated false

namespace IO

private opaque BaseMutexImpl : NonemptyType.{0}
Expand All @@ -16,12 +19,13 @@ Mutual exclusion primitive (a lock).
If you want to guard shared state, use `Mutex α` instead.
-/
@[deprecated "Use Std.BaseMutex from Std.Sync.Mutex instead" (since := "2024-12-02")]
def BaseMutex : Type := BaseMutexImpl.type

instance : Nonempty BaseMutex := BaseMutexImpl.property

/-- Creates a new `BaseMutex`. -/
@[extern "lean_io_basemutex_new"]
@[extern "lean_io_basemutex_new", deprecated "Use Std.BaseMutex.new from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque BaseMutex.new : BaseIO BaseMutex

/--
Expand All @@ -30,7 +34,7 @@ Locks a `BaseMutex`. Waits until no other thread has locked the mutex.
The current thread must not have already locked the mutex.
Reentrant locking is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basemutex_lock"]
@[extern "lean_io_basemutex_lock", deprecated "Use Std.BaseMutex.lock from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque BaseMutex.lock (mutex : @& BaseMutex) : BaseIO Unit

/--
Expand All @@ -39,33 +43,35 @@ Unlocks a `BaseMutex`.
The current thread must have already locked the mutex.
Unlocking an unlocked mutex is undefined behavior (inherited from the C++ implementation).
-/
@[extern "lean_io_basemutex_unlock"]
@[extern "lean_io_basemutex_unlock", deprecated "Use Std.BaseMutex.unlock from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque BaseMutex.unlock (mutex : @& BaseMutex) : BaseIO Unit

private opaque CondvarImpl : NonemptyType.{0}

/-- Condition variable. -/
@[deprecated "Use Std.Condvar from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Condvar : Type := CondvarImpl.type

instance : Nonempty Condvar := CondvarImpl.property

/-- Creates a new condition variable. -/
@[extern "lean_io_condvar_new"]
@[extern "lean_io_condvar_new", deprecated "Use Std.Condvar.new from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.new : BaseIO Condvar

/-- Waits until another thread calls `notifyOne` or `notifyAll`. -/
@[extern "lean_io_condvar_wait"]
@[extern "lean_io_condvar_wait", deprecated "Use Std.Condvar.wait from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.wait (condvar : @& Condvar) (mutex : @& BaseMutex) : BaseIO Unit

/-- Wakes up a single other thread executing `wait`. -/
@[extern "lean_io_condvar_notify_one"]
@[extern "lean_io_condvar_notify_one", deprecated "Use Std.Condvar.notifyOne from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.notifyOne (condvar : @& Condvar) : BaseIO Unit

/-- Wakes up all other threads executing `wait`. -/
@[extern "lean_io_condvar_notify_all"]
@[extern "lean_io_condvar_notify_all", deprecated "Use Std.Condvar.notifyAll from Std.Sync.Mutex instead" (since := "2024-12-02")]
opaque Condvar.notifyAll (condvar : @& Condvar) : BaseIO Unit

/-- Waits on the condition variable until the predicate is true. -/
@[deprecated "Use Std.Condvar.waitUntil from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Condvar.waitUntil [Monad m] [MonadLift BaseIO m]
(condvar : Condvar) (mutex : BaseMutex) (pred : m Bool) : m Unit := do
while !(← pred) do
Expand All @@ -78,6 +84,7 @@ The type `Mutex α` is similar to `IO.Ref α`,
except that concurrent accesses are guarded by a mutex
instead of atomic pointer operations and busy-waiting.
-/
@[deprecated "Use Std.Mutex from Std.Sync.Mutex instead" (since := "2024-12-02")]
structure Mutex (α : Type) where private mk ::
private ref : IO.Ref α
mutex : BaseMutex
Expand All @@ -86,6 +93,7 @@ structure Mutex (α : Type) where private mk ::
instance : CoeOut (Mutex α) BaseMutex where coe := Mutex.mutex

/-- Creates a new mutex. -/
@[deprecated "Use Std.Mutex.new from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Mutex.new (a : α) : BaseIO (Mutex α) :=
return { ref := ← mkRef a, mutex := ← BaseMutex.new }

Expand All @@ -94,9 +102,11 @@ def Mutex.new (a : α) : BaseIO (Mutex α) :=
with outside monad `m`.
The action has access to the state `α` of the mutex (via `get` and `set`).
-/
@[deprecated "Use Std.AtomicT from Std.Sync.Mutex instead" (since := "2024-12-02")]
abbrev AtomicT := StateRefT' IO.RealWorld

/-- `mutex.atomically k` runs `k` with access to the mutex's state while locking the mutex. -/
@[deprecated "Use Std.Mutex.atomically from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : Mutex α) (k : AtomicT α m β) : m β := do
try
Expand All @@ -110,6 +120,7 @@ def Mutex.atomically [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
waiting on `condvar` until `pred` returns true.
Both `k` and `pred` have access to the mutex's state.
-/
@[deprecated "Use Std.Mutex.atomicallyOnce from Std.Sync.Mutex instead" (since := "2024-12-02")]
def Mutex.atomicallyOnce [Monad m] [MonadLiftT BaseIO m] [MonadFinally m]
(mutex : Mutex α) (condvar : Condvar)
(pred : AtomicT α m Bool) (k : AtomicT α m β) : m β :=
Expand Down
14 changes: 7 additions & 7 deletions src/Lean/Server/FileWorker.lean
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki
-/
prelude
import Init.System.IO
import Init.Data.Channel
import Std.Sync.Channel

import Lean.Data.RBMap
import Lean.Environment
Expand Down Expand Up @@ -64,7 +64,7 @@ open Widget in
structure WorkerContext where
/-- Synchronized output channel for LSP messages. Notifications for outdated versions are
discarded on read. -/
chanOut : IO.Channel JsonRpc.Message
chanOut : Std.Channel JsonRpc.Message
/--
Latest document version received by the client, used for filtering out notifications from
previous versions.
Expand All @@ -75,7 +75,7 @@ structure WorkerContext where
Channel that receives a message for every a `$/lean/fileProgress` notification, indicating whether
the notification suggests that the file is currently being processed.
-/
chanIsProcessing : IO.Channel Bool
chanIsProcessing : Std.Channel Bool
/--
Diagnostics that are included in every single `textDocument/publishDiagnostics` notification.
-/
Expand Down Expand Up @@ -271,7 +271,7 @@ open Language Lean in
Callback from Lean language processor after parsing imports that requests necessary information from
Lake for processing imports.
-/
def setupImports (meta : DocumentMeta) (cmdlineOpts : Options) (chanOut : Channel JsonRpc.Message)
def setupImports (meta : DocumentMeta) (cmdlineOpts : Options) (chanOut : Std.Channel JsonRpc.Message)
(srcSearchPathPromise : Promise SearchPath) (stx : Syntax) :
Language.ProcessingT IO (Except Language.Lean.HeaderProcessedSnapshot SetupImportsResult) := do
let importsAlreadyLoaded ← importsLoadedRef.modifyGet ((·, true))
Expand Down Expand Up @@ -337,7 +337,7 @@ section Initialization
let clientHasWidgets := initParams.initializationOptions?.bind (·.hasWidgets?) |>.getD false
let maxDocVersionRef ← IO.mkRef 0
let freshRequestIdRef ← IO.mkRef (0 : Int)
let chanIsProcessing ← IO.Channel.new
let chanIsProcessing ← Std.Channel.new
let stickyDiagnosticsRef ← IO.mkRef ∅
let chanOut ← mkLspOutputChannel maxDocVersionRef chanIsProcessing
let srcSearchPathPromise ← IO.Promise.new
Expand Down Expand Up @@ -380,8 +380,8 @@ section Initialization
the output FS stream after discarding outdated notifications. This is the only component of
the worker with access to the output stream, so we can synchronize messages from parallel
elaboration tasks here. -/
mkLspOutputChannel maxDocVersion chanIsProcessing : IO (IO.Channel JsonRpc.Message) := do
let chanOut ← IO.Channel.new
mkLspOutputChannel maxDocVersion chanIsProcessing : IO (Std.Channel JsonRpc.Message) := do
let chanOut ← Std.Channel.new
let _ ← chanOut.forAsync (prio := .dedicated) fun msg => do
-- discard outdated notifications; note that in contrast to responses, notifications can
-- always be silently discarded
Expand Down
6 changes: 3 additions & 3 deletions src/Lean/Server/Watchdog.lean
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Authors: Marc Huisinga, Wojciech Nawrocki
-/
prelude
import Init.System.IO
import Init.System.Mutex
import Std.Sync.Mutex
import Init.Data.ByteArray
import Lean.Data.RBMap

Expand Down Expand Up @@ -113,7 +113,7 @@ section FileWorker
structure FileWorker where
doc : DocumentMeta
proc : Process.Child workerCfg
exitCode : IO.Mutex (Option UInt32)
exitCode : Std.Mutex (Option UInt32)
commTask : Task WorkerEvent
state : WorkerState
-- This should not be mutated outside of namespace FileWorker,
Expand Down Expand Up @@ -392,7 +392,7 @@ section ServerM
-- open session for `kill` above
setsid := true
}
let exitCode ← IO.Mutex.new none
let exitCode ← Std.Mutex.new none
let pendingRequestsRef ← IO.mkRef (RBMap.empty : PendingRequestMap)
let initialDependencyBuildMode := m.dependencyBuildMode
let updatedDependencyBuildMode :=
Expand Down
1 change: 1 addition & 0 deletions src/Std.lean
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Authors: Sebastian Ullrich
prelude
import Std.Data
import Std.Sat
import Std.Sync
import Std.Time
import Std.Tactic
import Std.Internal
8 changes: 8 additions & 0 deletions src/Std/Sync.lean
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/-
Copyright (c) 2024 Lean FRO, LLC. All rights reserved.
Released under Apache 2.0 license as described in the file LICENSE.
Authors: Henrik Böving
-/
prelude
import Std.Sync.Channel
import Std.Sync.Mutex
Loading

0 comments on commit 24b412e

Please sign in to comment.