# (c) 2023 Andreas Rumpf

import std / [atomics, locks, tasks, times]
from std / os import sleep

import std / isolation
export isolation

# Custom pragma carrying one integer per annotated proc. `getOpUs` looks the
# value up on spawned procs and `fnCost` converts it with `1000 div value`.
# NOTE(review): the name presumably means "operations per microsecond".
template opUS(i: int = 1) {.pragma.}

type
  Master* = object ## Masters can spawn new tasks inside an `awaitAll` block.
    c: Cond                 ## signalled when `runningTasks` drops to zero
    L: Lock                 ## guards `error` and `runningTasks`
    error: string           ## failure message stored by a worker, if any
    runningTasks: int       ## tasks handed to the pool and not yet completed
    stopToken: Atomic[bool] ## set by `cancel`, read by `cancelled`
    shouldEndAt: Time       ## deadline; only set when `usesTimeout` is true
    usesTimeout, activeProducer: bool
    budget: Atomic[int]     ## starts at `ThreadBudget`, see `createMaster`

proc `=destroy`(m: var Master) {.inline.} =
  deinitCond(m.c)
  deinitLock(m.L)

# A `Master` owns a lock and a condition variable, so it must never be
# copied or re-assigned.
proc `=copy`(dest: var Master; src: Master) {.error.}
proc `=sink`(dest: var Master; src: Master) {.error.}

const
  ThreadBudget* {.intdefine.} = 800
    ## avg time in nanoseconds we lose to scheduling a task

proc createMaster*(timeout = default(Duration);
                   activeProducer = false): Master =
  ## Creates a new master. A `timeout` different from `default(Duration)`
  ## sets a deadline of "now + timeout".
  ##
  ## Set `activeProducer` to true to prevent the master thread from
  ## running a task directly.
  ## But beware! This can introduce deadlocks for recursive loads!
  result = default(Master)
  initCond(result.c)
  initLock(result.L)
  result.budget.store(ThreadBudget)
  if timeout != default(Duration):
    result.usesTimeout = true
    result.shouldEndAt = getTime() + timeout
  result.activeProducer = activeProducer

proc cancel*(m: var Master) =
  ## Try to stop all running tasks immediately.
  ## This cannot fail but it might take longer than desired.
  store(m.stopToken, true, moRelaxed)

proc cancelled*(m: var Master): bool {.inline.} =
  ## Returns true once `cancel` has been called on `m`.
  m.stopToken.load(moRelaxed)

proc taskCreated(m: var Master) {.inline.} =
  ## Registers one more outstanding task with `m`.
  acquire(m.L)
  inc m.runningTasks
  release(m.L)

proc taskCompleted(m: var Master) {.inline.} =
  ## Marks one task as done and signals `m.c` when none are left.
  acquire(m.L)
  dec m.runningTasks
  if m.runningTasks == 0:
    signal(m.c)
  release(m.L)

proc stillHaveTime*(m: Master): bool {.inline.} =
  ## True if `m` has no timeout or its deadline has not been reached yet.
  not m.usesTimeout or getTime() < m.shouldEndAt

proc waitForCompletions(m: var Master) =
  ## Blocks until `m.runningTasks` is zero or, when a timeout is in use,
  ## until the deadline has passed.
  ##
  ## Raises `ValueError` with the stored error message if there is one.
  ## Otherwise, after a timeout, cancels `m` and raises `ValueError` with
  ## the message "'awaitAll' timeout".
  var timeoutErr = false
  if not m.usesTimeout:
    acquire(m.L)
    while m.runningTasks > 0:
      wait(m.c, m.L)
    # the lock is still held here
  else:
    # No condition variable wait here: poll so the deadline can be checked.
    while true:
      acquire(m.L)
      let success = m.runningTasks == 0
      release(m.L)
      if success: break
      if getTime() > m.shouldEndAt:
        timeoutErr = true
        break
      sleep(10) # XXX maybe make more precise
    acquire(m.L)
  # Both branches arrive here holding `m.L`.
  let err = move(m.error)
  release(m.L)
  if err.len > 0:
    raise newException(ValueError, err)
  elif timeoutErr:
    m.cancel()
    raise newException(ValueError, "'awaitAll' timeout")

# thread pool independent of the 'master':

const
  FixedChanSize* {.intdefine.} = 16 ## must be a power of two!
  FixedChanMask = FixedChanSize - 1

  ThreadPoolSize* {.intdefine.} = 8 # 24

# `head` and `tail` wrap around via `and FixedChanMask`, which is only
# correct for powers of two. Reject bad `-d:FixedChanSize=...` values here.
when FixedChanSize <= 0 or (FixedChanSize and FixedChanMask) != 0:
  {.error: "FixedChanSize must be a power of two".}

type
  PoolTask = object ## a task for the thread pool
    m: ptr Master   ## who is waiting for us
    t: Task         ## what to do
    result: pointer ## where to store the potential result

  FixedChan = object ## channel of a fixed size
    spaceAvailable, dataAvailable: Cond
    L: Lock
    head, tail, count: int
    data: array[FixedChanSize, PoolTask]

var
  thr: array[ThreadPoolSize-1, Thread[void]]
    # -1 because the main thread counts too
  chan: FixedChan
  globalStopToken: Atomic[bool]
  busyThreads: Atomic[int]

proc send(item: sink PoolTask) =
  ## Appends `item` to the global channel, blocking while the channel is
  ## full, and signals `dataAvailable` afterwards.
  # see deques.addLast:
  acquire(chan.L)
  while chan.count >= FixedChanSize:
    wait(chan.spaceAvailable, chan.L)
  # The loop only exits with the lock held and `count < FixedChanSize`,
  # so there is always a free slot at this point.
  inc chan.count
  chan.data[chan.tail] = item
  chan.tail = (chan.tail + 1) and FixedChanMask
  signal(chan.dataAvailable)
  release(chan.L)
proc worker() {.thread.} =
  ## Body of every pool thread: takes tasks from the global channel and runs
  ## them until `globalStopToken` is set.
  var item: PoolTask
  while not globalStopToken.load(moRelaxed):
    acquire(chan.L)
    # Also leave the wait loop when a stop was requested, otherwise an idle
    # worker would sleep forever and `panicStop` could never join it.
    while chan.count == 0 and not globalStopToken.load(moRelaxed):
      wait(chan.dataAvailable, chan.L)
    if chan.count == 0:
      # Woken up by `panicStop` with nothing left to run.
      release(chan.L)
      break
    # see deques.popFirst:
    dec chan.count
    item = move chan.data[chan.head]
    chan.head = (chan.head + 1) and FixedChanMask
    signal(chan.spaceAvailable)
    release(chan.L)
    # Tasks of a cancelled master are dropped without being invoked.
    if not item.m.stopToken.load(moRelaxed):
      try:
        atomicInc busyThreads
        item.t.invoke(item.result)
      except:
        # Only the first failure is kept; `waitForCompletions` re-raises it.
        acquire(item.m.L)
        if item.m.error.len == 0:
          let e = getCurrentException()
          item.m.error = "SPAWN FAILURE: [" & $e.name & "] " & e.msg & "\n" &
            getStackTrace(e)
        release(item.m.L)
      finally:
        atomicDec busyThreads

    # but mark it as completed either way!
    taskCompleted item.m[]

proc setup() =
  ## Initialises the global channel and starts the worker threads.
  initCond(chan.dataAvailable)
  initCond(chan.spaceAvailable)
  initLock(chan.L)
  for i in 0..high(thr): createThread[void](thr[i], worker)

proc panicStop*() =
  ## Stops all threads.
  globalStopToken.store(true, moRelaxed)
  # Idle workers are blocked on `dataAvailable`; wake all of them while
  # holding the channel lock so that none can miss the stop request.
  acquire(chan.L)
  broadcast(chan.dataAvailable)
  release(chan.L)
  joinThreads(thr)
  deinitCond(chan.dataAvailable)
  deinitCond(chan.spaceAvailable)
  deinitLock(chan.L)

proc shouldSend(master: var Master): bool {.inline.} =
  ## True if the task should go to the pool: either the master is an active
  ## producer or fewer than `ThreadPoolSize-1` threads are busy.
  master.activeProducer or busyThreads.load(moRelaxed) < ThreadPoolSize-1

import std / macros

macro getOpUs(fn: typed; cp: typed{nkSym}): int =
  ## Returns the argument of the custom pragma `cp` attached to the routine
  ## called by `fn`. Yields `1` when the pragma is absent, has no argument,
  ## or when `fn[0]` is not the symbol of a routine.
  result = newLit(1)
  if fn.len > 0 and fn[0].kind == nnkSym:
    let impl = fn[0].getImpl()
    if impl.kind in RoutineNodes:
      for p in impl.pragma:
        # `p[1]` is only valid when the pragma was written with an argument.
        if p.len > 1 and p[0].kind == nnkSym and p[0] == cp:
          return p[1]

func fnCost(opUs: int): int =
  ## Converts an `opUS` value into a cost of `1000 div opUs`. Values below 1
  ## are treated as 1 so that `{.opUS: 0.}` cannot divide by zero and a
  ## negative value cannot produce a negative cost.
  1000 div max(opUs, 1)

template spawnImplNoRes(master: var Master; fn: typed) =
  ## Charges the cost of `fn` to the master's budget, then either sends `fn`
  ## to the pool (budget used up and `shouldSend` is true) or runs it
  ## directly on the calling thread.
  let cost = getOpUs(fn, opUS).fnCost
  let budget = master.budget.fetchSub(cost) - cost
  if budget > 0 or stillHaveTime(master):
    if budget < 0 and shouldSend(master):
      # Sending to the pool starts a fresh budget.
      master.budget.store(ThreadBudget)
      taskCreated master
      send PoolTask(m: addr(master), t: toTask(fn), result: nil)
    else:
      fn

macro spawn*(a: Master; b: untyped) =
  ## Spawns the call `b` under the master `a`.
  result = newCall(bindSym"spawnImplNoRes", a, b)

template awaitAll*(master: var Master; body: untyped) =
  ## Runs `body` and then waits for all tasks of `master` to complete,
  ## even if `body` raised.
  try:
    body
  finally:
    waitForCompletions(master)

when not defined(maleSkipSetup):
  setup()

include malebolgia / masterhandles

# TEST CODE
# Only run when this file is compiled as the main module, so that importing
# the library does not execute the self-test.
when isMainModule:
  var m = createMaster()

  proc spm(): auto {.opUS:100.} = discard ## takes ~0010 ns
  proc baz(): auto {.opUS:050.} = discard ## takes ~0020 ns
  proc bar(): auto {.opUS:010.} = discard ## takes ~0100 ns
  proc foo(): auto {.opUS:001.} = discard ## takes ~1000 ns

  m.awaitAll:
    for i in 0 .. 1_000_000:
      m.spawn spm()
      m.spawn baz()
      m.spawn bar()
      m.spawn foo()