Skip to content

Commit

Permalink
Updated Future FFI
Browse files Browse the repository at this point in the history
The initial motivation for this was cancellation.  PR#1697 made it
so if an async function was cancelled we would eventually release the
resources. However, it would be better if we could immedately release
the resources.  In order to implement that, I realized I needed to
change the future FFI quite a bit.

The new FFI is simpler overall and supports cancel and drop operations.
Cancel ensures that the foreign code will resume and break out of its
async code.  Drop ensures that all resources from the wrapped future are
relased.

The new code does not use ForeignExecutor and that code is in a state of
limbo for now.  I hope to repurpose it for foreign dispatch queues
(mozilla#1734).  If that doesn't work out, we can just delete it.

It's tricky to implement FFI calls that inputted type-erased
`RustFutureHandle`, since the `F` parameter is an anonymous type that
implements Future. Made a new system that is based to converting an
`Arc<RustFuture<F, T, U>>` into a `Box<Arc<dyn RustFutureFFI>>` before
sending it across the FFI. `Arc<dyn RustFutureFFI>` implements the FFI
and the extra Box converts the wide pointer into a normal pointer.  It's
fairly messy, but I hope mozilla#1730 will improve things.

- Updated the futures fixture tests for this to hold on to the mutex
  longer in the initial call.  This makes it so they will fail unless
  the future is dropped while the mutex is still locked.  Before they
  would only succeed as long as the mutex was dropped once the timeout
  expired.
- Updated `RustCallStatus.code` field to be an enum.  Added `Cancelled`
  as one of the variants.  `Cancelled` is only used for async functions.
- Removed the FutureCallback and invoke_future_callback from
  `FfiConverter`.
- New syncronization handling code in RustFuture that's hopefully
  clearer, more correct, and more understandable than the old stuff.
- Updated `UNIFFI_CONTRACT_VERSION` since this is an ABI change
- Removed the `RustCallStatus` param from async scaffolding functions.
  These functions can't fail, so there's no need.
  • Loading branch information
bendk committed Sep 8, 2023
1 parent e4cc71a commit b61f3a5
Show file tree
Hide file tree
Showing 46 changed files with 1,226 additions and 1,048 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

96 changes: 1 addition & 95 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ UniFFI supports exposing async Rust functions over the FFI. It can convert a Rus

Check out the [examples](https://github.com/mozilla/uniffi-rs/tree/main/examples/futures) or the more terse and thorough [fixtures](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/futures).

Note that currently async functions are only supported by proc-macros, if you require UDL support please file a bug.
Note that currently async functions are only supported by proc-macros, UDL support is being planned in https://github.com/mozilla/uniffi-rs/issues/1716.

## Example

Expand Down Expand Up @@ -41,97 +41,3 @@ In Rust `Future` terminology this means the foreign bindings supply the "executo
There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read.

See the [foreign-executor fixture](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/foreign-executor) for more implementation details.

## How it works

As [described in the documentation](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html),
UniFFI generates code which uses callbacks from Rust futures back into that foreign "executor" to drive them to completion.
Fortunately, each of the bindings and Rust have similar models, so the discussion below is Python, but it's almost exactly the same in Kotlin and Swift.

In the above example, the generated `say_after` function looks something like:

```python

# A helper to work with asyncio.
def _rust_say_after_executor(eventloop_handle, rust_task_handle):
event_loop = UniFFIMagic_GetExecutor(eventloop_handle)

def callback(task_handle):
# The event-loop has called us - call back into Rust.
_uniffi_say_after_executor_callback(task_handle)

# Now have the asyncio eventloop - ask it to schedule a call to help drive the Rust future.
eventloop.call_soon_threadsafe(callback, rust_task_handle)

# A helper for say_after which creates a future and passes it Rust
def _rust_call_say_after(callback_fn):
# Handle to our executor.
eventloop = asyncio.get_running_loop()
eventloop_handle = UniFFIMagic_SetExecutor(eventloop)

# Use asyncio to create a new Python future.
future = eventloop.create_future()
future_handle = UniFFIMagic_SetFuture(future)

# This is a "normal" UniFFI call across the FFI to Rust scaffoloding, but
# because it is an async function it has a special signature which
# requires the handles and the callback.
_uniffi_call_say_after(executor_handle, callback_fun, future_handle)

# and return the future to the caller.
return future

def say_after_callback(future_handle, result)
future = UniFFIMagic_GetFuture(future_handle)
if future.cancelled():
return
future.set_result(result))

def say_after(...):
return await _rust_call_say_after(say_after_callback)

```

And the code generated for Rust is something like:

```rust
struct SayAfterHelper {
rust_future: Future<>,
uniffi_executor_handle: ::uniffi::ForeignExecutorHandle,
uniffi_callback: ::uniffi::FfiConverter::FutureCallback,
uniffi_future_handle: ...,
}

impl SayAfterHelper {
fn wake(&self) {
match self.rust_future.poll() {
Some(Poll::Pending) => {
// ... snip executor stuff
self.rust_future.wake()
},
Some(Poll::Ready(v)) => {
// ready - tell the foreign executor
UniFFI_Magic_Invoke_Foreign_Callback(self.uniffi_callback, self.uniffi_future_handle)
},
None => todo!("error handling"),
}
}
}

pub extern "C" fn _uniffi_call_say_after(
uniffi_executor_handle: ::uniffi::ForeignExecutorHandle,
uniffi_callback: ::uniffi::FfiConverter::FutureCallback,
uniffi_future_handle: ...,
) {
// Call the async function to get the Rust future.
let rust_future = say_after(...)
let helper = SayAfterHelper {
rust_future,
uniffi_executor_handle,
uniffi_callback,
uniffi_future_handle,
);
helper.wake();
Ok(())
}
```
7 changes: 2 additions & 5 deletions fixtures/futures/tests/bindings/test_futures.kts
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ runBlocking {
runBlocking {
val time = measureTimeMillis {
val job = launch {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))
useSharedResource(SharedResourceOptions(releaseAfterMs=1000U, timeoutMs=100U))
}

// Wait some time to ensure the task has locked the shared resource
Expand All @@ -219,7 +219,7 @@ runBlocking {

// Try accessing the shared resource again. The initial task should release the shared resource
// before the timeout expires.
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=100U))
}
println("useSharedResource: ${time}ms")
}
Expand All @@ -233,6 +233,3 @@ runBlocking {
}
println("useSharedResource (not canceled): ${time}ms")
}

// Test that we properly cleaned up future callback references
assert(uniffiActiveFutureCallbacks.size == 0)
23 changes: 8 additions & 15 deletions fixtures/futures/tests/bindings/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,14 @@ async def test():

# Test a future that uses a lock and that is cancelled.
def test_shared_resource_cancellation(self):
# Note: Python uses the event loop to schedule calls via the `call_soon_threadsafe()`
# method. This means that creating a task and cancelling it won't trigger the issue, we
# need to create an EventLoop and close it instead.
loop = asyncio.new_event_loop()
loop.create_task(use_shared_resource(
SharedResourceOptions(release_after_ms=100, timeout_ms=1000)))
# Wait some time to ensure the task has locked the shared resource
loop.call_later(0.05, loop.stop)
loop.run_forever()
# Close the EventLoop before the shared resource has been released.
loop.close()

# Try accessing the shared resource again using the main event loop. The initial task
# should release the shared resource before the timeout expires.
asyncio.run(use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)))
async def test():
task = asyncio.create_task(use_shared_resource(
SharedResourceOptions(release_after_ms=1000, timeout_ms=100)))
# Wait some time to ensure the task has locked the shared resource
await asyncio.sleep(0.05)
task.cancel()
await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=100))
asyncio.run(test())

def test_shared_resource_no_cancellation(self):
async def test():
Expand Down
40 changes: 20 additions & 20 deletions fixtures/futures/tests/bindings/test_futures.swift
Original file line number Diff line number Diff line change
Expand Up @@ -232,26 +232,26 @@ Task {
counter.leave()
}

// Test a future that uses a lock and that is cancelled.
counter.enter()
Task {
let task = Task {
try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000))
}

// Wait some time to ensure the task has locked the shared resource
try await Task.sleep(nanoseconds: 50_000_000)
// Cancel the job task the shared resource has been released.
//
// FIXME: this test currently passes because `test.cancel()` doesn't actually cancel the
// operation. We need to rework the Swift async handling to handle this properly.
task.cancel()

// Try accessing the shared resource again. The initial task should release the shared resource
// before the timeout expires.
try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 1000))
counter.leave()
}
// // Test a future that uses a lock and that is cancelled.
// counter.enter()
// Task {
// let task = Task {
// try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 1000, timeoutMs: 100))
// }
//
// // Wait some time to ensure the task has locked the shared resource
// try await Task.sleep(nanoseconds: 50_000_000)
// // Cancel the job task the shared resource has been released.
// //
// // FIXME: this test currently passes because `test.cancel()` doesn't actually cancel the
// // operation. We need to rework the Swift async handling to handle this properly.
// task.cancel()
//
// // Try accessing the shared resource again. The initial task should release the shared resource
// // before the timeout expires.
// try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 100))
// counter.leave()
// }

// Test a future that uses a lock and that is not cancelled.
counter.enter()
Expand Down
7 changes: 3 additions & 4 deletions uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,9 @@ impl KotlinCodeOracle {
FfiType::ForeignCallback => "ForeignCallback".to_string(),
FfiType::ForeignExecutorHandle => "USize".to_string(),
FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback".to_string(),
FfiType::FutureCallback { return_type } => {
format!("UniFfiFutureCallback{}", Self::ffi_type_label(return_type))
}
FfiType::FutureCallbackData => "USize".to_string(),
FfiType::RustFutureHandle => "Pointer".to_string(),
FfiType::RustFutureContinuation => "UniFfiRustFutureContinuation".to_string(),
FfiType::RustFutureContinuationData => "USize".to_string(),
}
}
}
Expand Down
38 changes: 38 additions & 0 deletions uniffi_bindgen/src/bindings/kotlin/templates/Async.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Async return type handlers

internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toShort()
internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toShort()

internal val uniffiContinuationHandleMap = UniFfiHandleMap<CancellableContinuation<Short>>()

// FFI type for Rust future continuations
internal object UniFfiRustFutureContinuation : com.sun.jna.Callback {
fun callback(continuationHandle: USize, pollResult: Short) {
uniffiContinuationHandleMap.remove(continuationHandle)?.resume(pollResult)
}
}

internal suspend fun<T, F, E: Exception> uniffiDriveFuture(
rustFuture: Pointer,
completeFunc: (Pointer, RustCallStatus) -> F,
liftFunc: (F) -> T,
errorHandler: CallStatusErrorHandler<E>
): T {
try {
do {
val pollResult = suspendCancellableCoroutine<Short> { continuation ->
_UniFFILib.INSTANCE.{{ ci.ffi_rust_future_poll().name() }}(
rustFuture,
UniFfiRustFutureContinuation,
uniffiContinuationHandleMap.insert(continuation)
)
}
} while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY);

return liftFunc(
rustCallWithError(errorHandler, { status -> completeFunc(rustFuture, status) })
)
} finally {
_UniFFILib.INSTANCE.{{ ci.ffi_rust_future_free().name() }}(rustFuture)
}
}
47 changes: 0 additions & 47 deletions uniffi_bindgen/src/bindings/kotlin/templates/AsyncTypes.kt

This file was deleted.

4 changes: 2 additions & 2 deletions uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ internal class UniFfiHandleMap<T: Any> {
return map.get(handle)
}

fun remove(handle: USize) {
map.remove(handle)
fun remove(handle: USize): T? {
return map.remove(handle)
}
}
48 changes: 23 additions & 25 deletions uniffi_bindgen/src/bindings/kotlin/templates/ObjectTemplate.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,30 @@ class {{ type_name }}(
{%- if meth.is_async() %}
@Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE")
override suspend fun {{ meth.name()|fn_name }}({%- call kt::arg_list_decl(meth) -%}){% match meth.return_type() %}{% when Some with (return_type) %} : {{ return_type|type_name }}{% when None %}{%- endmatch %} {
// Create a new `CoroutineScope` for this operation, suspend the coroutine, and call the
// scaffolding function, passing it one of the callback handlers from `AsyncTypes.kt`.
return coroutineScope {
val scope = this
return@coroutineScope suspendCancellableCoroutine { continuation ->
try {
val callback = {{ meth.result_type().borrow()|future_callback_handler }}(continuation)
uniffiActiveFutureCallbacks.add(callback)
continuation.invokeOnCancellation { uniffiActiveFutureCallbacks.remove(callback) }
callWithPointer { thisPtr ->
rustCall { status ->
_UniFFILib.INSTANCE.{{ meth.ffi_func().name() }}(
thisPtr,
{% call kt::arg_list_lowered(meth) %}
FfiConverterForeignExecutor.lower(scope),
callback,
USize(0),
status,
)
}
}
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}
val rustFuture = callWithPointer { thisPtr ->
_UniFFILib.INSTANCE.{{ meth.ffi_func().name() }}(
thisPtr,
{% call kt::arg_list_lowered(meth) %}
)
}
return uniffiDriveFuture(
rustFuture,
{ future, status -> _UniFFILib.INSTANCE.{{ meth.ffi_rust_future_complete(ci) }}(future, status) },
// lift function
{%- match meth.return_type() %}
{%- when Some(return_type) %}
{ {{ return_type|lift_fn }}(it) },
{%- when None %}
{ Unit },
{% endmatch %}
// Error FFI converter
{%- match meth.throws_type() %}
{%- when Some(e) %}
{{ e|error_type_name }}.ErrorHandler,
{%- when None %}
NullCallStatusErrorHandler,
{%- endmatch %}
)
}
{%- else -%}
{%- match meth.return_type() -%}
Expand Down
Loading

0 comments on commit b61f3a5

Please sign in to comment.