From bb457424c4bc9c363026fcfff07af5a3b350de0e Mon Sep 17 00:00:00 2001 From: hugosenari Date: Wed, 30 Aug 2023 23:29:46 -0300 Subject: [PATCH 1/3] experiment: nessie --- src/experiment/malebolgia_nessie.nim | 322 +++++++++++++++++++++++++++ src/malebolgia.nim | 105 +++++---- 2 files changed, 372 insertions(+), 55 deletions(-) create mode 100644 src/experiment/malebolgia_nessie.nim diff --git a/src/experiment/malebolgia_nessie.nim b/src/experiment/malebolgia_nessie.nim new file mode 100644 index 0000000..e4829f8 --- /dev/null +++ b/src/experiment/malebolgia_nessie.nim @@ -0,0 +1,322 @@ +import std/monotimes +import std/times +import std/tasks +import std/strutils + + +### MALEBOLGIA + +import malebolgia + +# (c) 2023 Andreas Rumpf + +import std / [atomics, locks, tasks, times] +from std / os import sleep + +import std / isolation +export isolation + +type + Master* = object ## Masters can spawn new tasks inside an `awaitAll` block. + c: Cond + L: Lock + error: string + runningTasks: int + completedTasks: Atomic[int] + stopToken: Atomic[bool] + shouldEndAt: Time + usesTimeout: bool + +proc `=destroy`(m: var Master) {.inline.} = + deinitCond(m.c) + deinitLock(m.L) + +proc `=copy`(dest: var Master; src: Master) {.error.} +proc `=sink`(dest: var Master; src: Master) {.error.} + +proc createMaster*(timeout = default(Duration)): Master = + result = default(Master) + initCond(result.c) + initLock(result.L) + + result.completedTasks.store(0, moRelaxed) + if timeout != default(Duration): + result.usesTimeout = true + result.shouldEndAt = getTime() + timeout + +proc cancel*(m: var Master) = + ## Try to stop all running tasks immediately. + ## This cannot fail but it might take longer than desired. + store(m.stopToken, true, moRelaxed) + +proc cancelled*(m: var Master): bool {.inline.} = + m.stopToken.load(moRelaxed) + +proc taskCompleted(m: var Master) {.inline.} = + m.completedTasks.atomicInc + if m.runningTasks == m.completedTasks.load(moRelaxed): + signal(m.c) + +proc stillHaveTime*(m: Master): bool {.inline.} = + not m.usesTimeout or getTime() < m.shouldEndAt + +proc waitForCompletions(m: var Master) = + var timeoutErr = false + if not m.usesTimeout: + while m.runningTasks > m.completedTasks.load(moRelaxed): + wait(m.c, m.L) + else: + while true: + let success = m.runningTasks > m.completedTasks.load(moRelaxed) + if success: break + if getTime() > m.shouldEndAt: + timeoutErr = true + break + sleep(10) # XXX maybe make more precise + let err = move(m.error) + if err.len > 0: + raise newException(ValueError, err) + elif timeoutErr: + m.cancel() + raise newException(ValueError, "'awaitAll' timeout") + +# thread pool independent of the 'master': + +const + FixedChanSize {.intdefine.} = 16 ## must be a power of two! + FixedChanMask = FixedChanSize - 1 + + ThreadPoolSize {.intdefine.} = 8 # 24 + +type + PoolTask = object ## a task for the thread pool + m: ptr Master ## who is waiting for us + t: Task ## what to do + result: pointer ## where to store the potential result + + FixedChan = object ## channel of a fixed size + spaceAvailable, dataAvailable: Cond + L: Lock + data: array[FixedChanSize, PoolTask] + lock: array[FixedChanSize, Lock] + todo: array[FixedChanSize, bool] + +var + thr: array[ThreadPoolSize-1, Thread[void]] # -1 because the main thread counts too + chan: FixedChan + globalStopToken: Atomic[bool] + +proc send(item: sink PoolTask) = + # see deques.addLast: + while true: + for i in 0..high(chan.lock): + if not tryAcquire(chan.lock[i]): + continue + if chan.todo[i]: + chan.lock[i].release + continue + chan.data[i] = item + chan.todo[i] = true + chan.lock[i].release + return + signal(chan.dataAvailable) + # TODO: I removed the feature of busyThreads that runs in current instead of wait + wait(chan.spaceAvailable, chan.L) + + +proc worker() {.thread.} = + var item: PoolTask + while not globalStopToken.load(moRelaxed): + for i in 0..high(chan.lock): + let before = getMonoTime() + if not tryAcquire(chan.lock[i]): + continue + if not chan.todo[i]: + chan.lock[i].release + continue + item = move chan.data[i] + if item.m.stopToken.load(moRelaxed): + chan.lock[i].release + break + try: + item.t.invoke(item.result) + except: + acquire(item.m.L) + if item.m.error.len == 0: + let e = getCurrentException() + item.m.error = "SPAWN FAILURE: [" & $e.name & "] " & e.msg & "\n" & getStackTrace(e) + release(item.m.L) + finally: + chan.todo[i] = false + chan.lock[i].release + signal(chan.spaceAvailable) + # but mark it as completed either way! + taskCompleted item.m[] + wait(chan.dataAvailable, chan.L) + +proc setup() = + initCond(chan.dataAvailable) + initCond(chan.spaceAvailable) + initLock(chan.L) + for i in 0..high(thr): createThread[void](thr[i], worker) + for i in 0..high(chan.lock): + initLock(chan.lock[i]) + chan.todo[i] = false + +proc panicStop*() = + ## Stops all threads. + globalStopToken.store(true, moRelaxed) + joinThreads(thr) + deinitCond(chan.dataAvailable) + deinitCond(chan.spaceAvailable) + deinitLock(chan.L) + for i in 0..high(chan.lock): deinitLock(chan.lock[i]) + +template spawnImplRes[T](master: var Master; fn: typed; res: T) = + if stillHaveTime(master): + master.runningTasks += 1 + send PoolTask(m: addr(master), t: toTask(fn), result: addr res) + +template spawnImplNoRes(master: var Master; fn: typed) = + if stillHaveTime(master): + master.runningTasks += 1 + send PoolTask(m: addr(master), t: toTask(fn), result: nil) + +import std / macros + +macro spawn*(a: Master; b: untyped) = + if b.kind in nnkCallKinds and b.len == 3 and b[0].eqIdent("->"): + result = newCall(bindSym"spawnImplRes", a, b[1], b[2]) + else: + result = newCall(bindSym"spawnImplNoRes", a, b) + +macro checkBody(body: untyped): untyped = + # We check here for dangerous "too early" access of memory locations that + # are "already" gone. + # For example: + # + # m.awaitAll: + # m.spawn g(i+1) -> resA + # m.spawn g(i+1) -> resA # <-- store into the same location without protection! + + const DeclarativeNodes = {nnkTypeSection, nnkFormalParams, nnkGenericParams, + nnkMacroDef, nnkTemplateDef, nnkConstSection, nnkConstDef, + nnkIncludeStmt, nnkImportStmt, + nnkExportStmt, nnkPragma, nnkCommentStmt, + nnkTypeOfExpr, nnkMixinStmt, nnkBindStmt} + + const BranchingNodes = {nnkIfStmt, nnkCaseStmt} + + proc isSpawn(n: NimNode): bool = + n.eqIdent("spawn") or (n.kind == nnkDotExpr and n[1].eqIdent("spawn")) + + proc check(n: NimNode; exprs: var seq[NimNode]; withinLoop: bool) = + if n.kind in nnkCallKinds and isSpawn(n[0]): + let b = n[^1] + for i in 1 ..< n.len: + check n[i], exprs, withinLoop + if b.kind in nnkCallKinds and b.len == 3 and b[0].eqIdent("->"): + let dest = b[2] + exprs.add dest + if withinLoop and dest.kind in {nnkSym, nnkIdent}: + error("re-use of expression '" & $dest & "' before 'awaitAll' completed", dest) + + elif n.kind in DeclarativeNodes: + discard "declarative nodes are not interesting" + else: + let withinLoopB = withinLoop or n.kind in {nnkWhileStmt, nnkForStmt} + if n.kind in BranchingNodes: + let preExprs = exprs[0..^1] + for child in items(n): + var branchExprs = preExprs + check child, branchExprs, withinLoopB + exprs.add branchExprs[preExprs.len..^1] + else: + for child in items(n): check child, exprs, withinLoopB + for i in 0.. 0: + while m.runningTasks > m.completedTasks.load(moRelaxed): wait(m.c, m.L) else: while true: - acquire(m.L) - let success = m.runningTasks == 0 - release(m.L) + let success = m.runningTasks > m.completedTasks.load(moRelaxed) if success: break if getTime() > m.shouldEndAt: timeoutErr = true break sleep(10) # XXX maybe make more precise - acquire(m.L) let err = move(m.error) - release(m.L) if err.len > 0: raise newException(ValueError, err) elif timeoutErr: @@ -96,63 +87,71 @@ type FixedChan = object ## channel of a fixed size spaceAvailable, dataAvailable: Cond L: Lock - head, tail, count: int data: array[FixedChanSize, PoolTask] + lock: array[FixedChanSize, Lock] + todo: array[FixedChanSize, bool] var thr: array[ThreadPoolSize-1, Thread[void]] # -1 because the main thread counts too chan: FixedChan globalStopToken: Atomic[bool] - busyThreads: Atomic[int] proc send(item: sink PoolTask) = # see deques.addLast: - acquire(chan.L) - while chan.count >= FixedChanSize: - wait(chan.spaceAvailable, chan.L) - if chan.count < FixedChanSize: - inc chan.count - chan.data[chan.tail] = item - chan.tail = (chan.tail + 1) and FixedChanMask + while true: + for i in 0..high(chan.lock): + if not tryAcquire(chan.lock[i]): + continue + if chan.todo[i]: + chan.lock[i].release + continue + chan.data[i] = item + chan.todo[i] = true + chan.lock[i].release + return signal(chan.dataAvailable) - release(chan.L) - else: - release(chan.L) - quit "logic bug: queue not empty after signal!" + # TODO: I removed the feature of busyThreads that runs in current instead of wait + wait(chan.spaceAvailable, chan.L) + proc worker() {.thread.} = var item: PoolTask while not globalStopToken.load(moRelaxed): - acquire(chan.L) - while chan.count == 0: - wait(chan.dataAvailable, chan.L) - if chan.count > 0: - # see deques.popFirst: - dec chan.count - item = move chan.data[chan.head] - chan.head = (chan.head + 1) and FixedChanMask - signal(chan.spaceAvailable) - release(chan.L) - if not item.m.stopToken.load(moRelaxed): + for i in 0..high(chan.lock): + let before = getMonoTime() + if not tryAcquire(chan.lock[i]): + continue + if not chan.todo[i]: + chan.lock[i].release + continue + item = move chan.data[i] + if item.m.stopToken.load(moRelaxed): + chan.lock[i].release + break try: - atomicInc busyThreads item.t.invoke(item.result) - atomicDec busyThreads except: acquire(item.m.L) if item.m.error.len == 0: let e = getCurrentException() item.m.error = "SPAWN FAILURE: [" & $e.name & "] " & e.msg & "\n" & getStackTrace(e) release(item.m.L) - - # but mark it as completed either way! - taskCompleted item.m[] + finally: + chan.todo[i] = false + chan.lock[i].release + signal(chan.spaceAvailable) + # but mark it as completed either way! + taskCompleted item.m[] + wait(chan.dataAvailable, chan.L) proc setup() = initCond(chan.dataAvailable) initCond(chan.spaceAvailable) initLock(chan.L) for i in 0..high(thr): createThread[void](thr[i], worker) + for i in 0..high(chan.lock): + initLock(chan.lock[i]) + chan.todo[i] = false proc panicStop*() = ## Stops all threads. @@ -161,22 +160,17 @@ proc panicStop*() = deinitCond(chan.dataAvailable) deinitCond(chan.spaceAvailable) deinitLock(chan.L) + for i in 0..high(chan.lock): deinitLock(chan.lock[i]) template spawnImplRes[T](master: var Master; fn: typed; res: T) = if stillHaveTime(master): - if busyThreads.load(moRelaxed) < ThreadPoolSize-1: - taskCreated master - send PoolTask(m: addr(master), t: toTask(fn), result: addr res) - else: - res = fn + master.runningTasks += 1 + send PoolTask(m: addr(master), t: toTask(fn), result: addr res) template spawnImplNoRes(master: var Master; fn: typed) = if stillHaveTime(master): - if busyThreads.load(moRelaxed) < ThreadPoolSize-1: - taskCreated master - send PoolTask(m: addr(master), t: toTask(fn), result: nil) - else: - fn + master.runningTasks += 1 + send PoolTask(m: addr(master), t: toTask(fn), result: nil) import std / macros @@ -243,6 +237,7 @@ template awaitAll*(master: var Master; body: untyped) = try: checkBody body finally: + signal(chan.dataAvailable) waitForCompletions(master) when not defined(maleSkipSetup): From 06ed5a21682cf11bedda624df318e0002239946e Mon Sep 17 00:00:00 2001 From: Hugo Sena Ribeiro Date: Thu, 31 Aug 2023 02:46:56 -0300 Subject: [PATCH 2/3] Update src/malebolgia.nim --- src/malebolgia.nim | 1 - 1 file changed, 1 deletion(-) diff --git a/src/malebolgia.nim b/src/malebolgia.nim index bbc21fe..1191959 100644 --- a/src/malebolgia.nim +++ b/src/malebolgia.nim @@ -118,7 +118,6 @@ proc worker() {.thread.} = var item: PoolTask while not globalStopToken.load(moRelaxed): for i in 0..high(chan.lock): - let before = getMonoTime() if not tryAcquire(chan.lock[i]): continue if not chan.todo[i]: From a906562b4e63e6bf06862de1b1a64ab96f011699 Mon Sep 17 00:00:00 2001 From: hugosenari Date: Sat, 2 Sep 2023 17:20:36 -0300 Subject: [PATCH 3/3] experiment: Gilson --- src/experiment/malebolgia_gilson.nim | 505 +++++++++++++++++++++++++++ 1 file changed, 505 insertions(+) create mode 100644 src/experiment/malebolgia_gilson.nim diff --git a/src/experiment/malebolgia_gilson.nim b/src/experiment/malebolgia_gilson.nim new file mode 100644 index 0000000..1ec3dcc --- /dev/null +++ b/src/experiment/malebolgia_gilson.nim @@ -0,0 +1,505 @@ +import std/monotimes +import std/times +import std/tasks +import std/strutils + + +### MALEBOLGIA + +import malebolgia + +# (c) 2023 Andreas Rumpf + +import std / [atomics, locks, tasks, times] +from std / os import sleep + +import std / isolation +export isolation + + +const ThreadPoolSize {.intdefine.} = 8 # 24 + + +type + ## Prevent all the pool from waiting the same resource + ## That can be summarized at the end + LockPool[T] = object + overloaded: bool + # Happens at the initalization and someone free resource after full + available: Cond + # Happens every time we locks[ThreadPoolSize mod 4 == 0] + halfloaded:Cond + # Happens every time that all our locks are busy + full: Cond + lock: Lock + busy: array[ThreadPoolSize * 2, bool] + locks: array[ThreadPoolSize * 2, Lock] + values: array[ThreadPoolSize * 2, T] + + +proc initLockPool*[T](): LockPool[T] = + result = default(LockPool[T]) + initCond(result.available) + initCond(result.halfloaded) + initCond(result.full) + initLock(result.lock) + for i in 0..high(result.locks): + initLock(result.locks[i]) + signal(result.available) + + +proc deinitLockPool(pool: var LockPool) {.inline.} = + deinitCond(pool.available) + deinitCond(pool.halfloaded) + deinitCond(pool.full) + deinitLock(pool.lock) + for i in 0..high(pool.locks): + deinitLock(pool.locks[i]) + +## We are not using this but still here as reference +proc acquire[T](pool: var LockPool[T]): int {.inline.} = + while true: + for i in 0..high(pool.locks): + if pool.busy[i]: + continue + if not pool.locks[i].tryAcquire: + continue + pool.busy[i] = true + if i mod 4 == 0: + signal(pool.halfloaded) + return i + + if tryAcquire(pool.lock): + pool.overloaded = true + signal(pool.full) + wait(pool.available, pool.lock) + release(pool.lock) + return -1 + +proc release[T](pool: var LockPool[T]; pos: int) {.inline.} = + release(pool.locks[pos]) + pool.busy[pos] = false + if pos mod 4 == 0 and tryAcquire(pool.lock): + signal(pool.available) + release(pool.lock) + if pool.overloaded and tryAcquire(pool.lock): + pool.overloaded = false + signal(pool.available) + release(pool.lock) + + +type + Master* = object ## Masters can spawn new tasks inside an `awaitAll` block. + c: Cond + L: Lock + error: string + runningTasks: int + completedTasks: int + stopToken: Atomic[bool] + shouldEndAt: Time + usesTimeout: bool + pendingTasks: LockPool[int] + +proc `=destroy`(m: var Master) {.inline.} = + deinitCond(m.c) + deinitLock(m.L) + deinitLockPool(m.pendingTasks) + +proc `=copy`(dest: var Master; src: Master) {.error.} +proc `=sink`(dest: var Master; src: Master) {.error.} + +proc createMaster*(timeout = default(Duration)): Master = + result = default(Master) + initCond(result.c) + initLock(result.L) + var pendingTasks = initLockPool[int]() + result.pendingTasks = pendingTasks + + if timeout != default(Duration): + result.usesTimeout = true + result.shouldEndAt = getTime() + timeout + +proc taskCreated(m: var Master;pos: int) {.inline.} = + m.pendingTasks.busy[pos] = true + m.pendingTasks.values[pos] = getThreadId() + +proc taskCompleted(m: var Master; pos: int) {.inline.} = + m.pendingTasks.busy[pos] = false + m.pendingTasks.values[pos] = getThreadId() + +proc cancel*(m: var Master) = + ## Try to stop all running tasks immediately. + ## This cannot fail but it might take longer than desired. + store(m.stopToken, true, moRelaxed) + +proc cancelled*(m: var Master): bool {.inline.} = + m.stopToken.load(moRelaxed) + +proc stillHaveTime*(m: Master): bool {.inline.} = + not m.usesTimeout or getTime() < m.shouldEndAt + +iterator waitForCompletions(m: var Master): bool = + var timeoutErr = false + var pendings = len(m.pendingTasks.locks) + while pendings > 0: + pendings = len(m.pendingTasks.locks) + #echo "busy list: ", m.pendingTasks.busy + for i in 0..high(m.pendingTasks.locks): + if m.pendingTasks.busy[i]: + continue + if not tryAcquire(m.pendingTasks.locks[i]): + continue + # makes sure value is what we tested before the lock + if m.pendingTasks.busy[i]: + release(m.pendingTasks.locks[i]) + continue + m.pendingTasks.values[i] = getThreadId() + dec(pendings) + release(m.pendingTasks.locks[i]) + yield false + yield true + # + # if m.usesTimeout and getTime() > m.shouldEndAt: + # timeoutErr = true + # break + # elif m.usesTimeout: + # sleep(10) # XXX maybe make more precise + let err = move(m.error) + if err.len > 0: + raise newException(ValueError, err) + elif timeoutErr: + m.cancel() + raise newException(ValueError, "'awaitAll' timeout") + + +# thread pool independent of the 'master': +type + PoolTaskKind = enum + ptkSlot + ptkTask + + PoolTask = object ## a task for the thread pool + case kind: PoolTaskKind + of ptkTask: + m: ptr Master ## who is waiting for us + t: Task ## what to do + result: pointer ## where to store the potential result + else: discard + + FixedChan = LockPool[PoolTask] + +var + thr: array[ThreadPoolSize-1, Thread[void]] # -1 because the main thread counts too + chan: FixedChan + globalStopToken: Atomic[bool] + + +proc acquireSlot(pool: var LockPool[PoolTask]; waitable: bool = true; attempts: int = 2): int {.inline.} = + result = -1 + var executions = attempts + while true: + dec executions + for i in 0..high(pool.locks): + if pool.busy[i]: + continue + if pool.values[i].kind == ptkTask: + continue + if not pool.locks[i].tryAcquire: + continue + if pool.values[i].kind == ptkTask: + release(pool.locks[i]) + continue + pool.busy[i] = true + if i mod 4 == 0: + signal(pool.halfloaded) + return i + pool.overloaded = true + + if tryAcquire(pool.lock): + release(pool.lock) + + if executions == 0 and not waitable: + return -1 + + if executions == 0: + executions = attempts + if tryAcquire(pool.lock): + #echo "waiting more slots ", getThreadId(), " locked" + wait(pool.available, pool.lock) + release(pool.lock) + else: + # if thread is locked someone is waiting for this + signal(pool.halfloaded) + + + +proc acquireTask(pool: var LockPool[PoolTask]; waitable: bool = true; attempts: int = 4): int {.inline.} = + result = -1 + var executions = attempts + while true: + dec executions + for i in 0..high(pool.locks): + if pool.busy[i]: + continue + if pool.values[i].kind == ptkSlot: + continue + if not pool.locks[i].tryAcquire: + continue + if pool.values[i].kind == ptkSlot: + release(pool.locks[i]) + continue + pool.busy[i] = true + return i + + if executions == 0 and not waitable: + return -1 + + if executions == 0: + executions = attempts + if tryAcquire(pool.lock): + #echo "waiting more tasks ", getThreadId(), " locked" + wait(pool.halfloaded, pool.lock) + release(pool.lock) + else: + #echo "waiting more tasks ", getThreadId() + acquire(pool.lock) + release(pool.lock) + #echo "receive more tasks ", getThreadId() + +proc send(item: sink PoolTask) = + var pos = acquireSlot(chan) + # if pos < 0: + # var kinds = [ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot] + # for i in 0..high(chan.locks): + # if chan.values[i].kind == ptkTask: + # signal(chan.halfloaded) + # kinds[i] = chan.values[i].kind + # echo "No slots see ", kinds + # pos = acquireSlot(chan) + + item.m[].taskCreated(pos) + chan.values[pos] = move item + release chan, pos + +proc worker() {.thread.} = + var item = PoolTask(kind: ptkSlot) + while not globalStopToken.load(moRelaxed): + var pos = acquireTask(chan) + # if pos < 0: + # var kinds = [ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot,ptkSlot] + # for i in 0..high(chan.locks): + # kinds[i] = chan.values[i].kind + # echo "No tasks see ", kinds + # pos = acquireTask(chan, true) + item = move chan.values[pos] + chan.values[pos] = PoolTask(kind: ptkSlot) + try: + if item.m.stopToken.load(moRelaxed): + break + item.t.invoke(item.result) + except: + acquire(item.m.L) + if item.m.error.len == 0: + let e = getCurrentException() + item.m.error = "SPAWN FAILURE: [" & $e.name & "] " & e.msg & "\n" & getStackTrace(e) + release(item.m.L) + finally: + taskCompleted(item.m[], pos) + release chan, pos + + +proc setup() = + chan = initLockPool[PoolTask]() + for i in 0..high(thr): createThread[void](thr[i], worker) + +proc panicStop*() = + ## Stops all threads. + globalStopToken.store(true, moRelaxed) + joinThreads(thr) + deinitLockPool(chan) + +template spawnImplRes[T](master: var Master; fn: typed; res: T) = + if stillHaveTime(master): + send PoolTask(kind: ptkTask, m: addr(master), t: toTask(fn), result: addr res) + +template spawnImplNoRes(master: var Master; fn: typed) = + if stillHaveTime(master): + send PoolTask(kind: ptkTask, m: addr(master), t: toTask(fn), result: nil) + +import std / macros + +macro spawn*(a: Master; b: untyped) = + if b.kind in nnkCallKinds and b.len == 3 and b.eqIdent("->"): + result = newCall(bindSym"spawnImplRes", a, b[1], b[2]) + else: + result = newCall(bindSym"spawnImplNoRes", a, b) + +macro checkBody(body: untyped): untyped = + const DeclarativeNodes = {nnkTypeSection, nnkFormalParams, nnkGenericParams, + nnkMacroDef, nnkTemplateDef, nnkConstSection, nnkConstDef, + nnkIncludeStmt, nnkImportStmt, + nnkExportStmt, nnkPragma, nnkCommentStmt, + nnkTypeOfExpr, nnkMixinStmt, nnkBindStmt} + + const BranchingNodes = {nnkIfStmt, nnkCaseStmt} + + proc isSpawn(n: NimNode): bool = + n.eqIdent("spawn") or (n.kind == nnkDotExpr and n[1].eqIdent("spawn")) + + proc check(n: NimNode; exprs: var seq[NimNode]; withinLoop: bool) = + if n.kind in nnkCallKinds and isSpawn(n): + let b = n[^1] + for i in 1 ..< n.len: + check n[i], exprs, withinLoop + if b.kind in nnkCallKinds and b.len == 3 and b.eqIdent("->"): + let dest = b[2] + exprs.add dest + if withinLoop and dest.kind in {nnkSym, nnkIdent}: + error("re-use of expression '" & $dest & "' before 'awaitAll' completed", dest) + + elif n.kind in DeclarativeNodes: + discard "declarative nodes are not interesting" + else: + let withinLoopB = withinLoop or n.kind in {nnkWhileStmt, nnkForStmt} + if n.kind in BranchingNodes: + let preExprs = exprs[0..^1] + for child in items(n): + var branchExprs = preExprs + check child, branchExprs, withinLoopB + exprs.add branchExprs[preExprs.len..^1] + else: + for child in items(n): check child, exprs, withinLoopB + for i in 0..