Skip to content

Commit

Permalink
Support for fibers
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Aug 22, 2022
1 parent 9f13d3e commit f6739b0
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 1 deletion.
131 changes: 131 additions & 0 deletions source/concurrency/fiber.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
module concurrency.fiber;

import concurrency.sender;
import concepts;
import core.thread.fiber;
import core.thread.fiber : Fiber;

alias Continuation = Object;

package(concurrency) abstract class BaseFiber : Fiber {
private Continuation continuation;
this(void delegate() dg, size_t sz, size_t guardPageSize) @trusted nothrow {
super(dg, sz, guardPageSize);
}
static BaseFiber getThis() @trusted nothrow {
import core.thread.fiber : Fiber;
return cast(BaseFiber)Fiber.getThis();
}
}

class OpFiber(Op) : BaseFiber {
import core.memory : pageSize;

private Op op;

this(void delegate() shared @safe nothrow dg, size_t sz = pageSize * defaultStackPages, size_t guardPageSize = pageSize) @trusted nothrow {
super(cast(void delegate())dg, sz, guardPageSize);
}
}

struct FiberSender {
static assert (models!(typeof(this), isSender));
alias Value = void;
auto connect(Receiver)(return Receiver receiver) @safe scope return {
auto op = FiberSenderOp!(Receiver)(receiver);
return op;
}
}

struct FiberSenderOp(Receiver) {
Receiver receiver;
alias BaseSender = typeof(receiver.getScheduler().schedule());
alias Op = OpType!(BaseSender, InnerFiberSchedulerReceiver!Receiver);
void start() @trusted nothrow scope {
auto fiber = new OpFiber!Op(cast(void delegate()shared nothrow @safe)&run);
cycle(fiber, true);
}
private void schedule(OpFiber!Op fiber) @trusted nothrow {
// TODO: why can't we store the Op here?
fiber.op = receiver.getScheduler.schedule().connect(InnerFiberSchedulerReceiver!Receiver(fiber, &cycle, receiver));
fiber.op.start();
}
private void cycle(BaseFiber f, bool inline_) @trusted nothrow {
auto fiber = cast(OpFiber!Op)f;
if (!inline_)
return schedule(fiber);
import std.exception : assumeWontThrow;
fiber.call(Fiber.Rethrow.no).assumeWontThrow;
if (fiber.continuation !is null) {
auto sender = cast(SenderObjectBase!void)fiber.continuation;
try {
auto op = sender.connectHeap(InnerFiberSchedulerReceiver!Receiver(fiber, &cycle, receiver));
op.start();
} catch (Throwable t) {
receiver.setError(t);
return;
}
} else if (fiber.state == Fiber.State.HOLD) {
schedule(fiber);
} else {
// reuse it?
}
}
private void run() nothrow @trusted {
import concurrency.receiver : setValueOrError;
import concurrency.error : clone;
import concurrency : parentStopSource;

parentStopSource = receiver.getStopToken().source;

try {
receiver.setValue();
} catch (Exception e) {
receiver.setError(e);
} catch (Throwable t) {
receiver.setError(t.clone());
}

parentStopSource = null;
}
}

struct InnerFiberSchedulerReceiver(Receiver) {
import concurrency.receiver : ForwardExtensionPoints;
BaseFiber fiber;
void delegate(BaseFiber, bool) nothrow @trusted cycle;
Receiver receiver;
void setDone() nothrow @safe {
cycle(fiber, true);
}
void setError(Throwable e) nothrow @safe {
cycle(fiber, true);
}
void setValue() nothrow @safe {
cycle(fiber, true);
}
mixin ForwardExtensionPoints!receiver;
}

void yield() @trusted {
import std.concurrency;
std.concurrency.yield();
}

auto yield(Sender)(Sender sender) @trusted {
import concurrency : Result;
import concurrency.operations : onResult;
import concurrency.sender : toSenderObject;

auto fiber = BaseFiber.getThis();

shared Result!(Sender.Value) local;
fiber.continuation = cast(Object)sender.onResult((Result!(Sender.Value) r) @safe shared { local = r; }).toSenderObject;
yield();
fiber.continuation = null;

static if (!is(Sender.Value == void))
return local.value;
else
(cast()local).assumeOk;
}
2 changes: 1 addition & 1 deletion source/concurrency/thread.d
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ package struct LocalThreadWorker {
if (!executor.queue.empty) {
auto work = executor.queue.pop();
import std.stdio;
writeln("Got unwanted message ", work);
writeln("Got unwanted message ", work.payload);
assert(0);
}
assert(executor.wheels.totalTimers == 0, "Still timers left");
Expand Down
39 changes: 39 additions & 0 deletions tests/ut/concurrency/fiber.d
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
module ut.concurrency.fiber;

import concurrency.fiber;
import concurrency.operations : then, whenAll;
import concurrency;
import concurrency.sender;
import core.time;

import unit_threaded;

@("yield.basic")
@safe unittest {
auto fiber = FiberSender().then(() @trusted shared {
yield();
});
whenAll(fiber, fiber).syncWait().assumeOk;
}

@("yield.delay")
@safe unittest {
auto fiber = FiberSender().then(() @trusted shared {
delay(100.msecs).yield();
});
whenAll(fiber, fiber).syncWait().assumeOk;
}

@("yield.error")
@safe unittest {
FiberSender().then(() @trusted shared {
ThrowingSender().yield();
}).syncWait().isError.should == true;
}

@("yield.error")
@safe unittest {
FiberSender().then(() @trusted shared {
DoneSender().yield();
}).syncWait().isError.should == true;
}
1 change: 1 addition & 0 deletions tests/ut/ut_runner.d
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ int main(string[] args)
"ut.concurrency.mpsc",
"ut.concurrency.waitable",
"ut.concurrency.asyncscope",
"ut.concurrency.fiber",
);
}

0 comments on commit f6739b0

Please sign in to comment.