"The master told me what I had to do. He speaks to me and I know his name. He calls himself Malebolgia."
Malebolgia creates new spawns.
It is a powerful library in Nim that simplifies the implementation of
concurrent and parallel programming. It provides a straightforward approach to
expressing parallelism using the `spawn` construct and ensures synchronization
using barriers.
- Works well on embedded devices.
- Bounded memory consumption: Solves the "backpressure" problem as a side effect.
- Only supports "structured" concurrency.
- Detaches the notion of "wait for all tasks" from the notion of a "thread pool".
- Detects simple "read/write" and "write/write" conflicts.
- Built-in support for cancelation and timeouts.
- Small: Less than 300 lines of Nim code, no dependencies.
- Low energy consumption.
- Fast: Wins some benchmarks (crawler; DFS), shows acceptable performance for others (fib).
This program demonstrates the parallel execution of the depth-first search algorithm
using Malebolgia. By utilizing the `spawn` and `awaitAll` features, the program can
efficiently distribute the workload across multiple threads, enabling faster computation:

    import malebolgia

    proc dfs(depth, breadth: int): int {.gcsafe.} =
      if depth == 0: return 1

      # The seq where we collect the results of the subtasks:
      var sums = newSeq[int](breadth)

      # Create a Master object for task coordination:
      var m = createMaster()

      # Synchronize all spawned tasks using an `awaitAll` block:
      m.awaitAll:
        for i in 0 ..< breadth:
          # Spawn subtasks recursively, store the result in `sums[i]`:
          m.spawn dfs(depth - 1, breadth) -> sums[i]

      result = 0
      for i in 0 ..< breadth:
        result += sums[i] # No `sync(sums[i])` required

    let answer = dfs(8, 8)
    echo answer

Notice the absence of a `FlowVar[T]` concept. Malebolgia does not offer
FlowVars because they are not required. Instead, the barrier within `awaitAll`
synchronizes.
Compile this with `nim c -d:ThreadPoolSize=8 -d:FixedChanSize=16 dfs.nim`.
There are two parameters that influence the efficiency of Malebolgia:
- `ThreadPoolSize`: Usually this should be the number of CPU cores, but for IO-bound programs it can be much higher.
- `FixedChanSize`: The fixed size of the communication channel(s). The default value is usually good enough.
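
Instead of passing the two defines on every command line, they could also be set in a project-level NimScript configuration file. The sketch below assumes a file named `config.nims` placed next to `dfs.nim`. The values simply mirror the compile command above and are not tuned recommendations.

```nim
# config.nims (assumed to live next to dfs.nim)
# Equivalent to passing -d:ThreadPoolSize=8 -d:FixedChanSize=16 to `nim c`.
switch("define", "ThreadPoolSize=8")
switch("define", "FixedChanSize=16")
```

With this file in place, `nim c dfs.nim` should pick up both settings.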
If a spawned task raises an exception, the master object notices and rethrows the exception after
`awaitAll`. If multiple tasks raise an exception, only the first exception is kept and rethrown.
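
The rethrow behavior can be observed by wrapping the `awaitAll` block in a `try` statement. The following is a minimal sketch: `mayFail` and `runTasks` are hypothetical names introduced for illustration. The text does not say which exception type is rethrown, so the sketch catches the general `CatchableError`.

```nim
import malebolgia

proc mayFail(i: int) {.gcsafe.} =
  # Hypothetical task: exactly one of the tasks fails on purpose.
  if i == 3:
    raise newException(ValueError, "task " & $i & " failed")

proc runTasks(): string =
  ## Returns the message of the exception that surfaced from `awaitAll`,
  ## or an empty string if no task failed.
  result = ""
  var m = createMaster()
  try:
    m.awaitAll:
      for i in 0 ..< 8:
        m.spawn mayFail(i)
  except CatchableError as e:
    # The concrete exception type is an assumption, hence the broad handler.
    result = e.msg

echo "caught: ", runTasks()
```

Because the handler sits outside the `awaitAll` block, error handling stays in one place regardless of which task failed.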
Cancelation is available by calling `cancel` on the master object:

    import malebolgia

    proc foo = echo "foo"
    proc bar(s: string) = echo "bar ", s

    var m = createMaster()
    m.awaitAll:
      m.spawn foo()
      for i in 0..<1000:
        m.spawn bar($i)
        if i == 300:
          # cancel after 300 iterations:
          m.cancel()

`createMaster` supports an optional `timeout` parameter. The timeout covers
all created tasks that belong to the created master. Long-running tasks
can query `master.cancelled` to see if they should stop.

    import std / times
    import malebolgia

    proc bar(s: string) = echo "bar ", s

    var m = createMaster(initDuration(milliseconds=500))
    m.awaitAll:
      for i in 0..<1000:
        m.spawn bar($i)
      if not m.cancelled:
        # if not cancelled, run even more:
        for i in 1000..<2000:
          m.spawn bar($i)

A `Master` object cannot be passed to subroutines, but
a `MasterHandle` can be passed to subroutines. In order to create a `MasterHandle`,
use the `getHandle` proc:

    import malebolgia

    proc g(m: MasterHandle; i: int) {.gcsafe.} =
      if i < 800:
        echo "BEGIN G"
        m.spawn g(m, i+1)
        echo "END G"

    proc main =
      var m = createMaster()
      m.awaitAll:
        m.spawn g(getHandle(m), 0)

    main()

A `MasterHandle` does not support the `awaitAll` operation, but it can `spawn`
new tasks and supports cancelation. Thus a `MasterHandle` object cannot be used
to break the structured concurrency abstraction.
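
A handle also gives a long-running task a way to notice cancelation. The sketch below assumes that `cancelled` can be queried on a `MasterHandle` in the same way as on the master itself. `worker` and `runWorkers` are hypothetical names, and the sleep durations are arbitrary.

```nim
import std / os
import malebolgia

proc worker(m: MasterHandle; id: int) {.gcsafe.} =
  # Poll the cancelation flag between small units of simulated work.
  for step in 0 ..< 100:
    if m.cancelled:   # assumption: `cancelled` is available on a handle
      return
    sleep(10)

proc runWorkers(): bool =
  ## Returns whether the master was cancelled by the time all tasks finished.
  var m = createMaster()
  m.awaitAll:
    let h = getHandle(m)
    for id in 0 ..< 4:
      m.spawn worker(h, id)
    sleep(50)    # let the workers run for a moment
    m.cancel()   # ask all tasks of this master to stop
  result = m.cancelled

echo "cancelled: ", runWorkers()
```

Polling at a coarse granularity keeps the check cheap while still letting tasks stop soon after `cancel` is called.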
The `Locker[T]` type wraps a data structure of type `T` with a lock, which
allows the wrapped value to be passed to a spawned operation. The data structure
can then be shared and mutated by several tasks:

    import std / [strutils, tables]
    import malebolgia
    import malebolgia / lockers

    proc countWords(filename: string; results: Locker[CountTable[string]]) =
      for w in splitWhitespace(readFile(filename)):
        lock results as r:
          r.inc w

    proc main() =
      var m = createMaster()
      var results = initLocker initCountTable[string]()

      m.awaitAll:
        m.spawn countWords("README.md", results)
        m.spawn countWords("malebolgia.nimble", results)

      unprotected results as r:
        r.sort()
        echo r

    main()

Currently `var T` parameters are unfortunately not supported, but it is easy to
work around this limitation: Since the parallelism is "structured" we can take the
address of a variable declared outside of the `awaitAll` block and pass it safely to
a spawned operation:

    import std / [strutils, tables]
    import malebolgia
    import malebolgia / ticketlocks

    proc countWords(filename: string; results: ptr CountTable[string]; L: ptr TicketLock) =
      for w in splitWhitespace(readFile(filename)):
        withLock L[]:
          results[].inc w

    proc main =
      var m = createMaster()
      var results = initCountTable[string]()
      var L = initTicketLock() # protects `results`

      m.awaitAll:
        m.spawn countWords("temp.nim", addr results, addr L)
        m.spawn countWords("tester.nim", addr results, addr L)

      results.sort()
      echo results

    main()

In general, such a parameter needs to be protected by a lock.
We use Malebolgia's `TicketLock` here, which does not require annoying `deinitLock`
calls.