Skip to content

Commit

Permalink
Updated Future FFI
Browse files Browse the repository at this point in the history
The initial motivation for this was cancellation.  PR#1697 made it
so if an async function was cancelled we would eventually release the
resources. However, it would be better if we could immedately release
the resources.  In order to implement that, I realized I needed to
change the future FFI quite a bit.

The new FFI is simpler overall and supports cancel and drop operations.
Cancel ensures that the foreign code will resume and break out of its
async code.  Drop ensures that all resources from the wrapped future are
relased.

The new code does not use ForeignExecutor anymore, so that code is in a
state of limbo.  I hope to repurpose it for foreign dispatch queues
(mozilla#1734).  If that doesn't work out, we can just delete it.

The FFI calls need some care since we pass a type-erased handle to the
foreign code, while RustFuture is generic over an anonymous Future type:
  - The concrete RustFuture type implements the RustFutureFFI.
  - We give the foreign side a raw Box<Arc<dyn RustFutureFFI>>>.  This
    makes it easy to call the FFI from the scaffolding code.  The extra
    Box turns the wide pointer into a normal sized pointer, which
    simplifies the foreign code.
  - The last bit of complexity is completing a Rust future, since the
    future can output any of the FFI types that we return from sync
    functions.  Handled this with a brute-force monomorphization.  The
    scaffolding code defines a completion function for each FFI type and
    the bindings code has to call the correct one for the future.

More changes:

- Updated the futures fixture tests for this to hold on to the mutex
  longer in the initial call.  This makes it so they will fail unless
  the future is dropped while the mutex is still locked.  Before they
  would only succeed as long as the mutex was dropped once the timeout
  expired.
- Updated `RustCallStatus.code` field to be an enum.  Added `Cancelled`
  as one of the variants.  `Cancelled` is only used for async functions.
- Removed the FutureCallback and invoke_future_callback from
  `FfiConverter`.
- New syncronization handling code in RustFuture that's hopefully
  clearer, more correct, and more understandable than the old stuff.
- Updated `UNIFFI_CONTRACT_VERSION` since this is an ABI change
- Removed the `RustCallStatus` param from async scaffolding functions.
  These functions can't fail, so there's no need.
- Added is_async() to the Callable trait.
  • Loading branch information
bendk committed Sep 12, 2023
1 parent e190f3f commit 264729b
Show file tree
Hide file tree
Showing 52 changed files with 1,305 additions and 1,056 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
- [Custom Types](https://mozilla.github.io/uniffi-rs/proc_macro/index.html#the-unifficustomtype-derive) are now supported for proc-macros, including a very
low-friction way of exposing types implementing the new-type idiom.
- Proc-macros: Added support for ByRef arguments
- Swift async functions/methods now always throw, since they can throw CancellationError when the parent
task is cancelled.

### What's Fixed

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 1 addition & 95 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ UniFFI supports exposing async Rust functions over the FFI. It can convert a Rus

Check out the [examples](https://github.com/mozilla/uniffi-rs/tree/main/examples/futures) or the more terse and thorough [fixtures](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/futures).

Note that currently async functions are only supported by proc-macros, if you require UDL support please file a bug.
Note that currently async functions are only supported by proc-macros, UDL support is being planned in https://github.com/mozilla/uniffi-rs/issues/1716.

## Example

Expand Down Expand Up @@ -41,97 +41,3 @@ In Rust `Future` terminology this means the foreign bindings supply the "executo
There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read.

See the [foreign-executor fixture](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/foreign-executor) for more implementation details.

## How it works

As [described in the documentation](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html),
UniFFI generates code which uses callbacks from Rust futures back into that foreign "executor" to drive them to completion.
Fortunately, each of the bindings and Rust have similar models, so the discussion below is Python, but it's almost exactly the same in Kotlin and Swift.

In the above example, the generated `say_after` function looks something like:

```python

# A helper to work with asyncio.
def _rust_say_after_executor(eventloop_handle, rust_task_handle):
event_loop = UniFFIMagic_GetExecutor(eventloop_handle)

def callback(task_handle):
# The event-loop has called us - call back into Rust.
_uniffi_say_after_executor_callback(task_handle)

# Now have the asyncio eventloop - ask it to schedule a call to help drive the Rust future.
eventloop.call_soon_threadsafe(callback, rust_task_handle)

# A helper for say_after which creates a future and passes it Rust
def _rust_call_say_after(callback_fn):
# Handle to our executor.
eventloop = asyncio.get_running_loop()
eventloop_handle = UniFFIMagic_SetExecutor(eventloop)

# Use asyncio to create a new Python future.
future = eventloop.create_future()
future_handle = UniFFIMagic_SetFuture(future)

# This is a "normal" UniFFI call across the FFI to Rust scaffoloding, but
# because it is an async function it has a special signature which
# requires the handles and the callback.
_uniffi_call_say_after(executor_handle, callback_fun, future_handle)

# and return the future to the caller.
return future

def say_after_callback(future_handle, result)
future = UniFFIMagic_GetFuture(future_handle)
if future.cancelled():
return
future.set_result(result))

def say_after(...):
return await _rust_call_say_after(say_after_callback)

```

And the code generated for Rust is something like:

```rust
struct SayAfterHelper {
rust_future: Future<>,
uniffi_executor_handle: ::uniffi::ForeignExecutorHandle,
uniffi_callback: ::uniffi::FfiConverter::FutureCallback,
uniffi_future_handle: ...,
}

impl SayAfterHelper {
fn wake(&self) {
match self.rust_future.poll() {
Some(Poll::Pending) => {
// ... snip executor stuff
self.rust_future.wake()
},
Some(Poll::Ready(v)) => {
// ready - tell the foreign executor
UniFFI_Magic_Invoke_Foreign_Callback(self.uniffi_callback, self.uniffi_future_handle)
},
None => todo!("error handling"),
}
}
}

pub extern "C" fn _uniffi_call_say_after(
uniffi_executor_handle: ::uniffi::ForeignExecutorHandle,
uniffi_callback: ::uniffi::FfiConverter::FutureCallback,
uniffi_future_handle: ...,
) {
// Call the async function to get the Rust future.
let rust_future = say_after(...)
let helper = SayAfterHelper {
rust_future,
uniffi_executor_handle,
uniffi_callback,
uniffi_future_handle,
);
helper.wake();
Ok(())
}
```
5 changes: 1 addition & 4 deletions fixtures/futures/tests/bindings/test_futures.kts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ runBlocking {
runBlocking {
val time = measureTimeMillis {
val job = launch {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
useSharedResource(SharedResourceOptions(releaseAfterMs=5000U, timeoutMs=100U))
}

// Wait some time to ensure the task has locked the shared resource
Expand All @@ -233,6 +233,3 @@ runBlocking {
}
println("useSharedResource (not canceled): ${time}ms")
}

// Test that we properly cleaned up future callback references
assert(uniffiActiveFutureCallbacks.size == 0)
23 changes: 8 additions & 15 deletions fixtures/futures/tests/bindings/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,14 @@ async def test():

# Test a future that uses a lock and that is cancelled.
def test_shared_resource_cancellation(self):
# Note: Python uses the event loop to schedule calls via the `call_soon_threadsafe()`
# method. This means that creating a task and cancelling it won't trigger the issue, we
# need to create an EventLoop and close it instead.
loop = asyncio.new_event_loop()
loop.create_task(use_shared_resource(
SharedResourceOptions(release_after_ms=100, timeout_ms=1000)))
# Wait some time to ensure the task has locked the shared resource
loop.call_later(0.05, loop.stop)
loop.run_forever()
# Close the EventLoop before the shared resource has been released.
loop.close()

# Try accessing the shared resource again using the main event loop. The initial task
# should release the shared resource before the timeout expires.
asyncio.run(use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)))
async def test():
task = asyncio.create_task(use_shared_resource(
SharedResourceOptions(release_after_ms=5000, timeout_ms=100)))
# Wait some time to ensure the task has locked the shared resource
await asyncio.sleep(0.05)
task.cancel()
await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000))
asyncio.run(test())

def test_shared_resource_no_cancellation(self):
async def test():
Expand Down
23 changes: 12 additions & 11 deletions fixtures/futures/tests/bindings/test_futures.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ counter.enter()

Task {
let t0 = Date()
let result = await alwaysReady()
let result = try! await alwaysReady()
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand All @@ -22,7 +22,7 @@ Task {
counter.enter()

Task {
let result = await newMyRecord(a: "foo", b: 42)
let result = try! await newMyRecord(a: "foo", b: 42)

assert(result.a == "foo")
assert(result.b == 42)
Expand All @@ -35,7 +35,7 @@ counter.enter()

Task {
let t0 = Date()
await void()
try! await void()
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand All @@ -49,7 +49,7 @@ counter.enter()

Task {
let t0 = Date()
let result = await sleep(ms: 2000)
let result = try! await sleep(ms: 2000)
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand All @@ -64,8 +64,8 @@ counter.enter()

Task {
let t0 = Date()
let result_alice = await sayAfter(ms: 1000, who: "Alice")
let result_bob = await sayAfter(ms: 2000, who: "Bob")
let result_alice = try! await sayAfter(ms: 1000, who: "Alice")
let result_bob = try! await sayAfter(ms: 2000, who: "Bob")
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand All @@ -84,7 +84,7 @@ Task {
async let bob = sayAfter(ms: 2000, who: "Bob")

let t0 = Date()
let (result_alice, result_bob) = await (alice, bob)
let (result_alice, result_bob) = try! await (alice, bob)
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand All @@ -102,7 +102,7 @@ Task {
let megaphone = newMegaphone()

let t0 = Date()
let result_alice = await megaphone.sayAfter(ms: 2000, who: "Alice")
let result_alice = try! await megaphone.sayAfter(ms: 2000, who: "Alice")
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand All @@ -116,7 +116,7 @@ Task {
counter.enter()

Task {
let megaphone = await asyncNewMegaphone()
let megaphone = try! await asyncNewMegaphone()

let result = try await megaphone.fallibleMe(doFail: false)
assert(result == 42)
Expand All @@ -129,7 +129,7 @@ counter.enter()

Task {
let t0 = Date()
let result_alice = await sayAfterWithTokio(ms: 2000, who: "Alice")
let result_alice = try! await sayAfterWithTokio(ms: 2000, who: "Alice")
let t1 = Date()

let tDelta = DateInterval(start: t0, end: t1)
Expand Down Expand Up @@ -236,7 +236,8 @@ Task {
counter.enter()
Task {
let task = Task {
try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000))
// Use try? to ignore cancellation errors
try? await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 5000, timeoutMs: 100))
}

// Wait some time to ensure the task has locked the shared resource
Expand Down
14 changes: 7 additions & 7 deletions fixtures/reexport-scaffolding-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod tests {
use std::ffi::CString;
use std::os::raw::c_void;
use std::process::{Command, Stdio};
use uniffi::{FfiConverter, ForeignCallback, RustBuffer, RustCallStatus};
use uniffi::{FfiConverter, ForeignCallback, RustBuffer, RustCallStatus, RustCallStatusCode};
use uniffi_bindgen::ComponentInterface;

struct UniFfiTag;
Expand Down Expand Up @@ -159,7 +159,7 @@ mod tests {
get_symbol(&library, object_def.ffi_object_free().name());

let num_alive = unsafe { get_num_alive(&mut call_status) };
assert_eq!(call_status.code, 0);
assert_eq!(call_status.code, RustCallStatusCode::Success);
assert_eq!(num_alive, 0);

let obj_id = unsafe {
Expand All @@ -168,24 +168,24 @@ mod tests {
&mut call_status,
)
};
assert_eq!(call_status.code, 0);
assert_eq!(call_status.code, RustCallStatusCode::Success);

let name_buf = unsafe { coveralls_get_name(obj_id, &mut call_status) };
assert_eq!(call_status.code, 0);
assert_eq!(call_status.code, RustCallStatusCode::Success);
assert_eq!(
<String as FfiConverter<UniFfiTag>>::try_lift(name_buf).unwrap(),
"TestName"
);

let num_alive = unsafe { get_num_alive(&mut call_status) };
assert_eq!(call_status.code, 0);
assert_eq!(call_status.code, RustCallStatusCode::Success);
assert_eq!(num_alive, 1);

unsafe { coveralls_free(obj_id, &mut call_status) };
assert_eq!(call_status.code, 0);
assert_eq!(call_status.code, RustCallStatusCode::Success);

let num_alive = unsafe { get_num_alive(&mut call_status) };
assert_eq!(call_status.code, 0);
assert_eq!(call_status.code, RustCallStatusCode::Success);
assert_eq!(num_alive, 0);
}
}
7 changes: 3 additions & 4 deletions uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,9 @@ impl KotlinCodeOracle {
FfiType::ForeignCallback => "ForeignCallback".to_string(),
FfiType::ForeignExecutorHandle => "USize".to_string(),
FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback".to_string(),
FfiType::FutureCallback { return_type } => {
format!("UniFfiFutureCallback{}", Self::ffi_type_label(return_type))
}
FfiType::FutureCallbackData => "USize".to_string(),
FfiType::RustFutureHandle => "Pointer".to_string(),
FfiType::RustFutureContinuation => "UniFffiRustFutureContinutationType".to_string(),
FfiType::RustFutureContinuationData => "USize".to_string(),
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions uniffi_bindgen/src/bindings/kotlin/templates/Async.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Async return type handlers

internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toShort()
internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toShort()

internal val uniffiContinuationHandleMap = UniFfiHandleMap<CancellableContinuation<Short>>()

// FFI type for Rust future continuations
internal object uniffiRustFutureContinuation: UniFffiRustFutureContinutationType {
override fun callback(continuationHandle: USize, pollResult: Short) {
uniffiContinuationHandleMap.remove(continuationHandle)?.resume(pollResult)
}
}

internal suspend fun<T, F, E: Exception> uniffiRustCallAsync(
rustFuture: Pointer,
completeFunc: (Pointer, RustCallStatus) -> F,
liftFunc: (F) -> T,
errorHandler: CallStatusErrorHandler<E>
): T {
try {
do {
val pollResult = suspendCancellableCoroutine<Short> { continuation ->
_UniFFILib.INSTANCE.{{ ci.ffi_rust_future_poll().name() }}(
rustFuture,
uniffiRustFutureContinuation,
uniffiContinuationHandleMap.insert(continuation)
)
}
} while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY);

return liftFunc(
rustCallWithError(errorHandler, { status -> completeFunc(rustFuture, status) })
)
} finally {
_UniFFILib.INSTANCE.{{ ci.ffi_rust_future_free().name() }}(rustFuture)
}
}
Loading

0 comments on commit 264729b

Please sign in to comment.