Skip to content

Commit

Permalink
fork std/tasks, stricter exceptions (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnetheduck authored Jul 5, 2023
1 parent 5551f10 commit 2067764
Show file tree
Hide file tree
Showing 12 changed files with 315 additions and 33 deletions.
2 changes: 1 addition & 1 deletion benchmarks/bouncing_producer_consumer/taskpool_bpc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ proc bpc_consume_nopoll(usec: int32) =

dummy_cpt()

proc bpc_produce(n, d: int32) {.gcsafe.} =
proc bpc_produce(n, d: int32) {.gcsafe, raises: [].} =
if d > 0:
# Create producer task
tp.spawn bpc_produce(n, d-1)
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/dfs/taskpool_dfs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import
system/ansi_c, strformat, os, strutils, cpuinfo,
# Library
../../taskpools

when not defined(windows):
# bench
import ../wtime

var tp: Taskpool

proc dfs(depth, breadth: int): uint32 {.gcsafe.} =
proc dfs(depth, breadth: int): uint32 {.gcsafe, raises: [].} =
if depth == 0:
return 1

Expand Down
24 changes: 15 additions & 9 deletions benchmarks/heat/taskpool_heat.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

# From fibril
#
# Original license
Expand Down Expand Up @@ -251,18 +253,22 @@ proc verify() =

me /= nx * ny

if mae > 1e-12:
echo &"Local maximal absolute error {mae:1.3e}"
quit 1
if mre > 1e-12:
echo &"Local maximal relative error {mre:1.3e}"
quit 1
if me > 1e-12:
echo &"Global mean absolute error {me:1.3e}"
quit 1
try:
if mae > 1e-12:
echo &"Local maximal absolute error {mae:1.3e}"
quit 1
if mre > 1e-12:
echo &"Local maximal relative error {mre:1.3e}"
quit 1
if me > 1e-12:
echo &"Global mean absolute error {me:1.3e}"
quit 1
except ValueError: raiseAssert "format strings"

echo "Verification successful"

{.pop.}

proc main() =
var nthreads: int
if existsEnv"TASKPOOL_NUM_THREADS":
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/nqueens/taskpool_nqueens.nim
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ func isValid(n: int32, a: CharArray): bool =
## Returns true if none of the queens conflict and 0 otherwise.

for i in 0'i32 ..< n:
let p = cast[int32](a[i])
let p = int32(a[i])

for j in i+1 ..< n:
let q = cast[int32](a[j])
let q = int32(a[j])
if q == p or q == p - (j-i) or q == p + (j-i):
return false
return true
Expand All @@ -111,7 +111,7 @@ proc nqueens_ser(n, j: int32, a: CharArray): int32 =
if isValid(j+1, a):
result += nqueens_ser(n, j+1, a)

proc nqueens_par(n, j: int32, a: CharArray): int32 {.gcsafe.} =
proc nqueens_par(n, j: int32, a: CharArray): int32 {.gcsafe, raises: [].} =

if n == j:
# Good solution, count it
Expand Down
9 changes: 6 additions & 3 deletions examples/e01_simple_tasks.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ import ../taskpools
block: # Async without result

proc displayInt(x: int) =
stdout.write(x)
stdout.write(" - SUCCESS\n")
try:
stdout.write(x)
stdout.write(" - SUCCESS\n")
except IOError:
quit 1 # can't do anything productive

proc main() =
echo "\nSanity check 1: Printing 123456 654321 in parallel"
Expand All @@ -21,7 +24,7 @@ block: # Async/Await
var tp: Taskpool


proc asyncFib(n: int): int {.gcsafe.} =
proc asyncFib(n: int): int {.gcsafe, raises: [].} =
if n < 2:
return n

Expand Down
2 changes: 1 addition & 1 deletion examples/e02_parallel_pi.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Demo of API using a very inefficient π approcimation algorithm.

import
std/[strutils, math, cpuinfo],
std/[strutils, cpuinfo],
../taskpools

# From https://github.com/nim-lang/Nim/blob/v1.6.2/tests/parallel/tpi.nim
Expand Down
6 changes: 3 additions & 3 deletions taskpools/channels_spsc_single.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
std/atomics,
./instrumentation/[contracts, loggers]
Expand All @@ -28,8 +30,6 @@ type
itemSize*: uint8
buffer*{.align: 8.}: UncheckedArray[byte]

{.push raises: [AssertionDefect].} # Ensure no exceptions can happen

proc `=`(
dest: var ChannelSPSCSingle,
source: ChannelSPSCSingle
Expand Down Expand Up @@ -78,7 +78,7 @@ func trySend*[T](chan: var ChannelSPSCSingle, src: sink T): bool {.inline.} =
chan.full.store(true, moRelease)
return true

{.pop.} # raises: [AssertionDefect]
{.pop.} # raises: []

# Sanity checks
# ------------------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions taskpools/chase_lev_deques.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
# To reduce contention, stealing is done on the opposite end from push/pop
# so that there is a race only for the very last task.

{.push raises: [].} # Ensure no exceptions can happen

import
system/ansi_c,
std/atomics,
Expand Down Expand Up @@ -65,7 +67,6 @@ type
buf: Atomic[ptr Buf[T]]
garbage: ptr Buf[T]

{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
{.push overflowChecks: off.} # We don't want exceptions (for Defect) in a multithreaded context
# but we don't to deal with underflow of unsigned int either
# say "if a < b - c" with c > b
Expand Down Expand Up @@ -192,4 +193,4 @@ proc steal*[T](deque: var ChaseLevDeque[T]): T =
return default(T)

{.pop.} # overflowChecks
{.pop.} # raises: [AssertionDefect]
{.pop.} # raises: []
3 changes: 2 additions & 1 deletion taskpools/event_notifiers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
# but requires the threadpool to be message-passing based.
# https://github.com/mratsim/weave/blob/a230cce98a8524b2680011e496ec17de3c1039f2/weave/cross_thread_com/event_notifiers.nim

{.push raises: [].} # Ensure no exceptions can happen

import
std/locks,
./instrumentation/contracts
Expand All @@ -36,7 +38,6 @@ type
parked: int
signals: int

{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
{.push overflowChecks: off.} # We don't want exceptions (for Defect) in a multithreaded context
# but we don't to deal with underflow of unsigned int either
# say "if a < b - c" with c > b
Expand Down
3 changes: 2 additions & 1 deletion taskpools/flowvars.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
# * Apache v2 license (license terms in the root directory or at http://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [].}

import
std/os,
./instrumentation/contracts,
./channels_spsc_single,
./primitives/allocs
Expand Down
13 changes: 6 additions & 7 deletions taskpools/taskpools.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
# In case a thread is blocked for IO, other threads can steal pending tasks in that thread.
# If all threads are pending for IO, the threadpool will not make any progress and be soft-locked.

{.push raises: [AssertionDefect].} # Ensure no exceptions can happen
{.push raises: [].} # Ensure no exceptions can happen

import
system/ansi_c,
Expand All @@ -47,14 +47,13 @@ import
./instrumentation/[contracts, loggers],
./sparsesets,
./flowvars,
./ast_utils
./ast_utils,
./tasks

export
# flowvars
Flowvar, isSpawned, isReady, sync
Flowvar, isSpawned, isReady, sync, tasks

import std/[isolation, tasks]
export isolation

type
WorkerID = int32
Expand Down Expand Up @@ -182,7 +181,7 @@ proc new(T: type TaskNode, parent: TaskNode, task: sink Task): T =
tn.task = task
return tn

proc runTask(tn: var TaskNode) {.raises:[Exception], inline.} =
proc runTask(tn: var TaskNode) {.raises:[], inline.} =
## Run a task and consumes the taskNode
tn.task.invoke()
{.gcsafe.}: # Upstream missing tagging `=destroy` as gcsafe
Expand Down Expand Up @@ -245,7 +244,7 @@ const RootTask = default(Task) # TODO: sentinel value different from null task
template isRootTask(task: Task): bool =
task == RootTask

proc forceFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[Exception].} =
proc forceFuture*[T](fv: Flowvar[T], parentResult: var T) {.raises:[].} =
## Eagerly complete an awaited FlowVar

template ctx: untyped = workerContext
Expand Down
Loading

0 comments on commit 2067764

Please sign in to comment.