Skip to content

Commit

Permalink
Making them work
Browse files Browse the repository at this point in the history
  • Loading branch information
skoppe committed Oct 31, 2022
1 parent c85e855 commit 7250bcf
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 12 deletions.
14 changes: 8 additions & 6 deletions source/concurrency/fiber.d
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ struct FiberSenderOp(Receiver) {
Receiver receiver;
alias BaseSender = typeof(receiver.getScheduler().schedule());
alias Op = OpType!(BaseSender, InnerFiberSchedulerReceiver!Receiver);
@disable this(this);
@disable this(ref return scope typeof(this) rhs);
void start() @trusted nothrow scope {
auto fiber = new OpFiber!Op(cast(void delegate()shared nothrow @safe)&run);
cycle(fiber, true);
Expand All @@ -55,7 +57,7 @@ struct FiberSenderOp(Receiver) {
if (!inline_)
return schedule(fiber);
import std.stdio;
debug writeln("enter fiber");
debug writeln("enter fiber, ", cast(void*)fiber.continuation);
if (auto throwable = fiber.call!(Fiber.Rethrow.no)) {
debug writeln("fiber threw ", throwable);
receiver.setError(throwable);
Expand All @@ -82,14 +84,17 @@ struct FiberSenderOp(Receiver) {
private void run() nothrow @trusted {
import concurrency.receiver : setValueOrError;
import concurrency.error : clone;
import concurrency : parentStopSource;
import concurrency : parentStopSource, CancelledException;

parentStopSource = receiver.getStopToken().source;

import std.stdio;
try {
debug writeln("FiberSender.run");
receiver.setValue();
} catch (CancelledException e) {
debug writeln("FiberSender.run.cancelled");
receiver.setDone();
} catch (Exception e) {
debug writeln("FiberSender.run.exception");
receiver.setError(e);
Expand Down Expand Up @@ -143,8 +148,5 @@ auto yield(Sender)(Sender sender) @trusted {
yield();
debug writeln("Resume");

static if (!is(Sender.Value == void))
return local.value;
else
(cast()local).assumeOk;
return cast()local;
}
8 changes: 7 additions & 1 deletion source/concurrency/syncwait.d
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ package struct SyncWaitReceiver2(Value) {

@reflectErr enum Cancelled { cancelled }

class CancelledException : Exception {
this(string file = __FILE__, size_t line = __LINE__, Throwable next = null) @nogc @safe pure nothrow {
super("Cancelled", file, line, next);
}
}

struct Result(T) {
alias V = Variant!(Cancelled, Exception, T);
V result;
Expand All @@ -79,7 +85,7 @@ struct Result(T) {
import mir.algebraic : match;
return result.match!(valueHandler,
function T (Cancelled c) {
throw new Exception("Cancelled");
throw new CancelledException();
},
function T (Exception e) {
throw e;
Expand Down
24 changes: 19 additions & 5 deletions tests/ut/concurrency/fiber.d
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,35 @@ import unit_threaded;
@("yield.delay")
@safe unittest {
auto fiber = FiberSender().then(() @trusted shared {
delay(100.msecs).yield();
delay(100.msecs).yield().assumeOk;
});
whenAll(fiber, fiber).syncWait().assumeOk;
}

@("yield.error")
@("yield.error.basic")
@safe unittest {
FiberSender().then(() @trusted shared {
ThrowingSender().yield();
ThrowingSender().yield().isError.should == true;
}).syncWait();
}

@("yield.error.propagate")
@safe unittest {
FiberSender().then(() @trusted shared {
ThrowingSender().yield().assumeOk;
}).syncWait().isError.should == true;
}

@("yield.cancel")
@("yield.cancel.basic")
@safe unittest {
FiberSender().then(() @trusted shared {
DoneSender().yield().isCancelled.should == true;
}).syncWait().assumeOk;
}

@("yield.cancel.propagate")
@safe unittest {
FiberSender().then(() @trusted shared {
DoneSender().yield();
DoneSender().yield().assumeOk;
}).syncWait().isCancelled.should == true;
}

0 comments on commit 7250bcf

Please sign in to comment.