Skip to content

Commit

Permalink
Add swap!, update!, and call-with derived operations
Browse files Browse the repository at this point in the history
  • Loading branch information
lexi-lambda committed Nov 26, 2023
1 parent 9092404 commit f2deb66
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 5 deletions.
66 changes: 63 additions & 3 deletions mvar-doc/scribblings/data/mvar.scrbl
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,24 @@ This list is far from exhaustive, and multiple M-vars used in concert can be eve

If a thread is blocked on a call to @racket[mvar-peek], the call is guaranteed to complete the next time the M-var is @tech[#:key "full"]{filled}, even if another thread is blocked on a call to @racket[mvar-take!] on the same M-var. In other words, whenever @racket[mvar-peek] and @racket[mvar-take!] compete to read the next value of an @tech{empty} M-var, @racket[mvar-peek] always wins. Since @racket[mvar-peek] is not exclusive—that is, it does not preclude another thread from reading the same M-var after it completes—this preference for @racket[mvar-peek] ensures that the maximum number of threads are woken up each time an M-var is filled.

@subsection[#:tag "safety"]{Safety}
@subsection[#:tag "atomicity"]{Atomicity}

Being a concurrent data structure, @tech{M-vars} are naturally thread-safe. They are also @reftech{break}-safe: if a thread is interrupted via @racket[break-thread] while executing an M-var operation, either none or all of the operation’s effects will take place. In other words, the atomicity of M-var operations is never compromised by breaks.
All M-var @tech{core operations} are @deftech{atomic}: so long as their executing thread is not killed or suspended (see @secref["thread-safety"]), either none or all of their effects will take place. This atomicity is guaranteed regardless of the way in which the M-var is used.

In contrast, @tech{derived operations} internally perform multiple core operations in sequence, and those sequences are @emph{not} intrinsically atomic. However, all derived operations currently provided by this library offer a weaker form of atomicity: they rely on the common M-var usage pattern of treating the M-var like a @reftech{semaphore}-protected @reftech{box}, where each use of @racket[mvar-put!] is preceded by a use of @racket[mvar-take!] to acquire exclusive access. This exclusive locking discipline ensures that derived operations are atomic with respect to the M-var’s state---each operation constitutes a non-overlapping “transaction” that is always fully committed or fully rolled back---but following the pattern correctly is the programmer’s responsibility.

@subsection[#:tag "thread-safety"]{Thread Safety}

@tech{M-vars} are a concurrent data structure, so all M-var operations are naturally thread-safe. Furthermore, they are also @reftech{break}-safe: if a thread is interrupted via @racket[break-thread] while executing any M-var operation, its @seclink["atomicity"]{atomicity} guarantees will not be compromised, and the M-var will always remain in a valid state.

However, M-vars are @emph{not} kill-safe. If a thread is killed via @racket[kill-thread] while executing any M-var operation (including @racket[mvar-peek]), the operation’s effects may only partially complete, and the M-var may be permanently left in an invalid state. In a similar vein, if a thread is suspended via @racket[thread-suspend] while executing any M-var operation, the operation may only partially complete, and the M-var may become temporarily unusable until the thread is resumed.

M-var operations also cannot safely be called in @tech[#:doc '(lib "scribblings/foreign/foreign.scrbl")]{atomic mode}. Even non-blocking operations like @racket[mvar-try-peek] may require polling events, which can lead to a deadlock if atomic mode is active.

@section[#:tag "core-operations"]{Core Operations}

This section documents the complete list of @tech{M-var} @deftech{core operations}. All core operations are primitive (they cannot be derived from other operations) and @tech{atomic} (but not kill-safe; see @secref["thread-safety"]).

@defproc*[([(make-mvar) mvar?]
[(make-mvar [v any/c]) mvar?])]{
Creates and returns a new @tech{M-var}. If called with no arguments, the returned M-var is initially @tech{empty}. If called with one argument, the returned M-var is initially @tech{full} and contains @racket[v].
Expand Down Expand Up @@ -164,6 +172,58 @@ Returns a @reftech{synchronizable event} for use with @racket[sync]. The event i

Like @racket[mvar-empty?], this operation should be used very carefully: even if the event is selected, another thread might fill @racket[mv] the instant that @racket[sync] returns, so it is almost always better to use @racket[mvar-put!-evt], instead. However, in programs where @racket[mv] only has a single writer, it can rarely be useful, so it is provided for completeness.}

@section[#:tag "derived-operations"]{Derived Operations}

The bindings documented in this section are @deftech{derived operations}, which codify some common patterns that arise when an @tech{M-var} is used like a @reftech{semaphore}-protected @reftech{box}. Since they are implemented as sequences of @tech{core operations}, they are @emph{not} intrinsically @tech{atomic}. Instead, atomicity is enforced through a locking discipline: each operation expects to receive an M-var that is normally kept @tech{full}, and it uses @racket[mvar-take!] to acquire an exclusive lock on its contents. When the operation returns, it uses @racket[mvar-put!] to simultaneously update the contents and release the lock.

Since all of the bindings in this section follow this locking discipline, an easy recipe to ensure atomicity is to never use @racket[mvar-take!] or @racket[mvar-put!] directly on any M-var used as a @reftech{semaphore}-protected @reftech{box}. Doing this comes at the risk of accidentally failing to either acquire or release the lock, which is especially easy to do if the critical section fails with an exception or is interrupted by a @reftech{break}. The higher-level, derived operations documented in this section make those accidents impossible, so they should be preferred whenever the lower-level, channel-style operations are not needed.

@(define (requires-locking-discipline operation-name)
@list{Since @operation-name is implemented using @racket[mvar-take!] followed by @racket[mvar-put!], it is not intrinsically @tech{atomic}. To ensure atomicity, @operation-name should only be used on @tech{M-vars} that follow @seclink["derived-operations"]{the required locking discipline}.})

@defproc[(mvar-swap! [mv mvar?] [v any/c] [#:enable-break? enable-break? any/c #f]) any/c]{
Takes a value from @racket[mv], puts @racket[v] into @racket[mv], then returns the taken value. If @racket[enable-break?] is not @racket[#f], @reftech{breaks} are explicitly enabled while waiting to take from @racket[mv].

@requires-locking-discipline[@racket[mvar-swap!]]

@(mvar-examples
(define mv (make-mvar 'old))
(eval:check (mvar-swap! mv 'new) 'old)
mv)}

@defproc[(mvar-update! [mv mvar?]
[update-proc (-> any/c any/c)]
[#:enable-break? enable-break? any/c #f])
void?]{
Takes a value from @racket[mv], applies @racket[update-proc] to the taken value, then puts the result into @racket[mv]. If @racket[enable-break?] is not @racket[#f], @reftech{breaks} are explicitly enabled while waiting to take from @racket[mv].

@requires-locking-discipline[@racket[mvar-update!]]

@(mvar-examples
(define mv (make-mvar 0))
(mvar-update! mv add1)
mv)}

@defproc[(call-with-mvar [mv mvar?]
[body-proc (-> any/c any)]
[#:enable-break? enable-break? any/c #f])
any]{
Takes a value from @racket[mv], applies @racket[body-proc] to the taken value, then puts the taken value back into @racket[mv]. The result of @racket[body-proc] is the result of the @racket[call-with-mvar] call. If @racket[enable-break?] is not @racket[#f], @reftech{breaks} are explicitly enabled while waiting to take from @racket[mv].

@racket[call-with-mvar] is useful if @racket[mv] contains a handle to a shared resource that cannot be used by more than one thread at a time. For example, @racket[mv] might hold an @reftech{output port}, and a writer thread might use @racket[call-with-mvar] to obtain exclusive access to the port while it writes a packet, which must not be interleaved with other writes. Once the packet has been written and @racket[body-proc] returns, @racket[call-with-mvar] puts the output port back in @racket[mv], making it available for acquisition by some other thread.

@requires-locking-discipline[@racket[call-with-mvar]]}

@defproc[(call-with-mvar! [mv mvar?]
[body-proc (-> any/c (values any/c ...+))]
[#:enable-break? enable-break? any/c #f])
any]{
Takes a value from @racket[mv] and applies @racket[body-proc] to the taken value, which must return at least one result. The first result is put into @racket[mv], and the other results are the result of the @racket[call-with-mvar!] call. If @racket[enable-break?] is not @racket[#f], @reftech{breaks} are explicitly enabled while waiting to take from @racket[mv].

In other words, @racket[call-with-mvar!] is effectively a combination of @racket[call-with-mvar] and @racket[mvar-update!]: @racket[body-proc] determines both the value put back into @racket[mv] and the result (or results) of the overall call.

@requires-locking-discipline[@racket[call-with-mvar!]]}

@section[#:tag "contracts"]{Contracts}

@defproc[(mvar/c [in-ctc contract?] [out-ctc contract? in-ctc]) contract?]{
Expand Down Expand Up @@ -208,7 +268,7 @@ The @racketmodname[syncvar/mvar #:indirect] library predates @racketmodname[data

@item{@racketmodname[data/mvar] provides @seclink["contracts"]{contracts}, @seclink["chaperones-and-impersonators"]{chaperones, and impersonators} on M-vars, while @racketmodname[syncvar/mvar #:indirect] does not.}

@item{In @racketmodname[data/mvar], @racket[mvar-put!], @racket[mvar-take!], and @racket[mvar-peek] accept an @racket[#:enable-breaks?] keyword argument to allow @reftech{breaks} to be delivered while blocked even if they are disabled in the enclosing context. @racketmodname[syncvar/mvar #:indirect] does not, though @racket[sync/enable-break] can be used as an alternative.}
@item{In @racketmodname[data/mvar], @racket[mvar-put!], @racket[mvar-take!], and @racket[mvar-peek] accept an @racket[#:enable-break?] keyword argument to allow @reftech{breaks} to be delivered while blocked even if they are disabled in the enclosing context. @racketmodname[syncvar/mvar #:indirect] does not, though @racket[sync/enable-break] can be used as an alternative.}

@item{@racketmodname[data/mvar] provides stronger guarantees for @racket[mvar-peek] (see @secref["ordering-and-fairness"]).}

Expand Down
71 changes: 70 additions & 1 deletion mvar-lib/data/mvar.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
[mvar-empty? (-> mvar? boolean?)]
[mvar-empty-evt (-> mvar? evt?)]

[mvar-swap! (->* [mvar? any/c] [#:enable-break? any/c] any/c)]
[mvar-update! (->* [mvar? (-> any/c any/c)] [#:enable-break? any/c] void?)]
[call-with-mvar (->* [mvar? (-> any/c any)] [#:enable-break? any/c] any)]
[call-with-mvar! (->* [mvar? (-> any/c any)] [#:enable-break? any/c] any)]

[impersonate-mvar impersonate-mvar/c]
[chaperone-mvar impersonate-mvar/c]
[mvar/c (->* [contract?] [contract?] contract?)]))

(define no-value (gensym 'no-value))

;; -----------------------------------------------------------------------------
;; mvar
;; core operations

;; An mvar is implemented as a mutable cell combined with two semaphores and a
;; channel. The semaphores are waited on by threads trying to take or put, and
Expand Down Expand Up @@ -178,6 +183,70 @@
(fail)
fail))

;; -----------------------------------------------------------------------------
;; derived operations

(define (mvar-swap! mv new-val #:enable-break? [enable-break? #f])
(define breaks? (break-enabled))
(parameterize-break #f
(define old-val (mvar-take! mv #:enable-break? (or breaks? enable-break?)))
(mvar-put! mv new-val)
old-val))

(define (call-with-mvar! mv proc #:enable-break? [enable-break? #f])
(define break-paramz (current-break-parameterization))
(define breaks? (break-enabled))
(parameterize-break #f
(define old-val (mvar-take! mv #:enable-break? (or breaks? enable-break?)))
(define new-val old-val)
(dynamic-wind
void
(λ ()
(call-with-continuation-barrier
(λ ()
(call-with-break-parameterization
break-paramz
(λ ()
(call-with-values
(λ () (proc old-val))
(case-lambda
[() (raise-arguments-error
'call-with-mvar!
"contract violation;\n given procedure returned wrong number of results"
"expected" (unquoted-printing-string "at least 1")
"received" 0
"procedure" proc)]
[(val)
(set! new-val val)
(values)]
[(val result)
(set! new-val val)
(values result)]
[(val result1 result2)
(set! new-val val)
(values result1 result2)]
[(val . results)
(set! new-val val)
(apply values results)])))))))
(λ () (mvar-put! mv new-val)))))

(define (mvar-update! mv proc #:enable-break? [enable-break? #f])
(call-with-mvar!
mv #:enable-break? enable-break?
(λ (val) (values (proc val) (void)))))

(define (call-with-mvar mv proc #:enable-break? [enable-break? #f])
(call-with-mvar!
mv #:enable-break? enable-break?
(λ (val)
(call-with-values
(λ () (proc val))
(case-lambda
[() val]
[(result) (values val result)]
[(result1 result2) (values val result1 result2)]
[results (apply values val results)])))))

;; -----------------------------------------------------------------------------
;; chaperones and impersonators

Expand Down
46 changes: 45 additions & 1 deletion mvar-test/tests/data/mvar.rkt
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,56 @@
(check-equal? (execute-one) 'dead)
(check-equal? (execute-one) 'dead))

(test-begin
"basic mvar-swap! usage"
(define mv (make-mvar 1))
(check-equal? (mvar-swap! mv 2) 1)
(check-equal? (mvar-swap! mv 3) 2)
(check-equal? (mvar-swap! mv 4) 3)
(check-equal? (mvar-try-peek mv) 4))

(test-begin
"basic mvar-update! usage"
(define mv (make-mvar 1))
(check-equal? (mvar-try-peek mv) 1)
(mvar-update! mv add1)
(check-equal? (mvar-try-peek mv) 2)
(mvar-update! mv add1)
(check-equal? (mvar-try-peek mv) 3))

(test-case
"mvar-update! restores the value on exceptions"
(define mv (make-mvar 1))
(check-exn exn:fail? (λ () (mvar-update! mv (λ () (error "bang")))))
(check-equal? (mvar-try-peek mv) 1))

(test-case
"basic call-with-mvar usage"
(define mv (make-mvar 1))
(check-equal? (call-with-mvar mv (λ (val) (add1 val))) 2)
(check-equal? (mvar-try-peek mv) 1))

(test-case
"multi-valued return from call-with-mvar"
(define mv (make-mvar 1))
(check-equal? (call-with-values (λ () (call-with-mvar mv (λ (val) (values)))) list)
'())
(check-equal? (call-with-values (λ () (call-with-mvar mv (λ (val) (values (add1 val) (sub1 val))))) list)
'(2 0))
(check-equal? (mvar-try-peek mv) 1))

(test-case
"call-with-mvar! zero-valued return error message"
(check-exn #px"expected: at least 1\n received: 0"
(λ () (call-with-mvar! (make-mvar 1) (λ (val) (values))))))

(define ((exn?-blaming which) exn)
(and (exn:fail:contract:blame? exn)
(let ([b (exn:fail:contract:blame-object exn)])
(equal? (blame-positive b) which))))

(test-begin
(test-case
"mvar/c blame"
(define mv (make-mvar))
(define mv+c (contract (mvar/c exact-integer?) mv 'pos 'neg))
(check-equal? (mvar-put! mv+c 10) (void))
Expand Down

0 comments on commit f2deb66

Please sign in to comment.