
add "select" syntax to the language to await the first function in a given set that completes #5263

Status: Open · andrewrk opened this issue May 3, 2020 · 20 comments
Labels: accepted (This proposal is planned.), proposal (This issue suggests modifications. If it also has the "accepted" label then it is planned.)

andrewrk (Member) commented May 3, 2020

I thought I had already proposed this but I could not find it. This depends on the cancel proposal being accepted, which is related to #3164 but I will open an explicit proposal for it.

There is one thing missing from Zig's async/await features: the ability to respond to the first function that completes. Here's a use case:

fn httpRequestWithTimeout(url: []const u8, timeout_duration: u64) !Response {
    var request_tok = CancelToken{};
    var request = async httpRequest(url, &request_tok);
    defer request_tok.cancel(&request);

    var timeout_tok = CancelToken{};
    var timeout = async sleep(timeout_duration, &timeout_tok);
    defer timeout_tok.cancel(&timeout);

    select {
        request => |result| {
            request_tok.awaited = true;
            return result;
        },
        timeout => {
            timeout_tok.awaited = true;
            return error.TimedOut;
        },
    }
}

Here's a more complicated example, which shows how select also supports arrays of frames:

fn httpRequestWithMirrors(mirrors: []const []const u8) !Response {
    const frames = try allocator.alloc(@Frame(httpRequest), mirrors.len);
    defer allocator.free(frames);
    const cancel_tokens = try allocator.alloc(CancelToken, mirrors.len);
    defer allocator.free(cancel_tokens);

    for (frames) |*frame, i| {
        cancel_tokens[i] = CancelToken{};
        frame.* = async httpRequest(mirrors[i], &cancel_tokens[i]);
    }

    defer for (cancel_tokens) |*tok, i| tok.cancel(&frames[i]);

    var in_flight_count: usize = mirrors.len;
    while (true) {
        select {
            frames => |result, i| {
                cancel_tokens[i].awaited = true;
                if (result) |payload| {
                    return payload;
                } else |err| {
                    in_flight_count -= 1;
                    if (in_flight_count == 0)
                        return err;
                }
            },
        }
    }
}

Here's another use case:

/// Add a frame to the Batch. If all jobs are in-flight, then this function
/// waits until one completes.
/// This function is *not* thread-safe. It must be called from one thread at
/// a time, however, it need not be the same thread.
/// TODO: "select" language feature to use the next available slot, rather than
/// awaiting the next index.
pub fn add(self: *Self, frame: anyframe->Result) void {
    const job = &self.jobs[self.next_job_index];
    self.next_job_index = (self.next_job_index + 1) % max_jobs;
    if (job.frame) |existing| {
        job.result = if (async_ok) await existing else noasync await existing;
        if (CollectedResult != void) {
            job.result catch |err| {
                self.collected_result = err;
            };
        }
    }
    job.frame = frame;
}

As an alternative syntax, we could replace select with await. Since only labeled blocks can return values, await { is unambiguously the "select" form of await.
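Under that alternative, the timeout example above would read the same with await in place of select (hypothetical syntax; nothing here compiles today):

```zig
// Identical semantics to the select form above.
await {
    request => |result| {
        request_tok.awaited = true;
        return result;
    },
    timeout => {
        timeout_tok.awaited = true;
        return error.TimedOut;
    },
}
```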

@andrewrk andrewrk added the proposal This issue suggests modifications. If it also has the "accepted" label then it is planned. label May 3, 2020
@andrewrk andrewrk added this to the 0.7.0 milestone May 3, 2020
rohlem (Contributor) commented May 4, 2020

I can see the conciseness of assuming cancel in the code examples, but the two features are not intrinsically linked... right?

Edit: Thanks for the quick reply; seen as a multi-await, it already makes more sense to me.
await or select-ing one of the select-ed frames concurrently would still lead to (safety-checked) undefined behaviour (as it does with single-frame await currently), but having two of them be await-ed/completed at the same time just means the second one to complete has no more awaiter at that point (and instead stores its result to be await-ed later).

(Original comment:
From what I understand, if we exposed a field .running of @Frame types (as mentioned in #3164 (comment)), we could already implement it in pure userland code. So am I correct in thinking that this is only syntactic sugar to more cleanly associate the frames with the code blocks handling them?

That is to say

[async/noasync/] select {
    a => |result| { ... break f(a); },
    b => |result| { ... break g(b); },
    // would be required (or generated as unreachable) with the noasync keyword prefix;
    // might not make sense with the async keyword prefix (unless it can suspend -> repeat the block)?
    else => { ... break h(); },
}

is the same as

// An async keyword prefix, if allowed, would move this block into an async function frame
// (replacing block-breaks with returns) and enclose it with while (true) { ... suspend; }
select_block: {
    resume a; if (!a.running) { const result = noasync await a; ... break :select_block f(a); }
    resume b; if (!b.running) { const result = noasync await b; ... break :select_block g(b); }
    // else; would be required (or generated as unreachable) with the noasync keyword prefix;
    // might not make sense with the async keyword prefix (unless it can suspend -> continue the enclosing while loop)?
    ... break :select_block h();
}

? Or am I missing something?
)

andrewrk (Member, Author) commented May 4, 2020

No, select cannot be implemented in terms of suspend/resume, for the same reason that await cannot be implemented in terms of suspend/resume. await does not do a resume operation. It only sets up the awaited function to resume the awaiter when the awaited function completes.

With select, multiple functions are set up to resume the same awaiter, at different resume code paths. They race against each other, and only one wins, and the others remain un-awaited.
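To picture this, here is a rough model (field names invented; not the real frame layout): each frame has an awaiter slot that await fills in, and the frame's compiler-generated epilogue resumes whatever is in that slot when the function returns. A select would point several slots at the same awaiter:

```zig
// Conceptual sketch only; `awaiter` is an invented field name.
const FrameHeader = struct {
    // Filled in by await/select; the function's epilogue resumes this
    // frame (if any) instead of leaving the result stored for later.
    awaiter: ?anyframe = null,
};

// await f:      f.header.awaiter = @frame(); suspend;
// select a, b:  a.header.awaiter = @frame(); // resume path for prong A
//               b.header.awaiter = @frame(); // resume path for prong B
//               suspend; // the first completion resumes us; the other
//                        // frame stays un-awaited and keeps its result
```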

Rocknest (Contributor) commented May 5, 2020

PART 1
As I can see, this proposal depends on cancel for one reason: to not waste CPU and memory when the result of a suspendable function is no longer needed. Otherwise this can be implemented right now, if that problem is ignored (and resource management remains a manual process).

// AUTOCANCEL
fn httpRequestWithTimeout(url: []const u8, timeout_duration: u64) !Response {
    var request = async httpRequest(url);
    var timeout = async sleep(timeout_duration);

    await {
        request => |result| {
            // cancel timeout;
            // automatically cancels all frames from await block 
            return result;
        } orcancel {
            // when httpRequest finishes and finds out that
            // it is cancelled the control flow jumps to here
            // result is probably in scope
            // free resources or ignore
            _ = result;
            // may not return a value here
        },
        timeout => {
            // cancel request; // same as above
            return error.TimedOut;
        } orcancel {
            // send signal to the timer or ignore
            // may not return a value
        },
    }
}

// MANUAL CANCEL
// orcancel adds a 'finished, but cancelled, await myself' code path.
// this may be a general feature of the await:
fn example() Value {
    const gvframe = async getValue();
    return await gvframe orcancel {
        // await suspends the 'example' frame
        // if 'example' is cancelled (cancel bit is set) this code path will run when 'gvframe' completes 
    }
}

(my personal opinion: timeout is a basic property of a request so httpRequest should probably accept a timeout parameter and use it when calling functions such as connect or read with that timeout value)

PART 2
As far as I know, languages that have async/await functionality mostly chose to ignore the problem of wasting resources when the result is no longer needed (relying on a garbage collector or similar). If it is absolutely required not to waste resources, then some ownership mechanism or opt-in semantics are needed; that probably means 'blocking code = async code' will only hold for 'blocking code = non-cancelable async code'. The following is also true: 'cancelable async code = blocking code'.

Some examples for 'cancelable async code = blocking code', assuming that cancel sets a cancel bit in the frame's state:

fn httpRequest(url: []const) ![]const u8 <or cancel> { // opt-in compatible with error union return types
    var conn = connect(url); // suspend point
    var buf = allocBuf(); 
    errdefer free(buf); // resource management
    // every following resume point will check if the frame is cancelled 
    // and simulate `error.CancelZigFrame` so the errdefers run
    readHttpToBuf(buf); // suspend point
    return buf; // see [1]
}

This opt-in mechanism may allow to implement cancel without the ownership mechanism or some other destructor-like thing.
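As a sketch, under this opt-in scheme each resume point might conceptually lower to something like the following (the cancel-bit check is an invented compiler builtin, not a real one):

```zig
// Hypothetical lowering of one suspend point inside a `<or cancel>` function.
var conn = connect(url); // suspend point
// compiler-inserted on resume:
if (@frameCancelBit(@frame())) {
    // simulate an error return so the errdefers above this point run
    return error.CancelZigFrame;
}
```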

Open questions:
[1] Should cancelling a completed frame run errdefers? Not sure about this; it's like simulating that a return may produce an error (try return?).
[2] Is cancelling a frame that has not opted in a no-op or illegal behaviour? How should the return value be handled? (I think illegal behaviour is fine, or else orcancel is needed.)

andrewrk (Member, Author) commented May 6, 2020

I updated the original post to take advantage of a hypothetical CancelToken abstraction, implemented in userland, rather than the make-believe cancel feature that we don't even have a viable proposal for.

Example implementation of CancelToken for httpRequest is here:

const CancelToken = struct {
    /// Set to `true` to prevent `cancel` from running. This field is helpful by convention;
    /// it's not required to be used.
    awaited: bool = false,

    /// This field is possibly modified from multiple threads and should not be accessed
    /// directly.
    state: State = .run,

    const State = enum { run, cancel, done };

    fn cancel(self: *CancelToken, frame: var) void {
        if (self.awaited) return;
        const need_cleanup = @atomicRmw(State, &self.state, .Xchg, .cancel, .SeqCst) == .done;
        const return_value = await frame.*;
        if (need_cleanup) {
            // ... here, clean up resources from the operation completing successfully ...
            if (return_value) |response| {
                response.deinit();
            } else |err| {
                // nothing to clean up in this case
            }
        }
    }

    fn checkIsCanceled(self: *CancelToken) bool {
        return @atomicLoad(State, &self.state, .SeqCst) == .cancel;
    }

    fn finishAndCheckIsCanceled(self: *CancelToken) bool {
        return @atomicRmw(State, &self.state, .Xchg, .done, .SeqCst) == .cancel;
    }
};

Example implementation of httpRequest here:

fn httpRequest(tok: *CancelToken) !*Response {
    var resp = try allocator.create(Response);
    errdefer allocator.destroy(resp);

    var buf: [4096]u8 = undefined;
    while (true) {
        // assume read_from_socket returns the number of bytes read,
        // and 0 once the response is complete
        const n = read_from_socket(&buf);
        if (tok.checkIsCanceled()) {
            return error.OperationCanceled;
        }
        if (n == 0) break;
        try resp.data.appendSlice(buf[0..n]);
    }

    if (tok.finishAndCheckIsCanceled()) {
        return error.OperationCanceled;
    }

    return resp;
}

SpexGuy (Contributor) commented May 6, 2020

The threading here gets tricky if unselected frames need to be guaranteed to be awaitable, but I think it's still possible as long as all async functions Xchg their frame's awaiter to "already returned", even if they are awaited. Did I miss any race conditions here?

/// Before calling select, all frames must be unawaited or finished but unconsumed.
/// After select returns, all frames will be unawaited or finished but unconsumed,
/// except for the one frame that was selected, which will be consumed.
fn select(self: *@Frame(select), a: anyframe, b: anyframe, c: anyframe) var {
    self.chosen_path = 0;

    if (cas(&a.awaiter, null, A_READY)) |actual_awaiter| {
        assert(actual_awaiter == ALREADY_RETURNED);
        goto A_READY;
    }
    if (cas(&b.awaiter, null, B_READY)) |actual_awaiter| {
        assert(actual_awaiter == ALREADY_RETURNED);
        goto B_READY;
    }
    if (cas(&c.awaiter, null, C_READY)) |actual_awaiter| {
        assert(actual_awaiter == ALREADY_RETURNED);
        goto C_READY;
    }

A_READY:
    // check if another path already started.  If it has, return to the 'resume' state.
    // a future 'await' will consume the result without suspending, so no other 'resume'
    // is needed.
    if (cas(&self.chosen_path, 0, 1)) |other_path| return;
    
    // consume the input, check for multiple awaiters UB. safe modes only.
    assert(cas(&a.awaiter, ALREADY_RETURNED, RESULT_CONSUMED) == null); 

    // if the other machines haven't finished, restore them to the unfinished state
    // if either of these CAS ops fails, there is a finishing race.  `chosen_path`
    // will handle that race condition, and the other awaiter will be left in a
    // returned but not consumed state.
    _ = cas(&b.awaiter, B_READY, 0);
    _ = cas(&c.awaiter, C_READY, 0);

    return caseA(a.return_value);

B_READY:
    if (cas(&self.chosen_path, 0, 2)) |other_path| return;

    assert(cas(&b.awaiter, ALREADY_RETURNED, RESULT_CONSUMED) == null); 
    _ = cas(&a.awaiter, A_READY, 0);
    _ = cas(&c.awaiter, C_READY, 0);

    return caseB(b.return_value);
    
C_READY:
    if (cas(&self.chosen_path, 0, 3)) |other_path| return;

    assert(cas(&c.awaiter, ALREADY_RETURNED, RESULT_CONSUMED) == null); 
    _ = cas(&a.awaiter, A_READY, 0);
    _ = cas(&b.awaiter, B_READY, 0);

    return caseC(c.return_value);
}

andrewrk (Member, Author) commented May 6, 2020

Extension to the proposal: add an else prong, which makes the select behave differently. With an else prong, select picks a frame prong only if that frame has already completed; otherwise the else prong is selected.

select {
    one => {},
    two => {},
    else => {},
}

This would mean that these two things are semantically equivalent:

var y = nosuspend await x;
var y = select {
    x => |value| value,
    else => unreachable,
};

If this makes sense to do when implementing the proposal then let's do it, otherwise
I'll extract it to a separate proposal.
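For example, the else prong would allow polling an in-flight frame without suspending (hypothetical syntax; doOtherWork is a placeholder):

```zig
var frame = async httpRequest(url, &tok);
while (true) {
    select {
        frame => |result| {
            tok.awaited = true;
            return result;
        },
        // Taken whenever `frame` has not completed yet; never suspends.
        else => doOtherWork(),
    }
}
```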

andrewrk (Member, Author) commented May 6, 2020

@SpexGuy that's the same general idea I had, if I'm reading it correctly. It might be tricky to support arrays of frames though. I do think we should have a plan for how to support arrays of frames.

SpexGuy (Contributor) commented May 7, 2020

We need to iterate all of the frames to reset their awaiters, so scanning to find one that is done shouldn't be too much extra work.
Though this would mean that, unlike most awaiters, it's not guaranteed that the thread that runs the select case is the same as the thread that executed the return instruction for that frame.

pub fn select(frame: *@Frame(select), select_frames: []anyframe) var {
    // either define this as checked UB or check and return here.
    // this would hang indefinitely otherwise since no frame will resume.
    if (select_frames.len == 0) return;

    frame.taken = @as(i32, -1);
    frame.select_frames = select_frames;
    frame.resume_state = DO_SELECT;

    for (select_frames) |*selected, i| {
        if (cas(&selected.awaiter, null, frame)) |actual_awaiter| {
            assert(actual_awaiter == ALREADY_RETURNED);
            assert(xchg(&frame.taken, 0) == -1);
            goto DO_SELECT;
        }
    }
    
    assert(xchg(&frame.taken, 0) == -1);
    
    return_to_resume;

DO_SELECT:
    while (true) {
        if (cas(&frame.taken, 0, 1)) |actual_taken| {
            if (actual_taken == 1) {
                return_to_resume;
            } else {
                // the frame was so fast, it returned before the
                // setup code finished setting up awaiters.
                // Need to wait for the setup code to finish
                // before continuing, but can't suspend.
                // It's on a different thread, we just have
                // to wait it out
                assert(actual_taken == -1);
                // maybe yield to help the scheduler?
            }
        } else break;
    }

    // when we get here, at least one item is in the returned state
    // we need to iterate all frames to reset their awaiters,
    // so scanning to find one that's done is not extra cost.
    var selected_index: ?usize = null;
    for (frame.select_frames) |*select_frame, i| {
        if (cas(&select_frame.awaiter, frame, 0)) |other_awaiter| {
            if (other_awaiter == ALREADY_RETURNED) {
                selected_index = i;
                // don't break, we need to reset all awaiters.
            } else {
                assert(other_awaiter == 0);
                break;
            }
        }
    }
    assert(selected_index != null);
    const selected_frame = select_frames[selected_index.?];
    assert(cas(&selected_frame.awaiter, ALREADY_RETURNED, RESULT_CONSUMED) == null);
    
    return runSelectBlock(selected_frame, selected_index.?);
}

We could also mix the two by using a fake intermediate frame type that is binary-compatible with anyframe to represent each case in the select.

const SelectFrame = struct {
    frame_function: usize = undefined, // points to a stub function to resume from for this select
    index: usize = undefined,          // index of this frame (normally this is the state)
    awaiter: anyframe = undefined,     // points to the frame of the function containing the select
};

fn select(frame: *@Frame(select), select_frames: *[N]anyframe) void {
    if (N == 0) return;

    frame.taken = @as(i32, -1);
    frame.select_frames = select_frames;
    frame.fake_frames = undefined; // [N]SelectFrame
    
    for (frame.fake_frames) |*fake_frame, i| {
        fake_frame.frame_function = resume;
        fake_frame.index = i;
        fake_frame.awaiter = @frame();
    }
    
    for (select_frames) |*select_frame, i| {
        if (cas(&select_frame.awaiter, null, &frame.fake_frames[i])) |actual_awaiter| {
            assert(actual_awaiter == ALREADY_RETURNED);
            frame.remaining_references = i;
            assert(xchg(&frame.taken, 0) == -1);
            resume(&frame.fake_frames[i]);
        }
    }
    
    // need to make sure that no frames are referencing any of
    // our frame stack allocated fake frames before we return.
    frame.remaining_references = N;
    assert(xchg(&frame.taken, 0) == -1);
}
// this function is called just like a frame resume
fn resume(frame: *SelectFrame) void {
    const base_frame = @as(*@Frame(select), frame.awaiter);
    while (true) {
        if (cas(&base_frame.taken, 0, 1)) |actual_taken| {
            if (actual_taken == 1) {
                // release one reference to a fake frame
                // this one will no longer be used.
                if (atomic_decrement(&base_frame.remaining_references) == 0) goto AFTER_BLOCK;
                return_to_resume;
            } else {
                assert(actual_taken == -1);
            }
        } else break;
    }
    
    // only one thread will get here
    for (base_frame.select_frames) |*select_frame, i| {
        if (cas(&select_frame.awaiter, &base_frame.fake_frames[i], null) == null) {
            // we beat the frame to its continuation, release a reference for it
            // there is still one reference for the frame we finished, so this must be above zero.
            assert(atomic_decrement(&base_frame.remaining_references) > 0);
        }
    }
    
    const index = frame.index;
    const selected_frame = base_frame.select_frames[index];
    
    base_frame.ret_val = runSelectBlock(selected_frame, index);
    
    // release one reference count.  If the count is not zero at this point, it means that
    // one thread is currently between marking its frame as returned and running the loop at
    // the top of this function.  That thread will run `AFTER_BLOCK` when it decrements and gets zero.
    if (atomic_decrement(&base_frame.remaining_references) != 0) return_to_resume;

AFTER_BLOCK:
    return base_frame.ret_val;
}

@andrewrk andrewrk added the accepted This proposal is planned. label May 8, 2020
komuw commented Jun 29, 2020

talking of cancel tokens & timeouts; https://vorpus.org/blog/timeouts-and-cancellation-for-humans/

ghost commented Jul 7, 2020

If I understand this correctly, then on wakeup the selector will have to iterate through every frame and unawait all of them in a single atomic operation -- is that right?

SpexGuy (Contributor) commented Jul 7, 2020

Not quite. It just needs to ensure that only one select branch will execute. So there is one atomic flag for the select. Each awaiter is set up to check and set the flag. If they are the first one to resume, they reset the awaiters on the other frames. If any of those other frames have already returned to their awaiter, they will see that the flag is set and will suspend to the event loop, as if they had no awaiter. This doesn't eliminate the race but it ensures that both paths behave correctly.
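A minimal sketch of that flag protocol, with invented names:

```zig
// Run by each frame that completes while registered with a select.
// `flag` is the select's single atomic; `resumeProng` is invented.
fn onFrameComplete(sel: *SelectState, my_index: usize) void {
    if (@cmpxchgStrong(bool, &sel.flag, false, true, .SeqCst, .SeqCst) == null) {
        // First to finish: detach the awaiters of the sibling frames,
        // then resume the selector at prong `my_index`.
        sel.resumeProng(my_index);
    } else {
        // Lost the race: keep the result stored in this frame and
        // return to the event loop as if there were no awaiter; a
        // later await will consume the result without suspending.
    }
}
```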

ghost commented Jul 13, 2020

Thought: if we accept []anyframe, should we also accept ?anyframe? What about []?anyframe? ?[]anyframe? [*]anyframe?

SpexGuy (Contributor) commented Jul 14, 2020

At first glance, I don't see a good reason to support any of those. A good motivating example could change my mind though. In practice, I don't expect select to be used even with anyframe, given the constraints on frame lifetime. It should mostly be concrete frame types that are selected upon.

These are my gut reactions:

  • ?anyframe: This could be converted to []anyframe, so maybe? I think a motivating use case is necessary before bending to support this.
  • []?anyframe: This should be allowed iff ?anyframe is allowed. But if you are so far abstracted from your frames that you don't even know whether they exist, how are you possibly going to ensure you await and clean up all of them except the one that gets selected? Again, a good example could change my mind on this, but a quick look says it's unlikely that this is needed.
  • ?[]anyframe: This is trivial to check or convert with .?. There's no need for the select construct to support this, just like for doesn't support this.
  • [*]anyframe: This can't be supported without a length. When the length is supplied with [0..n], it becomes a slice, so the language only needs to support a slice.

ghost commented Jul 14, 2020

Well, as a motivating use case for []?anyframe, see httpRequestWithMirrors from the original comment. In that implementation, there's nothing to stop a previously checked mirror from being checked again and hanging -- nulling it would fix this. Although, this would encourage an inefficient while-select pattern in more places than would best fit, so we'd need to find a way to discourage that.

ghost commented Nov 27, 2020

If a frame is cancelable as per #5913, then requiring a result location is an implicit suspend point -- when a frame is suspended on such a point, select would need to know whether the frame would run to completion in one pass, or run into more suspend points. This means extra frame state, if it's known at all (conditional execution and such). Of course that's no big deal in safe modes, but select still needs to operate correctly in unsafe modes.

This requires further thought. I suspect this is one of those cases where the real solution comes from an angle no one has thought of yet. Some ideas, none of them good:

  • Provide separate result locations to each arm, automatically or manually, so they all have a chance to run (inefficient, impossible with slices, defeats the whole point of async RLS)
  • Instead of select, allow await blocks, in which a new builtin @callback() can be invoked, which each function can use to return to this point in the caller (convoluted, requires rewriting each callee, function colouring)

SpexGuy (Contributor) commented Nov 27, 2020 via email

Mouvedia commented Nov 27, 2020

I know SELECT from SQL; I found 3 other languages using it:

Does the familiarity criterion matter for the keyword selection?

@ghost ghost mentioned this issue Dec 3, 2020
jorangreef (Contributor) commented Jan 11, 2021

Thanks, this is brilliant.

Here's another use case to throw into the mix, if you like:

How would one use select or an await block to select say the first 3 of 5 requests that complete? I am wanting to do this for a replicated state machine that uses tail-tolerance to replicate to 5 nodes, but which can proceed after any 3 requests have journaled and acked.

I suppose this would be a select on those 5 requests, then another select on those same 5 requests, and then another final select on those same 5 requests?
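A sketch of the 3-of-5 quorum using the array form from the original post (hypothetical; this assumes a repeated select on the same slice skips frames whose results were already consumed, which the current proposal does not specify, and which relates to the []?anyframe question above):

```zig
// Wait for any 3 of 5 replication requests to complete; sketch only.
var acked: usize = 0;
while (acked < 3) {
    select {
        frames => |result, i| {
            cancel_tokens[i].awaited = true;
            try result; // propagate a failed replication
            acked += 1;
        },
    }
}
// the two remaining frames are cleaned up by the deferred cancels
```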

@andrewrk andrewrk modified the milestones: 0.10.0, 0.11.0 Apr 16, 2022
andrewrk (Member, Author) commented:

@jorangreef that is a great question, thank you for sharing this use case. I think your idea may be the way to go, but I will make sure to keep this use case in mind when focusing on this proposal, and have a solid answer for it.

jorangreef (Contributor) commented:

I am wanting to do this for a replicated state machine that uses tail-tolerance to replicate to 5 nodes, but which can proceed after any 3 requests have journaled and acked.

Thanks @andrewrk — looking back at my comment, I can only guess what I was working on at the time! 😉

@andrewrk andrewrk modified the milestones: 0.11.0, 0.12.0 Apr 9, 2023
@andrewrk andrewrk modified the milestones: 0.13.0, 0.12.0 Jun 29, 2023