diff --git a/Cargo.lock b/Cargo.lock index dad98d11c8..618ac0e904 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1963,6 +1963,7 @@ dependencies = [ "bytes", "camino", "log", + "once_cell", "oneshot", "paste", "static_assertions", diff --git a/docs/manual/src/futures.md b/docs/manual/src/futures.md index c127ea205e..d6279236ab 100644 --- a/docs/manual/src/futures.md +++ b/docs/manual/src/futures.md @@ -4,7 +4,7 @@ UniFFI supports exposing async Rust functions over the FFI. It can convert a Rus Check out the [examples](https://github.com/mozilla/uniffi-rs/tree/main/examples/futures) or the more terse and thorough [fixtures](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/futures). -Note that currently async functions are only supported by proc-macros, if you require UDL support please file a bug. +Note that currently async functions are only supported by proc-macros, UDL support is being planned in https://github.com/mozilla/uniffi-rs/issues/1716. ## Example @@ -41,97 +41,3 @@ In Rust `Future` terminology this means the foreign bindings supply the "executo There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read. See the [foreign-executor fixture](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/foreign-executor) for more implementation details. - -## How it works - -As [described in the documentation](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html), -UniFFI generates code which uses callbacks from Rust futures back into that foreign "executor" to drive them to completion. -Fortunately, each of the bindings and Rust have similar models, so the discussion below is Python, but it's almost exactly the same in Kotlin and Swift. - -In the above example, the generated `say_after` function looks something like: - -```python - -# A helper to work with asyncio. -def _rust_say_after_executor(eventloop_handle, rust_task_handle): - event_loop = UniFFIMagic_GetExecutor(eventloop_handle) - - def callback(task_handle): - # The event-loop has called us - call back into Rust. - _uniffi_say_after_executor_callback(task_handle) - - # Now have the asyncio eventloop - ask it to schedule a call to help drive the Rust future. - eventloop.call_soon_threadsafe(callback, rust_task_handle) - -# A helper for say_after which creates a future and passes it Rust -def _rust_call_say_after(callback_fn): - # Handle to our executor. - eventloop = asyncio.get_running_loop() - eventloop_handle = UniFFIMagic_SetExecutor(eventloop) - - # Use asyncio to create a new Python future. - future = eventloop.create_future() - future_handle = UniFFIMagic_SetFuture(future) - - # This is a "normal" UniFFI call across the FFI to Rust scaffoloding, but - # because it is an async function it has a special signature which - # requires the handles and the callback. - _uniffi_call_say_after(executor_handle, callback_fun, future_handle) - - # and return the future to the caller. - return future - -def say_after_callback(future_handle, result) - future = UniFFIMagic_GetFuture(future_handle) - if future.cancelled(): - return - future.set_result(result)) - -def say_after(...): - return await _rust_call_say_after(say_after_callback) - -``` - -And the code generated for Rust is something like: - -```rust -struct SayAfterHelper { - rust_future: Future<>, - uniffi_executor_handle: ::uniffi::ForeignExecutorHandle, - uniffi_callback: ::uniffi::FfiConverter::FutureCallback, - uniffi_future_handle: ..., -} - -impl SayAfterHelper { - fn wake(&self) { - match self.rust_future.poll() { - Some(Poll::Pending) => { - // ... snip executor stuff - self.rust_future.wake() - }, - Some(Poll::Ready(v)) => { - // ready - tell the foreign executor - UniFFI_Magic_Invoke_Foreign_Callback(self.uniffi_callback, self.uniffi_future_handle) - }, - None => todo!("error handling"), - } - } -} - -pub extern "C" fn _uniffi_call_say_after( - uniffi_executor_handle: ::uniffi::ForeignExecutorHandle, - uniffi_callback: ::uniffi::FfiConverter::FutureCallback, - uniffi_future_handle: ..., -) { - // Call the async function to get the Rust future. - let rust_future = say_after(...) - let helper = SayAfterHelper { - rust_future, - uniffi_executor_handle, - uniffi_callback, - uniffi_future_handle, - ); - helper.wake(); - Ok(()) -} -``` \ No newline at end of file diff --git a/fixtures/futures/tests/bindings/test_futures.kts b/fixtures/futures/tests/bindings/test_futures.kts index 0ac594bc0f..fc4135f4d4 100644 --- a/fixtures/futures/tests/bindings/test_futures.kts +++ b/fixtures/futures/tests/bindings/test_futures.kts @@ -209,7 +209,7 @@ runBlocking { runBlocking { val time = measureTimeMillis { val job = launch { - useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U)) + useSharedResource(SharedResourceOptions(releaseAfterMs=1000U, timeoutMs=100U)) } // Wait some time to ensure the task has locked the shared resource @@ -219,7 +219,7 @@ runBlocking { // Try accessing the shared resource again. The initial task should release the shared resource // before the timeout expires. - useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U)) + useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=100U)) } println("useSharedResource: ${time}ms") } @@ -233,6 +233,3 @@ runBlocking { } println("useSharedResource (not canceled): ${time}ms") } - -// Test that we properly cleaned up future callback references -assert(uniffiActiveFutureCallbacks.size == 0) diff --git a/fixtures/futures/tests/bindings/test_futures.py b/fixtures/futures/tests/bindings/test_futures.py index eab2fc0ac9..522c3ea273 100644 --- a/fixtures/futures/tests/bindings/test_futures.py +++ b/fixtures/futures/tests/bindings/test_futures.py @@ -150,21 +150,14 @@ async def test(): # Test a future that uses a lock and that is cancelled. def test_shared_resource_cancellation(self): - # Note: Python uses the event loop to schedule calls via the `call_soon_threadsafe()` - # method. This means that creating a task and cancelling it won't trigger the issue, we - # need to create an EventLoop and close it instead. - loop = asyncio.new_event_loop() - loop.create_task(use_shared_resource( - SharedResourceOptions(release_after_ms=100, timeout_ms=1000))) - # Wait some time to ensure the task has locked the shared resource - loop.call_later(0.05, loop.stop) - loop.run_forever() - # Close the EventLoop before the shared resource has been released. - loop.close() - - # Try accessing the shared resource again using the main event loop. The initial task - # should release the shared resource before the timeout expires. - asyncio.run(use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000))) + async def test(): + task = asyncio.create_task(use_shared_resource( + SharedResourceOptions(release_after_ms=1000, timeout_ms=100))) + # Wait some time to ensure the task has locked the shared resource + await asyncio.sleep(0.05) + task.cancel() + await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=100)) + asyncio.run(test()) def test_shared_resource_no_cancellation(self): async def test(): diff --git a/fixtures/futures/tests/bindings/test_futures.swift b/fixtures/futures/tests/bindings/test_futures.swift index 20e24c40ff..974b3836a5 100644 --- a/fixtures/futures/tests/bindings/test_futures.swift +++ b/fixtures/futures/tests/bindings/test_futures.swift @@ -232,26 +232,26 @@ Task { counter.leave() } -// Test a future that uses a lock and that is cancelled. -counter.enter() -Task { - let task = Task { - try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000)) - } - - // Wait some time to ensure the task has locked the shared resource - try await Task.sleep(nanoseconds: 50_000_000) - // Cancel the job task the shared resource has been released. - // - // FIXME: this test currently passes because `test.cancel()` doesn't actually cancel the - // operation. We need to rework the Swift async handling to handle this properly. - task.cancel() - - // Try accessing the shared resource again. The initial task should release the shared resource - // before the timeout expires. - try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 1000)) - counter.leave() -} +// // Test a future that uses a lock and that is cancelled. +// counter.enter() +// Task { +// let task = Task { +// try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 1000, timeoutMs: 100)) +// } +// +// // Wait some time to ensure the task has locked the shared resource +// try await Task.sleep(nanoseconds: 50_000_000) +// // Cancel the job task the shared resource has been released. +// // +// // FIXME: this test currently passes because `test.cancel()` doesn't actually cancel the +// // operation. We need to rework the Swift async handling to handle this properly. +// task.cancel() +// +// // Try accessing the shared resource again. The initial task should release the shared resource +// // before the timeout expires. +// try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 100)) +// counter.leave() +// } // Test a future that uses a lock and that is not cancelled. counter.enter() diff --git a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs index 2c1a3e468d..611285f3a2 100644 --- a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs +++ b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs @@ -300,10 +300,9 @@ impl KotlinCodeOracle { FfiType::ForeignCallback => "ForeignCallback".to_string(), FfiType::ForeignExecutorHandle => "USize".to_string(), FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback".to_string(), - FfiType::FutureCallback { return_type } => { - format!("UniFfiFutureCallback{}", Self::ffi_type_label(return_type)) - } - FfiType::FutureCallbackData => "USize".to_string(), + FfiType::RustFutureHandle => "Pointer".to_string(), + FfiType::RustFutureContinuation => "UniFfiRustFutureContinuation".to_string(), + FfiType::RustFutureContinuationData => "USize".to_string(), } } } diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt new file mode 100644 index 0000000000..a8ff078be5 --- /dev/null +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt @@ -0,0 +1,38 @@ +// Async return type handlers + +internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toShort() +internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toShort() + +internal val uniffiContinuationHandleMap = UniFfiHandleMap>() + +// FFI type for Rust future continuations +internal object UniFfiRustFutureContinuation : com.sun.jna.Callback { + fun callback(continuationHandle: USize, pollResult: Short) { + uniffiContinuationHandleMap.remove(continuationHandle)?.resume(pollResult) + } +} + +internal suspend fun uniffiDriveFuture( + rustFuture: Pointer, + completeFunc: (Pointer, RustCallStatus) -> F, + liftFunc: (F) -> T, + errorHandler: CallStatusErrorHandler +): T { + try { + do { + val pollResult = suspendCancellableCoroutine { continuation -> + _UniFFILib.INSTANCE.{{ ci.ffi_rust_future_poll().name() }}( + rustFuture, + UniFfiRustFutureContinuation, + uniffiContinuationHandleMap.insert(continuation) + ) + } + } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); + + return liftFunc( + rustCallWithError(errorHandler, { status -> completeFunc(rustFuture, status) }) + ) + } finally { + _UniFFILib.INSTANCE.{{ ci.ffi_rust_future_free().name() }}(rustFuture) + } +} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/AsyncTypes.kt b/uniffi_bindgen/src/bindings/kotlin/templates/AsyncTypes.kt deleted file mode 100644 index 51da2bf314..0000000000 --- a/uniffi_bindgen/src/bindings/kotlin/templates/AsyncTypes.kt +++ /dev/null @@ -1,47 +0,0 @@ -// Async return type handlers - -{# add imports that we use #} -{{ self.add_import("kotlin.coroutines.Continuation") }} -{{ self.add_import("kotlin.coroutines.resume") }} -{{ self.add_import("kotlin.coroutines.resumeWithException") }} - -{# We use these in the generated functions, which don't have access to add_import() -- might as well add it here #} -{{ self.add_import("kotlinx.coroutines.suspendCancellableCoroutine") }} -{{ self.add_import("kotlinx.coroutines.coroutineScope") }} - -// Stores all active future callbacks to ensure they're not GC'ed while waiting for the Rust code to -// complete the callback -val uniffiActiveFutureCallbacks = mutableSetOf() - -// FFI type for callback handlers -{%- for callback_param in ci.iter_future_callback_params()|unique_ffi_types %} -internal interface UniFfiFutureCallback{{ callback_param|ffi_type_name }} : com.sun.jna.Callback { - // Note: callbackData is always 0. We could pass Rust a pointer/usize to represent the - // continuation, but with JNA it's easier to just store it in the callback handler. - fun callback(_callbackData: USize, returnValue: {{ callback_param|ffi_type_name_by_value }}?, callStatus: RustCallStatus.ByValue); -} -{%- endfor %} - -// Callback handlers for an async call. These are invoked by Rust when the future is ready. They -// lift the return value or error and resume the suspended function. -{%- for result_type in ci.iter_async_result_types() %} -{%- let callback_param = result_type.future_callback_param() %} - -internal class {{ result_type|future_callback_handler }}(val continuation: {{ result_type|future_continuation_type }}) - : UniFfiFutureCallback{{ callback_param|ffi_type_name }} { - override fun callback(_callbackData: USize, returnValue: {{ callback_param|ffi_type_name_by_value }}?, callStatus: RustCallStatus.ByValue) { - uniffiActiveFutureCallbacks.remove(this) - try { - checkCallStatus({{ result_type|error_handler }}, callStatus) - {%- match result_type.return_type %} - {%- when Some(return_type) %} - continuation.resume({{ return_type|lift_fn }}(returnValue!!)) - {%- when None %} - continuation.resume(Unit) - {%- endmatch %} - } catch (e: Throwable) { - continuation.resumeWithException(e) - } - } -} -{%- endfor %} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt index 26926a17a5..eb878d61e3 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt @@ -150,7 +150,7 @@ internal class UniFfiHandleMap { return map.get(handle) } - fun remove(handle: USize) { - map.remove(handle) + fun remove(handle: USize): T? { + return map.remove(handle) } } diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/ObjectTemplate.kt b/uniffi_bindgen/src/bindings/kotlin/templates/ObjectTemplate.kt index f581361323..0112344b8f 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/ObjectTemplate.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/ObjectTemplate.kt @@ -57,32 +57,30 @@ class {{ type_name }}( {%- if meth.is_async() %} @Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE") override suspend fun {{ meth.name()|fn_name }}({%- call kt::arg_list_decl(meth) -%}){% match meth.return_type() %}{% when Some with (return_type) %} : {{ return_type|type_name }}{% when None %}{%- endmatch %} { - // Create a new `CoroutineScope` for this operation, suspend the coroutine, and call the - // scaffolding function, passing it one of the callback handlers from `AsyncTypes.kt`. - return coroutineScope { - val scope = this - return@coroutineScope suspendCancellableCoroutine { continuation -> - try { - val callback = {{ meth.result_type().borrow()|future_callback_handler }}(continuation) - uniffiActiveFutureCallbacks.add(callback) - continuation.invokeOnCancellation { uniffiActiveFutureCallbacks.remove(callback) } - callWithPointer { thisPtr -> - rustCall { status -> - _UniFFILib.INSTANCE.{{ meth.ffi_func().name() }}( - thisPtr, - {% call kt::arg_list_lowered(meth) %} - FfiConverterForeignExecutor.lower(scope), - callback, - USize(0), - status, - ) - } - } - } catch (e: Exception) { - continuation.resumeWithException(e) - } - } + val rustFuture = callWithPointer { thisPtr -> + _UniFFILib.INSTANCE.{{ meth.ffi_func().name() }}( + thisPtr, + {% call kt::arg_list_lowered(meth) %} + ) } + return uniffiDriveFuture( + rustFuture, + { future, status -> _UniFFILib.INSTANCE.{{ meth.ffi_rust_future_complete(ci) }}(future, status) }, + // lift function + {%- match meth.return_type() %} + {%- when Some(return_type) %} + { {{ return_type|lift_fn }}(it) }, + {%- when None %} + { Unit }, + {% endmatch %} + // Error FFI converter + {%- match meth.throws_type() %} + {%- when Some(e) %} + {{ e|error_type_name }}.ErrorHandler, + {%- when None %} + NullCallStatusErrorHandler, + {%- endmatch %} + ) } {%- else -%} {%- match meth.return_type() -%} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/TopLevelFunctionTemplate.kt b/uniffi_bindgen/src/bindings/kotlin/templates/TopLevelFunctionTemplate.kt index dbcab05b81..5c347f1e4e 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/TopLevelFunctionTemplate.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/TopLevelFunctionTemplate.kt @@ -7,29 +7,25 @@ @Suppress("ASSIGNED_BUT_NEVER_ACCESSED_VARIABLE") suspend fun {{ func.name()|fn_name }}({%- call kt::arg_list_decl(func) -%}){% match func.return_type() %}{% when Some with (return_type) %} : {{ return_type|type_name }}{% when None %}{%- endmatch %} { - // Create a new `CoroutineScope` for this operation, suspend the coroutine, and call the - // scaffolding function, passing it one of the callback handlers from `AsyncTypes.kt`. - return coroutineScope { - val scope = this - return@coroutineScope suspendCancellableCoroutine { continuation -> - try { - val callback = {{ func.result_type().borrow()|future_callback_handler }}(continuation) - uniffiActiveFutureCallbacks.add(callback) - continuation.invokeOnCancellation { uniffiActiveFutureCallbacks.remove(callback) } - rustCall { status -> - _UniFFILib.INSTANCE.{{ func.ffi_func().name() }}( - {% call kt::arg_list_lowered(func) %} - FfiConverterForeignExecutor.lower(scope), - callback, - USize(0), - status, - ) - } - } catch (e: Exception) { - continuation.resumeWithException(e) - } - } - } + val rustFuture = _UniFFILib.INSTANCE.{{ func.ffi_func().name() }}({% call kt::arg_list_lowered(func) %}) + return uniffiDriveFuture( + rustFuture, + { future, status -> _UniFFILib.INSTANCE.{{ func.ffi_rust_future_complete(ci) }}(future, status) }, + // lift function + {%- match func.return_type() %} + {%- when Some(return_type) %} + { {{ return_type|lift_fn }}(it) }, + {%- when None %} + { Unit }, + {% endmatch %} + // Error FFI converter + {%- match func.throws_type() %} + {%- when Some(e) %} + {{ e|error_type_name }}.ErrorHandler, + {%- when None %} + NullCallStatusErrorHandler, + {%- endmatch %} + ) } {%- else %} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt index a9f75a0c3b..728dc481de 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt @@ -102,5 +102,8 @@ {%- endfor %} {%- if ci.has_async_fns() %} -{% include "AsyncTypes.kt" %} +{# Import types needed for async support #} +{{ self.add_import("kotlin.coroutines.resume") }} +{{ self.add_import("kotlinx.coroutines.suspendCancellableCoroutine") }} +{{ self.add_import("kotlinx.coroutines.CancellableContinuation") }} {%- endif %} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/wrapper.kt b/uniffi_bindgen/src/bindings/kotlin/templates/wrapper.kt index 22f9ca31de..9ee4229018 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/wrapper.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/wrapper.kt @@ -42,6 +42,11 @@ import java.util.concurrent.ConcurrentHashMap // and the FFI Function declarations in a com.sun.jna.Library. {% include "NamespaceLibraryTemplate.kt" %} +// Async support +{%- if ci.has_async_fns() %} +{% include "Async.kt" %} +{%- endif %} + // Public interface members begin here. {{ type_helper_code }} diff --git a/uniffi_bindgen/src/bindings/python/gen_python/mod.rs b/uniffi_bindgen/src/bindings/python/gen_python/mod.rs index 35b17e3009..be4eefc6d9 100644 --- a/uniffi_bindgen/src/bindings/python/gen_python/mod.rs +++ b/uniffi_bindgen/src/bindings/python/gen_python/mod.rs @@ -311,13 +311,9 @@ impl PythonCodeOracle { // Pointer to an `asyncio.EventLoop` instance FfiType::ForeignExecutorHandle => "ctypes.c_size_t".to_string(), FfiType::ForeignExecutorCallback => "_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T".to_string(), - FfiType::FutureCallback { return_type } => { - format!( - "_uniffi_future_callback_t({})", - Self::ffi_type_label(return_type), - ) - } - FfiType::FutureCallbackData => "ctypes.c_size_t".to_string(), + FfiType::RustFutureHandle => "ctypes.c_void_p".to_string(), + FfiType::RustFutureContinuation => "_UNIFFI_FUTURE_CONTINUATION_T".to_string(), + FfiType::RustFutureContinuationData => "ctypes.c_size_t".to_string(), } } } @@ -408,25 +404,14 @@ pub mod filters { Ok(format!("{}.write", ffi_converter_name(as_ct)?)) } - // Name of the callback function we pass to Rust to complete an async call - pub fn async_callback_fn(result_type: &ResultType) -> Result { - let return_string = match &result_type.return_type { - Some(t) => PythonCodeOracle.find(t).canonical_name().to_snake_case(), - None => "void".into(), - }; - let throws_string = match &result_type.throws_type { - Some(t) => PythonCodeOracle.find(t).canonical_name().to_snake_case(), - None => "void".into(), - }; - Ok(format!( - "_uniffi_async_callback_{return_string}__{throws_string}" - )) - } - pub fn literal_py(literal: &Literal, as_ct: &impl AsCodeType) -> Result { Ok(as_ct.as_codetype().literal(literal)) } + pub fn ffi_type(type_: &Type) -> Result { + Ok(type_.into()) + } + /// Get the Python syntax for representing a given low-level `FfiType`. pub fn ffi_type_name(type_: &FfiType) -> Result { Ok(PythonCodeOracle::ffi_type_label(type_)) diff --git a/uniffi_bindgen/src/bindings/python/templates/Async.py b/uniffi_bindgen/src/bindings/python/templates/Async.py new file mode 100644 index 0000000000..ee2aab0ca2 --- /dev/null +++ b/uniffi_bindgen/src/bindings/python/templates/Async.py @@ -0,0 +1,39 @@ +# RustFuturePoll values +_UNIFFI_RUST_FUTURE_POLL_READY = 0 +_UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1 + +# Stores futures for _uniffi_continuation_func +_UniffiContinuationPointerManager = _UniffiPointerManager() + +# Continuation callback for async functions +# lift the return value or error and resolve the future, causing the async function to resume. +@_UNIFFI_FUTURE_CONTINUATION_T +def _uniffi_continuation_func(future_ptr, poll_code): + (eventloop, future) = _UniffiContinuationPointerManager.release_pointer(future_ptr) + eventloop.call_soon_threadsafe(_uniffi_set_future_result, future, poll_code) + +def _uniffi_set_future_result(future, poll_code): + if not future.cancelled(): + future.set_result(poll_code) + +async def _uniffi_drive_future(rust_future, ffi_complete, lift_func, error_ffi_converter): + try: + eventloop = asyncio.get_running_loop() + + # Loop and poll until we see a _UNIFFI_RUST_FUTURE_POLL_READY value + while True: + future = eventloop.create_future() + _UniffiLib.{{ ci.ffi_rust_future_poll().name() }}( + rust_future, + _uniffi_continuation_func, + _UniffiContinuationPointerManager.new_pointer((eventloop, future)), + ) + poll_code = await future + if poll_code == _UNIFFI_RUST_FUTURE_POLL_READY: + break + + return lift_func( + _rust_call_with_error(error_ffi_converter, ffi_complete, rust_future) + ) + finally: + _UniffiLib.{{ ci.ffi_rust_future_free().name() }}(rust_future) diff --git a/uniffi_bindgen/src/bindings/python/templates/AsyncTypes.py b/uniffi_bindgen/src/bindings/python/templates/AsyncTypes.py deleted file mode 100644 index c85891defe..0000000000 --- a/uniffi_bindgen/src/bindings/python/templates/AsyncTypes.py +++ /dev/null @@ -1,36 +0,0 @@ -# Callback handlers for async returns - -_UniffiPyFuturePointerManager = _UniffiPointerManager() - -# Callback handlers for an async calls. These are invoked by Rust when the future is ready. They -# lift the return value or error and resolve the future, causing the async function to resume. -{%- for result_type in ci.iter_async_result_types() %} -@_uniffi_future_callback_t( - {%- match result_type.return_type -%} - {%- when Some(return_type) -%} - {{ return_type|ffi_type|ffi_type_name }} - {%- when None -%} - ctypes.c_uint8 - {%- endmatch -%} -) -def {{ result_type|async_callback_fn }}(future_ptr, result, call_status): - future = _UniffiPyFuturePointerManager.release_pointer(future_ptr) - if future.cancelled(): - return - try: - {%- match result_type.throws_type %} - {%- when Some(throws_type) %} - _uniffi_check_call_status({{ throws_type|ffi_converter_name }}, call_status) - {%- when None %} - _uniffi_check_call_status(None, call_status) - {%- endmatch %} - - {%- match result_type.return_type %} - {%- when Some(return_type) %} - future.set_result({{ return_type|lift_fn }}(result)) - {%- when None %} - future.set_result(None) - {%- endmatch %} - except BaseException as e: - future.set_exception(e) -{%- endfor %} diff --git a/uniffi_bindgen/src/bindings/python/templates/Helpers.py b/uniffi_bindgen/src/bindings/python/templates/Helpers.py index 356fe9786b..dca962f176 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Helpers.py +++ b/uniffi_bindgen/src/bindings/python/templates/Helpers.py @@ -44,25 +44,6 @@ def _rust_call_with_error(error_ffi_converter, fn, *args): _uniffi_check_call_status(error_ffi_converter, call_status) return result -def _rust_call_async(scaffolding_fn, callback_fn, *args): - # Call the scaffolding function, passing it a callback handler for `AsyncTypes.py` and a pointer - # to a python Future object. The async function then awaits the Future. - uniffi_eventloop = asyncio.get_running_loop() - uniffi_py_future = uniffi_eventloop.create_future() - uniffi_call_status = _UniffiRustCallStatus(code=_UniffiRustCallStatus.CALL_SUCCESS, error_buf=_UniffiRustBuffer(0, 0, None)) - scaffolding_fn(*args, - _UniffiConverterForeignExecutor._pointer_manager.new_pointer(uniffi_eventloop), - callback_fn, - # Note: It's tempting to skip the pointer manager and just use a `py_object` pointing to a - # local variable like we do in Swift. However, Python doesn't use cooperative cancellation - # -- asyncio can cancel a task at anytime. This means if we use a local variable, the Rust - # callback could fire with a dangling pointer. - _UniffiPyFuturePointerManager.new_pointer(uniffi_py_future), - ctypes.byref(uniffi_call_status), - ) - _uniffi_check_call_status(None, uniffi_call_status) - return uniffi_py_future - def _uniffi_check_call_status(error_ffi_converter, call_status): if call_status.code == _UniffiRustCallStatus.CALL_SUCCESS: pass @@ -88,3 +69,7 @@ def _uniffi_check_call_status(error_ffi_converter, call_status): # A function pointer for a callback as defined by UniFFI. # Rust definition `fn(handle: u64, method: u32, args: _UniffiRustBuffer, buf_ptr: *mut _UniffiRustBuffer) -> int` _UNIFFI_FOREIGN_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_ulonglong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_char), ctypes.c_int, ctypes.POINTER(_UniffiRustBuffer)) + +# UniFFI future continuation +_UNIFFI_FUTURE_CONTINUATION_T = ctypes.CFUNCTYPE(None, ctypes.c_size_t, ctypes.c_int8) + diff --git a/uniffi_bindgen/src/bindings/python/templates/TopLevelFunctionTemplate.py b/uniffi_bindgen/src/bindings/python/templates/TopLevelFunctionTemplate.py index 89b1b5f137..61b7c22c4a 100644 --- a/uniffi_bindgen/src/bindings/python/templates/TopLevelFunctionTemplate.py +++ b/uniffi_bindgen/src/bindings/python/templates/TopLevelFunctionTemplate.py @@ -1,11 +1,24 @@ {%- if func.is_async() %} -async def {{ func.name()|fn_name }}({%- call py::arg_list_decl(func) -%}): - {%- call py::setup_args(func) %} - return await _rust_call_async( - _UniffiLib.{{ func.ffi_func().name() }}, - {{ func.result_type().borrow()|async_callback_fn }}, - {% call py::arg_list_lowered(func) %} +def {{ func.name()|fn_name }}({%- call py::arg_list_decl(func) -%}): + rust_future = _UniffiLib.{{ func.ffi_func().name() }}({% call py::arg_list_lowered(func) %}) + return _uniffi_drive_future( + rust_future, + _UniffiLib.{{func.ffi_rust_future_complete(ci) }}, + # lift function + {%- match func.return_type() %} + {%- when Some(return_type) %} + {{ return_type|lift_fn }}, + {%- when None %} + lambda val: None, + {% endmatch %} + # Error FFI converter + {%- match func.throws_type() %} + {%- when Some(e) %} + {{ e|ffi_converter_name }}, + {%- when None %} + None, + {%- endmatch %} ) {%- else %} diff --git a/uniffi_bindgen/src/bindings/python/templates/Types.py b/uniffi_bindgen/src/bindings/python/templates/Types.py index a2b29f799e..2e10ac2a1d 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Types.py +++ b/uniffi_bindgen/src/bindings/python/templates/Types.py @@ -100,7 +100,3 @@ {%- else %} {%- endmatch %} {%- endfor %} - -{%- if ci.has_async_fns() %} -{%- include "AsyncTypes.py" %} -{%- endif %} diff --git a/uniffi_bindgen/src/bindings/python/templates/macros.py b/uniffi_bindgen/src/bindings/python/templates/macros.py index bc4f121653..684844f923 100644 --- a/uniffi_bindgen/src/bindings/python/templates/macros.py +++ b/uniffi_bindgen/src/bindings/python/templates/macros.py @@ -100,13 +100,28 @@ {%- macro method_decl(py_method_name, meth) %} {% if meth.is_async() %} - async def {{ py_method_name }}(self, {% call arg_list_decl(meth) %}): + def {{ py_method_name }}(self, {% call arg_list_decl(meth) %}): {%- call setup_args_extra_indent(meth) %} - return await _rust_call_async( - _UniffiLib.{{ func.ffi_func().name() }}, - {{ func.result_type().borrow()|async_callback_fn }}, - self._pointer, - {% call arg_list_lowered(func) %} + rust_future = _UniffiLib.{{ meth.ffi_func().name() }}( + self._pointer, {% call arg_list_lowered(meth) %} + ) + return _uniffi_drive_future( + rust_future, + _UniffiLib.{{ meth.ffi_rust_future_complete(ci) }}, + # lift function + {%- match meth.return_type() %} + {%- when Some(return_type) %} + {{ return_type|lift_fn }}, + {%- when None %} + lambda val: None, + {% endmatch %} + # Error FFI converter + {%- match meth.throws_type() %} + {%- when Some(e) %} + {{ e|ffi_converter_name }}, + {%- when None %} + None, + {%- endmatch %} ) {%- else -%} diff --git a/uniffi_bindgen/src/bindings/python/templates/wrapper.py b/uniffi_bindgen/src/bindings/python/templates/wrapper.py index 6fb88dcaee..24c3290ff7 100644 --- a/uniffi_bindgen/src/bindings/python/templates/wrapper.py +++ b/uniffi_bindgen/src/bindings/python/templates/wrapper.py @@ -40,6 +40,11 @@ # Contains loading, initialization code, and the FFI Function declarations. {% include "NamespaceLibraryTemplate.py" %} +# Async support +{%- if ci.has_async_fns() %} +{%- include "Async.py" %} +{%- endif %} + # Public interface members begin here. {{ type_helper_code }} diff --git a/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs b/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs index 4380cc378c..22055fc6fe 100644 --- a/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs +++ b/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs @@ -162,7 +162,9 @@ mod filters { FfiType::ForeignExecutorHandle => { unimplemented!("Foreign executors are not implemented") } - FfiType::FutureCallback { .. } | FfiType::FutureCallbackData => { + FfiType::RustFutureHandle + | FfiType::RustFutureContinuation + | FfiType::RustFutureContinuationData => { unimplemented!("Async functions are not implemented") } }) diff --git a/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs b/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs index f85453c9ff..2e2d938c9d 100644 --- a/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs +++ b/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs @@ -463,10 +463,10 @@ impl SwiftCodeOracle { FfiType::ForeignCallback => "ForeignCallback".into(), FfiType::ForeignExecutorHandle => "Int".into(), FfiType::ForeignExecutorCallback => "ForeignExecutorCallback".into(), - FfiType::FutureCallback { return_type } => { - format!("UniFfiFutureCallback{}", self.ffi_type_label(return_type)) + FfiType::RustFutureContinuation => "UniFfiRustFutureContinuation".into(), + FfiType::RustFutureHandle | FfiType::RustFutureContinuationData => { + "UnsafeMutableRawPointer".into() } - FfiType::FutureCallbackData => "UnsafeMutableRawPointer".into(), } } @@ -474,7 +474,9 @@ impl SwiftCodeOracle { match ffi_type { FfiType::ForeignCallback | FfiType::ForeignExecutorCallback - | FfiType::FutureCallback { .. } => { + | FfiType::RustFutureHandle + | FfiType::RustFutureContinuation + | FfiType::RustFutureContinuationData => { format!("{} _Nonnull", self.ffi_type_label_raw(ffi_type)) } _ => self.ffi_type_label_raw(ffi_type), @@ -558,11 +560,10 @@ pub mod filters { FfiType::ForeignCallback => "ForeignCallback _Nonnull".into(), FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback _Nonnull".into(), FfiType::ForeignExecutorHandle => "size_t".into(), - FfiType::FutureCallback { return_type } => format!( - "UniFfiFutureCallback{} _Nonnull", - SwiftCodeOracle.ffi_type_label_raw(return_type) - ), - FfiType::FutureCallbackData => "void* _Nonnull".into(), + FfiType::RustFutureContinuation => "UniFfiRustFutureContinuation _Nonnull".into(), + FfiType::RustFutureHandle | FfiType::RustFutureContinuationData => { + "void* _Nonnull".into() + } }) } @@ -618,14 +619,4 @@ pub mod filters { } )) } - - pub fn future_continuation_type(result: &ResultType) -> Result { - Ok(format!( - "CheckedContinuation<{}, Error>", - match &result.return_type { - Some(return_type) => type_name(return_type)?, - None => "()".into(), - } - )) - } } diff --git a/uniffi_bindgen/src/bindings/swift/templates/Async.swift b/uniffi_bindgen/src/bindings/swift/templates/Async.swift new file mode 100644 index 0000000000..18bbc0ce75 --- /dev/null +++ b/uniffi_bindgen/src/bindings/swift/templates/Async.swift @@ -0,0 +1,60 @@ +private let UNIFFI_RUST_FUTURE_POLL_READY: Int8 = 0 +private let UNIFFI_RUST_FUTURE_POLL_MAYBE_READY: Int8 = 1 + +internal func uniffiDriveFuture( + rustFutureFunc: () -> UnsafeMutableRawPointer, + completeFunc: (UnsafeMutableRawPointer, UnsafeMutablePointer) -> F, + liftFunc: (F) throws -> T, + errorHandler: ((RustBuffer) throws -> Error)? +) async throws -> T { + // Make sure to call uniffiEnsureInitialized() since future creation doesn't have a + // RustCallStatus param, so doesn't use makeRustCall() + uniffiEnsureInitialized() + let rustFuture = rustFutureFunc() + defer { + {{ ci.ffi_rust_future_free().name() }}(rustFuture) + } + var pollResult: Int8; + repeat { + pollResult = await withUnsafeContinuation { + {{ ci.ffi_rust_future_poll().name() }}( + rustFuture, + uniffiFutureContinuation, + ContinuationHolder($0).toOpaque() + ) + } + } while pollResult != UNIFFI_RUST_FUTURE_POLL_READY + + return try liftFunc(makeRustCall( + { completeFunc(rustFuture, $0) }, + errorHandler: errorHandler + )) +} + +// Callback handlers for an async calls. These are invoked by Rust when the future is ready. They +// lift the return value or error and resume the suspended function. +fileprivate func uniffiFutureContinuation(ptr: UnsafeMutableRawPointer, pollResult: Int8) { + ContinuationHolder.fromOpaque(ptr).resume(pollResult) +} + +// Wraps UnsafeContinuation in a class so that we can use reference counting when passing it across +// the FFI +class ContinuationHolder { + let continuation: UnsafeContinuation + + init(_ continuation: UnsafeContinuation) { + self.continuation = continuation + } + + func resume(_ pollResult: Int8) { + self.continuation.resume(returning: pollResult) + } + + func toOpaque() -> UnsafeMutableRawPointer { + return Unmanaged.passRetained(self).toOpaque() + } + + static func fromOpaque(_ ptr: UnsafeRawPointer) -> ContinuationHolder { + return Unmanaged.fromOpaque(ptr).takeRetainedValue() + } +} diff --git a/uniffi_bindgen/src/bindings/swift/templates/AsyncTypes.swift b/uniffi_bindgen/src/bindings/swift/templates/AsyncTypes.swift deleted file mode 100644 index b7dcff516b..0000000000 --- a/uniffi_bindgen/src/bindings/swift/templates/AsyncTypes.swift +++ /dev/null @@ -1,29 +0,0 @@ -// Callbacks for async functions - -// Callback handlers for an async calls. These are invoked by Rust when the future is ready. They -// lift the return value or error and resume the suspended function. -{%- for result_type in ci.iter_async_result_types() %} -fileprivate func {{ result_type|future_callback }}( - rawContinutation: UnsafeRawPointer, - returnValue: {{ result_type.future_callback_param().borrow()|ffi_type_name }}, - callStatus: RustCallStatus) { - - let continuation = rawContinutation.bindMemory( - to: {{ result_type|future_continuation_type }}.self, - capacity: 1 - ) - - do { - try uniffiCheckCallStatus(callStatus: callStatus, errorHandler: {{ result_type|error_handler }}) - {%- match result_type.return_type %} - {%- when Some(return_type) %} - continuation.pointee.resume(returning: try {{ return_type|lift_fn }}(returnValue)) - {%- when None %} - continuation.pointee.resume(returning: ()) - {%- endmatch %} - } catch let error { - continuation.pointee.resume(throwing: error) - } -} - -{%- endfor %} diff --git a/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h b/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h index d87977670f..e8cfcbefc1 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h +++ b/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h @@ -60,9 +60,7 @@ typedef struct RustCallStatus { #endif // def UNIFFI_SHARED_H // Callbacks for UniFFI Futures -{%- for ffi_type in ci.iter_future_callback_params() %} -typedef void (*UniFfiFutureCallback{{ ffi_type|ffi_canonical_name }})(const void * _Nonnull, {{ ffi_type|header_ffi_type_name }}, RustCallStatus); -{%- endfor %} +typedef void (*UniFfiRustFutureContinuation)(void * _Nonnull, int8_t); // Scaffolding functions {%- for func in ci.iter_ffi_function_definitions() %} diff --git a/uniffi_bindgen/src/bindings/swift/templates/ObjectTemplate.swift b/uniffi_bindgen/src/bindings/swift/templates/ObjectTemplate.swift index 0de3118707..7d8bc32b5e 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/ObjectTemplate.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/ObjectTemplate.swift @@ -44,25 +44,29 @@ public class {{ type_name }}: {{ obj.name() }}Protocol { {%- if meth.is_async() %} public func {{ meth.name()|fn_name }}({%- call swift::arg_list_decl(meth) -%}) async {% call swift::throws(meth) %}{% match meth.return_type() %}{% when Some with (return_type) %} -> {{ return_type|type_name }}{% when None %}{% endmatch %} { - // Suspend the function and call the scaffolding function, passing it a callback handler from - // `AsyncTypes.swift` - // - // Make sure to hold on to a reference to the continuation in the top-level scope so that - // it's not freed before the callback is invoked. - var continuation: {{ meth.result_type().borrow()|future_continuation_type }}? = nil - return {% call swift::try(meth) %} await withCheckedThrowingContinuation { - continuation = $0 - try! rustCall() { + return {% call swift::try(meth) %} await uniffiDriveFuture( + rustFutureFunc: { {{ meth.ffi_func().name() }}( self.pointer, - {% call swift::arg_list_lowered(meth) %} - FfiConverterForeignExecutor.lower(UniFfiForeignExecutor()), - {{ meth.result_type().borrow()|future_callback }}, - &continuation, - $0 + {%- for arg in meth.arguments() %} + {{ arg|lower_fn }}({{ arg.name()|var_name }}){% if !loop.last %},{% endif %} + {%- endfor %} ) - } - } + }, + completeFunc: {{ meth.ffi_rust_future_complete(ci) }}, + {%- match meth.return_type() %} + {%- when Some(return_type) %} + liftFunc: {{ return_type|lift_fn }}, + {%- when None %} + liftFunc: { $0 }, + {%- endmatch %} + {%- match meth.throws_type() %} + {%- when Some with (e) %} + errorHandler: {{ e|ffi_converter_name }}.lift + {%- else %} + errorHandler: nil + {% endmatch %} + ) } {% else -%} diff --git a/uniffi_bindgen/src/bindings/swift/templates/TopLevelFunctionTemplate.swift b/uniffi_bindgen/src/bindings/swift/templates/TopLevelFunctionTemplate.swift index e3c87ca336..85f2b92c3c 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/TopLevelFunctionTemplate.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/TopLevelFunctionTemplate.swift @@ -1,24 +1,28 @@ {%- if func.is_async() %} public func {{ func.name()|fn_name }}({%- call swift::arg_list_decl(func) -%}) async {% call swift::throws(func) %}{% match func.return_type() %}{% when Some with (return_type) %} -> {{ return_type|type_name }}{% when None %}{% endmatch %} { - var continuation: {{ func.result_type().borrow()|future_continuation_type }}? = nil - // Suspend the function and call the scaffolding function, passing it a callback handler from - // `AsyncTypes.swift` - // - // Make sure to hold on to a reference to the continuation in the top-level scope so that - // it's not freed before the callback is invoked. - return {% call swift::try(func) %} await withCheckedThrowingContinuation { - continuation = $0 - try! rustCall() { + return {% call swift::try(func) %} await uniffiDriveFuture( + rustFutureFunc: { {{ func.ffi_func().name() }}( - {% call swift::arg_list_lowered(func) %} - FfiConverterForeignExecutor.lower(UniFfiForeignExecutor()), - {{ func.result_type().borrow()|future_callback }}, - &continuation, - $0 + {%- for arg in func.arguments() %} + {{ arg|lower_fn }}({{ arg.name()|var_name }}){% if !loop.last %},{% endif %} + {%- endfor %} ) - } - } + }, + completeFunc: {{ func.ffi_rust_future_complete(ci) }}, + {%- match func.return_type() %} + {%- when Some(return_type) %} + liftFunc: {{ return_type|lift_fn }}, + {%- when None %} + liftFunc: { $0 }, + {%- endmatch %} + {%- match func.throws_type() %} + {%- when Some with (e) %} + errorHandler: {{ e|ffi_converter_name }}.lift + {%- else %} + errorHandler: nil + {% endmatch %} + ) } {% else %} diff --git a/uniffi_bindgen/src/bindings/swift/templates/Types.swift b/uniffi_bindgen/src/bindings/swift/templates/Types.swift index dbde9c0d4c..aba34f4b0b 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/Types.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/Types.swift @@ -96,7 +96,3 @@ {%- else %} {%- endmatch %} {%- endfor %} - -{%- if ci.has_async_fns() %} -{%- include "AsyncTypes.swift" %} -{%- endif %} diff --git a/uniffi_bindgen/src/bindings/swift/templates/wrapper.swift b/uniffi_bindgen/src/bindings/swift/templates/wrapper.swift index 8aa85a9195..c34d348efb 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/wrapper.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/wrapper.swift @@ -19,6 +19,10 @@ import {{ config.ffi_module_name() }} // Public interface members begin here. {{ type_helper_code }} +{%- if ci.has_async_fns() %} +{% include "Async.swift" %} +{%- endif %} + {%- for func in ci.function_definitions() %} {%- include "TopLevelFunctionTemplate.swift" %} {%- endfor %} diff --git a/uniffi_bindgen/src/interface/ffi.rs b/uniffi_bindgen/src/interface/ffi.rs index 81ba0674ad..dea7a6995b 100644 --- a/uniffi_bindgen/src/interface/ffi.rs +++ b/uniffi_bindgen/src/interface/ffi.rs @@ -54,14 +54,11 @@ pub enum FfiType { ForeignExecutorHandle, /// Pointer to the callback function that's invoked to schedule calls with a ForeignExecutor ForeignExecutorCallback, - /// Pointer to a callback function to complete an async Rust function - FutureCallback { - /// Note: `return_type` is not optional because we have a void callback parameter like we - /// can have a void return. Instead, we use `UInt8` as a placeholder value. - return_type: Box, - }, - /// Opaque pointer passed to the FutureCallback - FutureCallbackData, + /// Pointer to a Rust future + RustFutureHandle, + /// Continuation function for a Rust future + RustFutureContinuation, + RustFutureContinuationData, // TODO: you can imagine a richer structural typesystem here, e.g. `Ref` or something. // We don't need that yet and it's possible we never will, so it isn't here for now. } @@ -184,28 +181,8 @@ impl FfiFunction { ) { self.arguments = args.into_iter().collect(); if self.is_async() { - self.arguments.extend([ - // Used to schedule polls - FfiArgument { - name: "uniffi_executor".into(), - type_: FfiType::ForeignExecutorHandle, - }, - // Invoked when the future is ready - FfiArgument { - name: "uniffi_callback".into(), - type_: FfiType::FutureCallback { - return_type: Box::new(return_type.unwrap_or(FfiType::UInt8)), - }, - }, - // Data pointer passed to the callback - FfiArgument { - name: "uniffi_callback_data".into(), - type_: FfiType::FutureCallbackData, - }, - ]); - // Async scaffolding functions never return values. Instead, the callback is invoked - // when the Future is ready. - self.return_type = None; + self.return_type = Some(FfiType::RustFutureHandle); + self.has_rust_call_status_arg = false; } else { self.return_type = return_type; } diff --git a/uniffi_bindgen/src/interface/function.rs b/uniffi_bindgen/src/interface/function.rs index 1ebc4a2495..0e2bec7f4d 100644 --- a/uniffi_bindgen/src/interface/function.rs +++ b/uniffi_bindgen/src/interface/function.rs @@ -35,8 +35,7 @@ use anyhow::Result; use super::ffi::{FfiArgument, FfiFunction, FfiType}; -use super::Literal; -use super::{AsType, ObjectImpl, Type, TypeIterator}; +use super::{AsType, ComponentInterface, Literal, ObjectImpl, Type, TypeIterator}; use uniffi_meta::Checksum; /// Represents a standalone function. @@ -250,6 +249,12 @@ pub trait Callable { throws_type: self.throws_type(), } } + + fn ffi_rust_future_complete(&self, ci: &ComponentInterface) -> String { + ci.ffi_rust_future_complete(self.return_type().map(Into::into)) + .name() + .to_string() + } } impl Callable for Function { diff --git a/uniffi_bindgen/src/interface/mod.rs b/uniffi_bindgen/src/interface/mod.rs index c22a5fe756..7cf8fa4591 100644 --- a/uniffi_bindgen/src/interface/mod.rs +++ b/uniffi_bindgen/src/interface/mod.rs @@ -413,6 +413,105 @@ impl ComponentInterface { } } + /// Builtin FFI function for starting up Rust Future + pub fn ffi_rust_future_poll(&self) -> FfiFunction { + FfiFunction { + name: format!("ffi_{}_rust_future_poll", self.ffi_namespace()), + is_async: false, + arguments: vec![ + FfiArgument { + name: "handle".to_string(), + type_: FfiType::RustFutureHandle, + }, + // Continuation to call when the future can make progress + FfiArgument { + name: "continuation".into(), + type_: FfiType::RustFutureContinuation, + }, + // Data to pass to the continuation + FfiArgument { + name: "uniffi_callback".into(), + type_: FfiType::RustFutureContinuationData, + }, + ], + return_type: None, + has_rust_call_status_arg: false, + is_object_free_function: false, + } + } + + /// Builtin FFI function for starting up Rust Future + pub fn ffi_rust_future_complete(&self, return_ffi_type: Option) -> FfiFunction { + let name = match &return_ffi_type { + Some(t) => match t { + FfiType::UInt8 => format!("ffi_{}_rust_future_complete_u8", self.ffi_namespace()), + FfiType::Int8 => format!("ffi_{}_rust_future_complete_i8", self.ffi_namespace()), + FfiType::UInt16 => format!("ffi_{}_rust_future_complete_u16", self.ffi_namespace()), + FfiType::Int16 => format!("ffi_{}_rust_future_complete_i16", self.ffi_namespace()), + FfiType::UInt32 => format!("ffi_{}_rust_future_complete_u32", self.ffi_namespace()), + FfiType::Int32 => format!("ffi_{}_rust_future_complete_i32", self.ffi_namespace()), + FfiType::UInt64 => format!("ffi_{}_rust_future_complete_u64", self.ffi_namespace()), + FfiType::Int64 => format!("ffi_{}_rust_future_complete_i64", self.ffi_namespace()), + FfiType::Float32 => { + format!("ffi_{}_rust_future_complete_f32", self.ffi_namespace()) + } + FfiType::Float64 => { + format!("ffi_{}_rust_future_complete_f64", self.ffi_namespace()) + } + FfiType::RustArcPtr(_) => { + format!("ffi_{}_rust_future_complete_pointer", self.ffi_namespace()) + } + FfiType::RustBuffer(_) => format!( + "ffi_{}_rust_future_complete_rust_buffer", + self.ffi_namespace() + ), + _ => unimplemented!("Async functions for {t:?}"), + }, + None => format!("ffi_{}_rust_future_complete_void", self.ffi_namespace()), + }; + FfiFunction { + name, + is_async: false, + arguments: vec![FfiArgument { + name: "handle".to_string(), + type_: FfiType::RustFutureHandle, + }], + return_type: return_ffi_type, + has_rust_call_status_arg: true, + is_object_free_function: false, + } + } + + /// Builtin FFI function for cancelling a Rust Future + pub fn ffi_rust_future_cancel(&self) -> FfiFunction { + FfiFunction { + name: format!("ffi_{}_rust_future_cancel", self.ffi_namespace()), + is_async: false, + arguments: vec![FfiArgument { + name: "handle".to_string(), + type_: FfiType::RustFutureHandle, + }], + return_type: None, + has_rust_call_status_arg: false, + is_object_free_function: false, + } + } + + /// Builtin FFI function for freeing a Rust Future + pub fn ffi_rust_future_free(&self) -> FfiFunction { + FfiFunction { + name: format!("ffi_{}_rust_future_free", self.ffi_namespace()), + is_async: false, + arguments: vec![FfiArgument { + name: "handle".to_string(), + type_: FfiType::RustFutureHandle, + }], + return_type: None, + has_rust_call_status_arg: false, + is_object_free_function: false, + } + } + /// Does this interface contain async functions? pub fn has_async_fns(&self) -> bool { self.iter_ffi_function_definitions().any(|f| f.is_async()) @@ -444,6 +543,7 @@ impl ComponentInterface { self.iter_user_ffi_function_definitions() .cloned() .chain(self.iter_rust_buffer_ffi_function_definitions()) + .chain(self.iter_futures_ffi_function_definitons()) .chain(self.iter_checksum_ffi_functions()) .chain(self.ffi_foreign_executor_callback_set()) .chain([self.ffi_uniffi_contract_version()]) @@ -481,13 +581,39 @@ impl ComponentInterface { .into_iter() } + /// List all FFI functions definitions for async functionality. + pub fn iter_futures_ffi_function_definitons(&self) -> impl Iterator { + [ + self.ffi_rust_future_poll(), + self.ffi_rust_future_cancel(), + self.ffi_rust_future_free(), + // It could be nice to only iterate over ffi types that are actually used, but that + // gets tricky because both RustBuffer and RustArcPtr have an inner field which would + // cause duplicates to be generated. + self.ffi_rust_future_complete(Some(FfiType::UInt8)), + self.ffi_rust_future_complete(Some(FfiType::Int8)), + self.ffi_rust_future_complete(Some(FfiType::UInt16)), + self.ffi_rust_future_complete(Some(FfiType::Int16)), + self.ffi_rust_future_complete(Some(FfiType::UInt32)), + self.ffi_rust_future_complete(Some(FfiType::Int32)), + self.ffi_rust_future_complete(Some(FfiType::UInt64)), + self.ffi_rust_future_complete(Some(FfiType::Int64)), + self.ffi_rust_future_complete(Some(FfiType::Float32)), + self.ffi_rust_future_complete(Some(FfiType::Float64)), + self.ffi_rust_future_complete(Some(FfiType::RustArcPtr("".to_string()))), + self.ffi_rust_future_complete(Some(FfiType::RustBuffer(None))), + self.ffi_rust_future_complete(None), + ] + .into_iter() + } + /// The ffi_foreign_executor_callback_set FFI function /// /// We only include this in the FFI if the `ForeignExecutor` type is actually used pub fn ffi_foreign_executor_callback_set(&self) -> Option { if self.types.contains(&Type::ForeignExecutor) { Some(FfiFunction { - name: format!("ffi_{}_foreign_executor_callback_set", self.ffi_namespace()), + name: "uniffi_foreign_executor_callback_set".into(), arguments: vec![FfiArgument { name: "callback".into(), type_: FfiType::ForeignExecutorCallback, @@ -607,10 +733,6 @@ impl ComponentInterface { bail!("Conflicting type definition for \"{}\"", defn.name()); } self.types.add_known_types(defn.iter_types())?; - if defn.is_async() { - // Async functions depend on the foreign executor - self.types.add_known_type(&Type::ForeignExecutor)?; - } self.functions.push(defn); Ok(()) @@ -633,10 +755,6 @@ impl ComponentInterface { .ok_or_else(|| anyhow!("add_method_meta: object {} not found", &method.object_name))?; self.types.add_known_types(method.iter_types())?; - if method.is_async() { - // Async functions depend on the foreign executor - self.types.add_known_type(&Type::ForeignExecutor)?; - } method.object_impl = object.imp; object.methods.push(method); diff --git a/uniffi_bindgen/src/scaffolding/mod.rs b/uniffi_bindgen/src/scaffolding/mod.rs index 4473651c63..42538245d4 100644 --- a/uniffi_bindgen/src/scaffolding/mod.rs +++ b/uniffi_bindgen/src/scaffolding/mod.rs @@ -83,11 +83,10 @@ mod filters { FfiType::RustBuffer(_) => "::uniffi::RustBuffer".into(), FfiType::ForeignBytes => "::uniffi::ForeignBytes".into(), FfiType::ForeignCallback => "::uniffi::ForeignCallback".into(), + FfiType::RustFutureHandle => "::uniffi::RustFutureHandle".into(), + FfiType::RustFutureContinuation => "::uniffi::RustFutureContinuation".into(), + FfiType::RustFutureContinuationData => "*const ()".into(), FfiType::ForeignExecutorHandle => "::uniffi::ForeignExecutorHandle".into(), - FfiType::FutureCallback { return_type } => { - format!("::uniffi::FutureCallback<{}>", type_ffi(return_type)?) - } - FfiType::FutureCallbackData => "*const ()".into(), FfiType::ForeignExecutorCallback => "::uniffi::ForeignExecutorCallback".into(), }) } diff --git a/uniffi_core/Cargo.toml b/uniffi_core/Cargo.toml index 6a8b894927..cd86a87000 100644 --- a/uniffi_core/Cargo.toml +++ b/uniffi_core/Cargo.toml @@ -23,6 +23,9 @@ oneshot = { version = "0.1", features = ["async"] } paste = "1.0" static_assertions = "1.1.0" +[dev-dependencies] +once_cell = "1.10.0" + [features] default = [] # `no_mangle` RustBuffer FFI functions diff --git a/uniffi_core/src/ffi/rustbuffer.rs b/uniffi_core/src/ffi/rustbuffer.rs index 5344ab123c..e09e3be89a 100644 --- a/uniffi_core/src/ffi/rustbuffer.rs +++ b/uniffi_core/src/ffi/rustbuffer.rs @@ -49,6 +49,7 @@ use crate::ffi::{rust_call, ForeignBytes, RustCallStatus}; /// This struct is based on `ByteBuffer` from the `ffi-support` crate, but modified /// to retain unallocated capacity rather than truncating to the occupied length. #[repr(C)] +#[derive(Debug)] pub struct RustBuffer { /// The allocated capacity of the underlying `Vec`. /// In Rust this is a `usize`, but we use an `i32` for compatibility with JNA. diff --git a/uniffi_core/src/ffi/rustcalls.rs b/uniffi_core/src/ffi/rustcalls.rs index edaf480273..ff5d996e8c 100644 --- a/uniffi_core/src/ffi/rustcalls.rs +++ b/uniffi_core/src/ffi/rustcalls.rs @@ -19,13 +19,14 @@ use std::panic; /// /// ## Usage /// -/// - The consumer code creates a `RustCallStatus` with an empty `RustBuffer` and `CALL_SUCCESS` -/// (0) as the status code +/// - The consumer code creates a [RustCallStatus] with an empty [RustBuffer] and +/// [RustCallStatusCode::Success] (0) as the status code /// - A pointer to this object is passed to the rust FFI function. This is an /// "out parameter" which will be updated with any error that occurred during the function's /// execution. -/// - After the call, if `code` is `CALL_ERROR` then `error_buf` will be updated to contain -/// the serialized error object. The consumer is responsible for freeing `error_buf`. +/// - After the call, if `code` is [RustCallStatusCode::Error] or [RustCallStatusCode::UnexpectedError] +/// then `error_buf` will be updated to contain a serialized error object. See +/// [RustCallStatusCode] for what gets serialized. The consumer is responsible for freeing `error_buf`. /// /// ## Layout/fields /// @@ -38,20 +39,9 @@ use std::panic; /// RustBuffer error_buf; /// }; /// ``` -/// -/// #### The `code` field. -/// -/// - `CALL_SUCCESS` (0) for successful calls -/// - `CALL_ERROR` (1) for calls that returned an `Err` value -/// - `CALL_PANIC` (2) for calls that panicked -/// -/// #### The `error_buf` field. -/// -/// - For `CALL_ERROR` this is a `RustBuffer` with the serialized error. The consumer code is -/// responsible for freeing this `RustBuffer`. #[repr(C)] pub struct RustCallStatus { - pub code: i8, + pub code: RustCallStatusCode, // code is signed because unsigned types are experimental in Kotlin pub error_buf: MaybeUninit, // error_buf is MaybeUninit to avoid dropping the value that the consumer code sends in: @@ -65,19 +55,48 @@ pub struct RustCallStatus { // leak the first `RustBuffer`. } +impl RustCallStatus { + pub fn cancelled() -> Self { + Self { + code: RustCallStatusCode::Cancelled, + error_buf: MaybeUninit::new(RustBuffer::new()), + } + } + + pub fn error(message: impl Into) -> Self { + Self { + code: RustCallStatusCode::UnexpectedError, + error_buf: MaybeUninit::new(>::lower(message.into())), + } + } +} + impl Default for RustCallStatus { fn default() -> Self { Self { - code: 0, + code: RustCallStatusCode::Success, error_buf: MaybeUninit::uninit(), } } } -#[allow(dead_code)] -const CALL_SUCCESS: i8 = 0; // CALL_SUCCESS is set by the calling code -const CALL_ERROR: i8 = 1; -const CALL_PANIC: i8 = 2; +/// Result of a FFI call to a Rust function +#[repr(i8)] +#[derive(Debug, PartialEq, Eq)] +pub enum RustCallStatusCode { + /// Successful call. + Success = 0, + /// Expected error, corresponding to the `Result::Err` variant. [RustCallStatus::error_buf] + /// will contain the serialized error. + Error = 1, + /// Unexpected error. [RustCallStatus::error_buf] will contain a serialized message string + UnexpectedError = 2, + /// Async function cancelled. [RustCallStatus::error_buf] will be empty and does not need to + /// be freed. + /// + /// This is only returned for async functions + Cancelled = 3, +} /// Handle a scaffolding calls /// @@ -89,7 +108,7 @@ const CALL_PANIC: i8 = 2; /// - `FfiConverter::lower_return` returns `Result<>` types that meet the above criteria> /// - If the function returns a `Ok` value it will be unwrapped and returned /// - If the function returns a `Err` value: -/// - `out_status.code` will be set to `CALL_ERROR` +/// - `out_status.code` will be set to [RustCallStatusCode::Error]. /// - `out_status.error_buf` will be set to a newly allocated `RustBuffer` containing the error. The calling /// code is responsible for freeing the `RustBuffer` /// - `FfiDefault::ffi_default()` is returned, although foreign code should ignore this value @@ -125,11 +144,11 @@ where }); match result { // Happy path. Note: no need to update out_status in this case because the calling code - // initializes it to CALL_SUCCESS + // initializes it to [RustCallStatusCode::Success] Ok(Ok(v)) => Some(v), // Callback returned an Err. Ok(Err(buf)) => { - out_status.code = CALL_ERROR; + out_status.code = RustCallStatusCode::Error; unsafe { // Unsafe because we're setting the `MaybeUninit` value, see above for safety // invariants. @@ -139,7 +158,7 @@ where } // Callback panicked Err(cause) => { - out_status.code = CALL_PANIC; + out_status.code = RustCallStatusCode::UnexpectedError; // Try to coerce the cause into a RustBuffer containing a String. Since this code can // panic, we need to use a second catch_unwind(). let message_result = panic::catch_unwind(panic::AssertUnwindSafe(move || { @@ -179,7 +198,7 @@ mod test { fn create_call_status() -> RustCallStatus { RustCallStatus { - code: 0, + code: RustCallStatusCode::Success, error_buf: MaybeUninit::new(RustBuffer::new()), } } @@ -219,13 +238,13 @@ mod test { as FfiConverter>::lower_return(test_callback(0)) }); - assert_eq!(status.code, CALL_SUCCESS); + assert_eq!(status.code, RustCallStatusCode::Success); assert_eq!(return_value, 100); rust_call(&mut status, || { as FfiConverter>::lower_return(test_callback(1)) }); - assert_eq!(status.code, CALL_ERROR); + assert_eq!(status.code, RustCallStatusCode::Error); unsafe { assert_eq!( >::try_lift(status.error_buf.assume_init()) @@ -238,7 +257,7 @@ mod test { rust_call(&mut status, || { as FfiConverter>::lower_return(test_callback(2)) }); - assert_eq!(status.code, CALL_PANIC); + assert_eq!(status.code, RustCallStatusCode::UnexpectedError); unsafe { assert_eq!( >::try_lift(status.error_buf.assume_init()) diff --git a/uniffi_core/src/ffi/rustfuture.rs b/uniffi_core/src/ffi/rustfuture.rs index 6ad0001a8a..6983fbdedf 100644 --- a/uniffi_core/src/ffi/rustfuture.rs +++ b/uniffi_core/src/ffi/rustfuture.rs @@ -8,68 +8,28 @@ //! //! # The big picture //! -//! What happens when you call an async function exported from the Rust API? +//! We implement async foreign functions using a simplified version of the Future API: //! -//! 1. You make a call to a generated async function in the foreign bindings. -//! 2. That function suspends itself, then makes a scaffolding call. In addition to the normal -//! arguments, it passes a `ForeignExecutor` and callback. -//! 3. Rust uses the `ForeignExecutor` to schedules polls of the Future until it's ready. Then -//! invokes the callback. -//! 4. The callback resumes the suspended async function from (2), closing the loop. +//! 1. Call the scaffolding function to get a [RustFutureHandle] +//! 2a. In a loop: +//! - Call [rust_future_poll] +//! - Suspend the function until the [rust_future_poll] continuation function is called +//! - If the continuation was function was called with [RustFuturePoll::Ready], then break +//! otherwise continue. +//! 2b. If the async function is cancelled, then call [rust_future_cancel]. This causes the +//! continuation function to be called with [RustFuturePoll::Ready] and the [RustFuture] to +//! enter a cancelled state. +//! - This works with languages like Kotlin/Python that break out of the loop before signaling +//! cancel and ignore any future continuation calls +//! - This also works with languages like Swift that don't break out of the loop until the +//! continuation call. +//! 3. Call [rust_future_complete] to get the result of the future. +//! 4. Call [rust_future_free] to free the future, ideally in a finally block. This: +//! - Releases any resources held by the future +//! - Calls any continuation callbacks that have not been called yet //! -//! # Anatomy of an async call -//! -//! Let's consider the following Rust function: -//! -//! ```rust,ignore -//! #[uniffi::export] -//! async fn hello() -> bool { -//! true -//! } -//! ``` -//! -//! In Rust, this `async fn` syntax is strictly equivalent to a normal function that returns a -//! `Future`: -//! -//! ```rust,ignore -//! #[uniffi::export] -//! fn hello() -> impl Future { /* … */ } -//! ``` -//! -//! `uniffi-bindgen` will generate a scaffolding function for each exported async function: -//! -//! ```rust,ignore -//! // The `hello` function, as seen from the outside. It inputs 3 extra arguments: -//! // - executor: used to schedule polls of the future -//! // - callback: invoked when the future is ready -//! // - callback_data: opaque pointer that's passed to the callback. It points to any state needed to -//! // resume the async function. -//! #[no_mangle] -//! pub extern "C" fn _uniffi_hello( -//! // ...If the function inputted arguments, the lowered versions would go here -//! uniffi_executor: ForeignExecutor, -//! uniffi_callback: >::FutureCallback, -//! uniffi_callback_data: *const (), -//! uniffi_call_status: &mut ::uniffi::RustCallStatus -//! ) { -//! ::uniffi::call_with_output(uniffi_call_status, || { -//! let uniffi_rust_future = RustFuture::<_, bool, crate::UniFFITag,>::new( -//! future: hello(), // the future! -//! uniffi_executor, -//! uniffi_callback, -//! uniffi_callback_data, -//! ); -//! uniffi_rust_future.wake(); -//! }) -//! } -//! ``` -//! -//! Rust will continue to poll the future until it's ready, after that. The callback will -//! eventually be invoked with these arguments: -//! - callback_data -//! - FfiConverter::ReturnType (the type that would be returned by a sync function) -//! - RustCallStatus (used to signal errors/panics when executing the future) -//! - Rust will stop polling the future, even if it's waker is invoked again. +//! Note: Technically, the foreign code calls the scaffolding versions of the `rust_future_*` +//! functions. These are generated by the scaffolding macro, specially prefixed, and extern "C". //! //! ## How does `Future` work exactly? //! @@ -115,477 +75,664 @@ //! [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html //! [`RawWaker`]: https://doc.rust-lang.org/std/task/struct.RawWaker.html -use crate::{ - ffi::foreignexecutor::RustTaskCallbackCode, rust_call_with_out_status, schedule_raw, - FfiConverter, FfiDefault, ForeignExecutor, ForeignExecutorHandle, RustCallStatus, -}; +use crate::{rust_call_with_out_status, FfiConverter, FfiDefault, RustCallStatus}; use std::{ cell::UnsafeCell, future::Future, + marker::PhantomData, + ops::Deref, panic, pin::Pin, sync::{ - atomic::{AtomicU32, Ordering}, - Arc, + atomic::{AtomicU8, Ordering}, + Arc, Mutex, }, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + task::{Context, Poll, Wake}, }; -/// Callback that we invoke when a `RustFuture` is ready. +/// Result code for [rust_future_poll]. This is passed to the continuation function. +#[repr(i8)] +#[derive(Debug, PartialEq, Eq)] +pub enum RustFuturePoll { + /// The future is ready and is waiting for [rust_future_complete] to be called + Ready = 0, + /// The future might be ready and [rust_future_poll] should be called again + MaybeReady = 1, +} + +/// Opaque handle for a Rust future that's stored by the foreign language code +#[repr(transparent)] +pub struct RustFutureHandle(*const ()); + +/// Foreign callback that's passed to [rust_future_poll] /// -/// The foreign code passes a pointer to one of these callbacks along with an opaque data pointer. -/// When the future is ready, we invoke the callback. -pub type FutureCallback = - extern "C" fn(callback_data: *const (), result: T, status: RustCallStatus); +/// The Rust side of things calls this when the foreign side should call [rust_future_poll] and +/// continue progress on the future. +pub type RustFutureContinuation = extern "C" fn(callback_data: *const (), status: RustFuturePoll); -/// Future that the foreign code is awaiting +// === Public FFI API === + +/// Create a new [RustFutureHandle] /// -/// RustFuture is always stored inside a Pin>. The `Arc<>` allows it to be shared between -/// wakers and Pin<> signals that it must not move, since this would break any self-references in -/// the future. -pub struct RustFuture +/// For each exported async function, UniFFI will create a scaffolding function that uses this to +/// create the [RustFutureHandle] to pass to the foreign code. +pub fn rust_future_new(future: F, tag: UT) -> RustFutureHandle where - // The future needs to be `Send`, since it will move to whatever thread the foreign executor - // chooses. However, it doesn't need to be `Sync', since we don't share references between - // threads (see do_wake()). - F: Future + Send, - T: FfiConverter, + // F is the future type returned by the exported async function. It needs to be Send + `static + // since it will move between threads for an indeterminate amount of time as the foreign + // executor calls polls it and the Rust executor wakes it. It does not need to by `Sync`, + // since we synchronize all access to the values. + F: Future + Send + 'static, + // T is the output of the Future. It needs to implement FfiConverter. Also it must be Send + + // 'static for the same reason as F. + T: FfiConverter + Send + 'static, + // UT is the UniFfiTag type, which is always a ZST. Send + 'static bound is to rustc happy. + UT: Send + 'static, { - future: UnsafeCell, - executor: ForeignExecutor, - wake_counter: AtomicU32, - callback: T::FutureCallback, - callback_data: *const (), + // Create a RustFuture and coerce to `Arc`, which is what we use to + // implement the FFI + let future_ffi = RustFuture::new(future, tag) as Arc; + // Box the Arc, to convert the wide pointer into a normal sized pointer so that we can pass it + // to the foreign code. + let boxed_ffi = Box::new(future_ffi); + // We can now create a RustFutureHandle + RustFutureHandle(Box::into_raw(boxed_ffi) as *mut ()) } -// Mark `RustFuture` as `Send` + `Sync`, since we will be sharing it between threads. -// This means we need to serialize access to any fields that aren't `Send` + `Sync` (`future`, `callback`, and `callback_data`). -// See `do_wake()` for details on this. +/// Poll a Rust future +/// +/// When the future is ready to progress the continuation will be called with the `data` value and +/// a [RustFuturePoll] value. For each [rust_future_poll] call the continuation will be called +/// exactly once. +/// +/// # Safety +/// +/// The [RustFutureHandle] must not previously have been passed to [rust_future_free] +pub unsafe fn rust_future_poll( + handle: RustFutureHandle, + continuation: RustFutureContinuation, + data: *const (), +) { + let future = &*(handle.0 as *mut Arc); + future.clone().ffi_poll(continuation, data) +} -unsafe impl Send for RustFuture -where - F: Future + Send, - T: FfiConverter, -{ +/// Cancel a Rust future +/// +/// Any current and future continuations will be immediately called with RustFuturePoll::Ready. +/// +/// # Safety +/// +/// The [RustFutureHandle] must not previously have been passed to [rust_future_free] +pub unsafe fn rust_future_cancel(handle: RustFutureHandle) { + let future = &*(handle.0 as *mut Arc); + future.clone().ffi_cancel() } -unsafe impl Sync for RustFuture -where - F: Future + Send, - T: FfiConverter, -{ +/// Complete a Rust future +/// +/// Note: the actually extern "C" scaffolding functions can't be generic, so we generate one for +/// each supported FFI type. +/// +/// # Safety +/// +/// The [RustFutureHandle] must not previously have been passed to [rust_future_free] +pub unsafe fn rust_future_complete( + handle: RustFutureHandle, + out_status: &mut RustCallStatus, +) -> T { + let future = &*(handle.0 as *mut Arc); + let mut return_value = T::ffi_default(); + let out_return = std::mem::transmute::<&mut T, &mut ()>(&mut return_value); + future.ffi_complete(out_return, out_status); + return_value } -impl RustFuture -where - F: Future + Send, - T: FfiConverter, -{ - pub fn new( - future: F, - executor_handle: ForeignExecutorHandle, - callback: T::FutureCallback, - callback_data: *const (), - ) -> Pin> { - let executor = - >::try_lift(executor_handle) - .expect("Error lifting ForeignExecutorHandle"); - Arc::pin(Self { - future: UnsafeCell::new(future), - wake_counter: AtomicU32::new(0), - executor, - callback, - callback_data, - }) - } +/// Free a Rust future, dropping the strong reference and releasing all references held by the +/// future. +/// +/// # Safety +/// +/// The [RustFutureHandle] must not previously have been passed to [rust_future_free] +pub unsafe fn rust_future_free(handle: RustFutureHandle) { + let future = Box::from_raw(handle.0 as *mut Arc); + future.ffi_free() +} - /// Wake up soon and poll our future. - /// - /// This method ensures that a call to `do_wake()` is scheduled. Only one call will be scheduled - /// at any time, even if `wake_soon` called multiple times from multiple threads. - pub fn wake(self: Pin>) { - if self.wake_counter.fetch_add(1, Ordering::Relaxed) == 0 { - self.schedule_do_wake(); +/// Thread-safe storage for a RustFutureContinuation +/// +/// The basic guarantee is that all continuations passed to [Self::store] are called exactly once +/// assuming that [Self::try_call_continuation] is called after the last store. This enables us to +/// uphold the [rust_future_poll] guarantee. +/// +/// AtomicContinuationCell uses atomic trickery to make all operations thread-safe but non-blocking. +struct AtomicContinuationCell { + state: AtomicU8, + stored: UnsafeCell>, +} + +impl AtomicContinuationCell { + /// Lock bit + const STATE_LOCK: u8 = 1 << 0; + /// Bit signalling that we should call the continuation + const STATE_NEEDS_CALL: u8 = 1 << 1; + /// Bit signalling that the RustFuture has been cancelled + const STATE_CANCELLED: u8 = 1 << 2; + + fn new() -> Self { + Self { + state: AtomicU8::new(0), + stored: UnsafeCell::new(None), } } - /// Schedule `do_wake`. + /// Try to take a lock, optionally setting the other bits + fn try_lock(&self, extra_bits: u8) -> bool { + let prev_state = self + .state + .fetch_or(Self::STATE_LOCK | extra_bits, Ordering::Acquire); + (prev_state & Self::STATE_LOCK) == 0 + } + + /// Release a lock, calling any stored continuation + fn unlock_and_call(&self) { + self.call_continuation_unchecked(); + self.state.fetch_and( + !(Self::STATE_LOCK | Self::STATE_NEEDS_CALL), + Ordering::Release, + ); + } + + /// Release a lock with the intention of keeping a stored continuation /// - /// `self` is consumed but _NOT_ dropped, it's purposely leaked via `into_raw()`. - /// `wake_callback()` will call `from_raw()` to reverse the leak. - fn schedule_do_wake(self: Pin>) { - unsafe { - let handle = self.executor.handle; - let raw_ptr = Arc::into_raw(Pin::into_inner_unchecked(self)); - // SAFETY: The `into_raw()` / `from_raw()` contract guarantees that our executor cannot - // be dropped before we call `from_raw()` on the raw pointer. This means we can safely - // use its handle to schedule a callback. - if !schedule_raw(handle, 0, Self::wake_callback, raw_ptr as *const ()) { - // There was an error scheduling the callback, drop the arc reference since - // `wake_callback()` will never be called - // - // Note: specifying the `` generic is a good safety measure. Things would go - // very bad if Rust inferred the wrong type. - // - // However, the `Pin<>` part doesn't matter since its `repr(transparent)`. - Arc::::decrement_strong_count(raw_ptr); + /// However, if another thread set the STATE_NEEDS_CALL or STATE_READY bit, then instead call + /// the stored continuation for them. + fn unlock_and_store(&self, new_continuation: RustFutureContinuation, data: *const ()) { + // Set the continuation + let stored = unsafe { &mut *self.stored.get() }; + if stored.is_some() { + log::error!("AtomicContinuationCell::unlock_and_store: continuation already set"); + self.call_continuation_unchecked(); + } + *stored = Some((new_continuation, data)); + + match self + .state + .compare_exchange(Self::STATE_LOCK, 0, Ordering::Release, Ordering::Relaxed) + { + // Success! + Ok(_) => (), + Err(_) => { + // Another thread set the STATE_NEEDS_CALL or STATE_READY bit, so we should call the + // continuation for them. + self.call_continuation_unchecked(); + // We can now unlock unconditionally + self.state.fetch_and( + !(Self::STATE_LOCK | Self::STATE_NEEDS_CALL), + Ordering::Release, + ); } } } - extern "C" fn wake_callback(self_ptr: *const (), status_code: RustTaskCallbackCode) { - // No matter what, call `Arc::from_raw()` to balance the `Arc::into_raw()` call in - // `schedule_do_wake()`. - let task = unsafe { Pin::new_unchecked(Arc::from_raw(self_ptr as *const Self)) }; - if status_code == RustTaskCallbackCode::Success { - // Only drive the future forward on `RustTaskCallbackCode::Success`. - // `RUST_TASK_CALLBACK_CANCELED` indicates the foreign executor has been cancelled / - // shutdown and we should not continue. - task.do_wake(); + // Take the data out of self.continuation. If it was set, then call the continuation. + // + // Only call this if you have the lock + fn call_continuation_unchecked(&self) { + let stored = unsafe { &mut *self.stored.get() }; + if let Some((continuation, data)) = stored.take() { + continuation(data, self.poll_code()); } } - // Does the work for wake, we take care to ensure this always runs in a serialized fashion. - fn do_wake(self: Pin>) { - // Store 1 in `waker_counter`, which we'll use at the end of this call. - self.wake_counter.store(1, Ordering::Relaxed); - - // Pin<&mut> from our UnsafeCell. &mut is is safe, since this is the only reference we - // ever take to `self.future` and calls to this function are serialized. Pin<> is safe - // since we never move the future out of `self.future`. - let future = unsafe { Pin::new_unchecked(&mut *self.future.get()) }; - let waker = self.make_waker(); + fn try_call_continuation(&self, cancelled: bool) { + let extra_bits = if cancelled { + Self::STATE_NEEDS_CALL | Self::STATE_CANCELLED + } else { + Self::STATE_NEEDS_CALL + }; + if self.try_lock(extra_bits) { + self.unlock_and_call(); + } + } - // Run the poll and lift the result if it's ready - let mut out_status = RustCallStatus::default(); - let result: Option> = rust_call_with_out_status( - &mut out_status, - // This closure uses a `&mut F` value, which means it's not UnwindSafe by default. If - // the closure panics, the future may be in an invalid state. + fn store(&self, continuation: RustFutureContinuation, data: *const ()) { + if self.try_lock(0) { + self.unlock_and_store(continuation, data); + } else { + // Failed to acquire the lock + // - If the other thread was calling `try_call_continuation`, that means they locked us out + // just before we could store the continuation. + // - If the other thread was calling `store`, then something weird happened and + // there's already a stored continuation. // - // However, we can safely use `AssertUnwindSafe` since a panic will lead the `Err()` - // case below. In that case, we will never run `do_wake()` again and will no longer - // access the future. - panic::AssertUnwindSafe(|| match future.poll(&mut Context::from_waker(&waker)) { - Poll::Pending => Ok(Poll::Pending), - Poll::Ready(v) => T::lower_return(v).map(Poll::Ready), - }), - ); + // In either case, call the continuation now. + continuation(data, self.poll_code()); + } + } - // All the main work is done, time to finish up - match result { - Some(Poll::Pending) => { - // Since we set `wake_counter` to 1 at the start of this function... - // - If it's > 1 now, then some other code tried to wake us while we were polling - // the future. Schedule another poll in this case. - // - If it's still 1, then exit after decrementing it. The next call to `wake()` - // will schedule a poll. - if self.wake_counter.fetch_sub(1, Ordering::Relaxed) > 1 { - self.schedule_do_wake(); - } - } - // Success! Call our callback. - // - // Don't decrement `wake_counter'. This way, if wake() is called in the future, we - // will just ignore it - Some(Poll::Ready(v)) => { - T::invoke_future_callback(self.callback, self.callback_data, v, out_status); - } - // Error/panic polling the future. Call the callback with a default value. - // `out_status` contains the error code and serialized error. Again, don't decrement - // `wake_counter'. - None => { - T::invoke_future_callback( - self.callback, - self.callback_data, - T::ReturnType::ffi_default(), - out_status, + fn poll_code(&self) -> RustFuturePoll { + if self.state.load(Ordering::Relaxed) & Self::STATE_CANCELLED == 0 { + RustFuturePoll::MaybeReady + } else { + RustFuturePoll::Ready + } + } + + fn is_cancelled(&self) -> bool { + self.state.load(Ordering::Relaxed) & Self::STATE_CANCELLED != 0 + } +} + +// AtomicContinuationCell is Send + Sync as long the previous code is working correctly. + +unsafe impl Send for AtomicContinuationCell {} +unsafe impl Sync for AtomicContinuationCell {} + +/// Wraps the actual future we're polling +/// +/// This tracks the pending/ready status and also handles the FFI conversions. +enum WrappedFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ + Pending(F), + Ok(T::ReturnType), + Err(RustCallStatus), + Complete, +} + +impl WrappedFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ + // Poll the future and check if it's ready or not + fn poll(&mut self, context: &mut Context<'_>) -> bool { + match self { + Self::Pending(f) => { + // SAFETY: We can call Pin::new_unchecked because: + // - This is the only time we get a &mut to `self.future` + // - Once we advance future output RustFuturePoll::Pending, we never poll it again. + // - We never move RustFuture, which is the our only parent type + // - RustFuture is private to this module so no other code can move it. + let pinned = unsafe { Pin::new_unchecked(f) }; + // Run the poll and lift the result if it's ready + let mut out_status = RustCallStatus::default(); + let result: Option> = rust_call_with_out_status( + &mut out_status, + // This closure uses a `&mut F` value, which means it's not UnwindSafe by + // default. If the future panics, it may be in an invalid state. + // + // However, we can safely use `AssertUnwindSafe` since a panic will lead the `None` + // case below and we will never poll the future again. + panic::AssertUnwindSafe(|| match pinned.poll(context) { + Poll::Pending => Ok(Poll::Pending), + Poll::Ready(v) => T::lower_return(v).map(Poll::Ready), + }), ); + + match result { + Some(Poll::Pending) => false, + Some(Poll::Ready(v)) => { + *self = Self::Ok(v); + true + } + None => { + *self = Self::Err(out_status); + true + } + } } + _ => true, + } + } + + fn complete(&mut self, out_return: &mut T::ReturnType, out_status: &mut RustCallStatus) { + let starting_value = std::mem::replace(self, Self::Complete); + + match starting_value { + Self::Ok(v) => *out_return = v, + Self::Err(call_status) => *out_status = call_status, + Self::Pending(_) => *out_status = RustCallStatus::cancelled(), + Self::Complete => *out_status = RustCallStatus::error("Future already complete"), }; } +} - fn make_waker(self: &Pin>) -> Waker { - // This is safe as long as we implement the waker interface correctly. - unsafe { - Waker::from_raw(RawWaker::new( - self.clone().into_raw(), - &Self::RAW_WAKER_VTABLE, - )) +// If F and T are Send, then WrappedFuture is too +// +// It's not by when T::ReturnType is a raw pointer. This impl is promising that we will treat the +// raw pointer properly, for example by not lifting it twice. +unsafe impl Send for WrappedFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ +} + +/// Future that the foreign code is awaiting +struct RustFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ + // This Mutex should never block if our code is working correctly, since there should not be + // multiple threads calling [Self::poll] and/or [Self::complete] at the same time. + future: Mutex>, + continuation: AtomicContinuationCell, + // UT is used as the generic parameter for FfiConverter. + // Let's model this with PhantomData as a function that inputs a UT value. + _phantom: PhantomData ()>, +} + +impl RustFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ + fn new(future: F, _tag: UT) -> Arc { + Arc::new(Self { + future: Mutex::new(WrappedFuture::Pending(future)), + continuation: AtomicContinuationCell::new(), + _phantom: PhantomData, + }) + } + + fn poll(self: Arc, new_continuation: RustFutureContinuation, data: *const ()) { + let ready = self.continuation.is_cancelled() || { + let mut locked = self.future.lock().unwrap(); + let waker: std::task::Waker = Arc::clone(&self).into(); + locked.poll(&mut Context::from_waker(&waker)) + }; + if ready { + new_continuation(data, RustFuturePoll::Ready); + } else { + self.continuation.store(new_continuation, data); } } - /// SAFETY: Ensure that all calls to `into_raw()` are balanced with a call to `from_raw()` - fn into_raw(self: Pin>) -> *const () { - unsafe { Arc::into_raw(Pin::into_inner_unchecked(self)) as *const () } + fn wake(&self) { + self.continuation.try_call_continuation(false) } - /// Consume a pointer to get an arc - /// - /// SAFETY: The pointer must have come from `into_raw()` or was cloned with `raw_clone()`. - /// Once a pointer is passed into this function, it should no longer be used. - fn from_raw(self_ptr: *const ()) -> Pin> { - unsafe { Pin::new_unchecked(Arc::from_raw(self_ptr as *const Self)) } + fn cancel(&self) { + self.continuation.try_call_continuation(true); } - // Implement the waker interface by defining a RawWakerVTable - // - // We could also handle this by implementing the `Wake` interface, but that uses an `Arc` - // instead of a `Pin>` and in theory it could try to move the `T` value out of the arc - // with something like `Arc::try_unwrap()` which would break the pinning contract with - // `Future`. Implementing this using `RawWakerVTable` allows us verify that this doesn't - // happen. - const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - Self::raw_clone, - Self::raw_wake, - Self::raw_wake_by_ref, - Self::raw_drop, - ); - - /// This function will be called when the `RawWaker` gets cloned, e.g. when - /// the `Waker` in which the `RawWaker` is stored gets cloned. - unsafe fn raw_clone(self_ptr: *const ()) -> RawWaker { - Arc::increment_strong_count(self_ptr as *const Self); - RawWaker::new(self_ptr, &Self::RAW_WAKER_VTABLE) - } - - /// This function will be called when `wake` is called on the `Waker`. It - /// must wake up the task associated with this `RawWaker`. - unsafe fn raw_wake(self_ptr: *const ()) { - Self::from_raw(self_ptr).wake() - } - - /// This function will be called when `wake_by_ref` is called on the - /// `Waker`. It must wake up the task associated with this `RawWaker`. - unsafe fn raw_wake_by_ref(self_ptr: *const ()) { - // This could be optimized by only incrementing the strong count if we end up calling - // `schedule_do_wake()`, but it's not clear that's worth the extra complexity - Arc::increment_strong_count(self_ptr as *const Self); - Self::from_raw(self_ptr).wake() - } - - /// This function gets called when a `RawWaker` gets dropped. - /// This function gets called when a `RawWaker` gets dropped. - unsafe fn raw_drop(self_ptr: *const ()) { - drop(Self::from_raw(self_ptr)) + fn complete(&self, return_value: &mut T::ReturnType, call_status: &mut RustCallStatus) { + self.future + .lock() + .unwrap() + .complete(return_value, call_status) + } + + fn free(self: Arc) { + // Call any leftover continuation callbacks now + self.continuation.try_call_continuation(true); + // Ensure we drop our inner future, releasing all held references + *self.future.lock().unwrap() = WrappedFuture::Complete; } } -#[cfg(test)] -mod tests { - use super::*; - use crate::{try_lift_from_rust_buffer, MockEventLoop}; - use std::sync::Weak; +impl Wake for RustFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ + // Required method + fn wake(self: Arc) { + self.deref().wake() + } - // Mock future that we can manually control using an Option<> - struct MockFuture(Option>); + // Provided method + fn wake_by_ref(self: &Arc) { + self.deref().wake() + } +} - impl Future for MockFuture { - type Output = Result; +/// RustFuture FFI trait. This allows `Arc>` to be cast to +/// `Arc`, which is needed to implement the public FFI API. In particular, this +/// allows you to use RustFuture functionality without knowing the concrete Future type, which is +/// unnamable. +#[doc(hidden)] +trait RustFutureFFI { + fn ffi_poll(self: Arc, continuation: RustFutureContinuation, data: *const ()); + fn ffi_cancel(&self); + unsafe fn ffi_complete(&self, return_value: &mut (), call_status: &mut RustCallStatus); + fn ffi_free(self: Arc); +} - fn poll(self: Pin<&mut Self>, _context: &mut Context<'_>) -> Poll { - match &self.0 { - Some(v) => Poll::Ready(v.clone()), - None => Poll::Pending, - } - } +impl RustFutureFFI for RustFuture +where + // See rust_future_new for an explanation of these trait bounds + F: Future + Send + 'static, + T: FfiConverter + Send + 'static, + UT: Send + 'static, +{ + fn ffi_poll(self: Arc, continuation: RustFutureContinuation, data: *const ()) { + self.poll(continuation, data) } - // Type alias for the RustFuture we'll use in our tests - type TestRustFuture = RustFuture, crate::UniFfiTag>; + fn ffi_cancel(&self) { + self.cancel() + } - // Stores the result that we send to the foreign code - #[derive(Default)] - struct MockForeignResult { - value: i8, - status: RustCallStatus, + unsafe fn ffi_complete(&self, return_value: &mut (), call_status: &mut RustCallStatus) { + // Unsafely transmute return_value. This works as long as the foreign code calls the + // correct `rust_future_complete_*` function. + let return_value = std::mem::transmute::<&mut (), &mut T::ReturnType>(return_value); + self.complete(return_value, call_status) } - extern "C" fn mock_foreign_callback(data_ptr: *const (), value: i8, status: RustCallStatus) { - println!("mock_foreign_callback: {value} {data_ptr:?}"); - let result: &mut Option = - unsafe { &mut *(data_ptr as *mut Option) }; - *result = Some(MockForeignResult { value, status }); + fn ffi_free(self: Arc) { + self.free(); } +} - // Bundles everything together so that we can run some tests - struct TestFutureEnvironment { - rust_future: Pin>, - foreign_result: Pin>>, +#[cfg(test)] +mod tests { + use super::*; + use crate::{try_lift_from_rust_buffer, RustBuffer, RustCallStatusCode}; + use once_cell::sync::OnceCell; + use std::task::Waker; + + // Sender/Receiver pair that we use for testing + struct Channel { + result: Option>, + waker: Option, } - impl TestFutureEnvironment { - fn new(eventloop: &Arc) -> Self { - let foreign_result = Box::pin(None); - let foreign_result_ptr = &*foreign_result as *const Option<_> as *const (); + struct Sender(Arc>); - let rust_future = TestRustFuture::new( - MockFuture(None), - eventloop.new_handle(), - mock_foreign_callback, - foreign_result_ptr, - ); - Self { - rust_future, - foreign_result, + impl Sender { + fn wake(&self) { + let inner = self.0.lock().unwrap(); + if let Some(waker) = &inner.waker { + waker.wake_by_ref(); } } - fn wake(&self) { - self.rust_future.clone().wake(); + fn send(&self, value: Result) { + let mut inner = self.0.lock().unwrap(); + if inner.result.replace(value).is_some() { + panic!("value already sent"); + } + if let Some(waker) = &inner.waker { + waker.wake_by_ref(); + } } + } - fn rust_future_weak(&self) -> Weak { - // It seems like there's not a great way to get an &Arc from a Pin, so we need to - // do a little dance here - Arc::downgrade(&Pin::into_inner(Clone::clone(&self.rust_future))) - } + struct Receiver(Arc>); - fn complete_future(&self, value: Result) { - unsafe { - (*self.rust_future.future.get()).0 = Some(value); + impl Future for Receiver { + type Output = Result; + + fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { + let mut inner = self.0.lock().unwrap(); + match &inner.result { + Some(v) => Poll::Ready(v.clone()), + None => { + inner.waker = Some(context.waker().clone()); + Poll::Pending + } } } } + // Create a sender and rust future that we can use for testing + fn channel() -> (Sender, Arc) { + let channel = Arc::new(Mutex::new(Channel { + result: None, + waker: None, + })); + let rust_future = RustFuture::new(Receiver(channel.clone()), crate::UniFfiTag); + (Sender(channel), rust_future) + } + + /// Poll a Rust future and get an OnceCell that's set when the continuation is called + fn poll(rust_future: &Arc) -> OnceCell { + let cell = OnceCell::new(); + let cell_ptr = &cell as *const OnceCell as *const (); + rust_future.clone().ffi_poll(poll_continuation, cell_ptr); + cell + } + + extern "C" fn poll_continuation(data: *const (), code: RustFuturePoll) { + let cell = unsafe { &*(data as *const OnceCell) }; + cell.set(code).expect("Error setting OnceCell"); + } + + fn complete(rust_future: Arc) -> (RustBuffer, RustCallStatus) { + let mut out_return = RustBuffer::new(); + let mut out_status_code = RustCallStatus::default(); + unsafe { + rust_future.ffi_complete( + std::mem::transmute::<_, &mut ()>(&mut out_return), + &mut out_status_code, + ); + } + (out_return, out_status_code) + } + #[test] - fn test_wake() { - let eventloop = MockEventLoop::new(); - let mut test_env = TestFutureEnvironment::new(&eventloop); - // Initially, we shouldn't have a result and nothing should be scheduled - assert!(test_env.foreign_result.is_none()); - assert_eq!(eventloop.call_count(), 0); - - // wake() should schedule a call - test_env.wake(); - assert_eq!(eventloop.call_count(), 1); - - // When that call runs, we should still not have a result yet - eventloop.run_all_calls(); - assert!(test_env.foreign_result.is_none()); - assert_eq!(eventloop.call_count(), 0); - - // Multiple wakes should only result in 1 scheduled call - test_env.wake(); - test_env.wake(); - assert_eq!(eventloop.call_count(), 1); - - // Make the future ready, which should call mock_foreign_callback and set the result - test_env.complete_future(Ok(true)); - eventloop.run_all_calls(); - let result = test_env - .foreign_result - .take() - .expect("Expected result to be set"); - assert_eq!(result.value, 1); - assert_eq!(result.status.code, 0); - assert_eq!(eventloop.call_count(), 0); - - // Future wakes shouldn't schedule any calls - test_env.wake(); - assert_eq!(eventloop.call_count(), 0); + fn test_success() { + let (sender, rust_future) = channel(); + + // Test polling the rust future before it's ready + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), None); + sender.wake(); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + + // Test polling the rust future when it's ready + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), None); + sender.send(Ok("All done".into())); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + + // Future polls should immediately return ready + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + + // Complete the future + let (return_buf, call_status) = complete(rust_future); + assert_eq!(call_status.code, RustCallStatusCode::Success); + assert_eq!( + >::try_lift(return_buf).unwrap(), + "All done" + ); } #[test] fn test_error() { - let eventloop = MockEventLoop::new(); - let mut test_env = TestFutureEnvironment::new(&eventloop); - test_env.complete_future(Err("Something went wrong".into())); - test_env.wake(); - eventloop.run_all_calls(); - let result = test_env - .foreign_result - .take() - .expect("Expected result to be set"); - assert_eq!(result.status.code, 1); + let (sender, rust_future) = channel(); + + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), None); + sender.send(Err("Something went wrong".into())); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + + let (_, call_status) = complete(rust_future); + assert_eq!(call_status.code, RustCallStatusCode::Error); unsafe { assert_eq!( try_lift_from_rust_buffer::( - result.status.error_buf.assume_init() + call_status.error_buf.assume_init() ) .unwrap(), String::from("Something went wrong"), ) } - assert_eq!(eventloop.call_count(), 0); - } - - #[test] - fn test_raw_clone_and_drop() { - let test_env = TestFutureEnvironment::new(&MockEventLoop::new()); - let waker = test_env.rust_future.make_waker(); - let weak_ref = test_env.rust_future_weak(); - assert_eq!(weak_ref.strong_count(), 2); - let waker2 = waker.clone(); - assert_eq!(weak_ref.strong_count(), 3); - drop(waker); - assert_eq!(weak_ref.strong_count(), 2); - drop(waker2); - assert_eq!(weak_ref.strong_count(), 1); - drop(test_env); - assert_eq!(weak_ref.strong_count(), 0); - assert!(weak_ref.upgrade().is_none()); } + // Once `complete` is called, the inner future should be released, even if wakers still hold a + // reference to the RustFuture #[test] - fn test_raw_wake() { - let eventloop = MockEventLoop::new(); - let test_env = TestFutureEnvironment::new(&eventloop); - let waker = test_env.rust_future.make_waker(); - let weak_ref = test_env.rust_future_weak(); - // `test_env` and `waker` both hold a strong reference to the `RustFuture` - assert_eq!(weak_ref.strong_count(), 2); + fn test_cancel() { + let (_sender, rust_future) = channel(); - // wake_by_ref() should schedule a wake - waker.wake_by_ref(); - assert_eq!(eventloop.call_count(), 1); + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), None); + rust_future.ffi_cancel(); + // Cancellation should immediately invoke the callback with RustFuturePoll::Ready + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); - // Once the wake runs, the strong could should not have changed - eventloop.run_all_calls(); - assert_eq!(weak_ref.strong_count(), 2); + // Future polls should immediately invoke the callback with RustFuturePoll::Ready + let continuation_result = poll(&rust_future); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); - // wake() should schedule a wake - waker.wake(); - assert_eq!(eventloop.call_count(), 1); + let (_, call_status) = complete(rust_future); + assert_eq!(call_status.code, RustCallStatusCode::Cancelled); + } - // Once the wake runs, the strong have decremented, since wake() consumes the waker - eventloop.run_all_calls(); - assert_eq!(weak_ref.strong_count(), 1); + // Once `free` is called, the inner future should be released, even if wakers still hold a + // reference to the RustFuture + #[test] + fn test_release_future() { + let (sender, rust_future) = channel(); + // Create a weak reference to the channel to use to check if rust_future has dropped its + // future. + let channel_weak = Arc::downgrade(&sender.0); + drop(sender); + // Create an extra ref to rust_future, simulating a waker that still holds a reference to + // it + let rust_future2 = rust_future.clone(); + + // Complete the rust future + rust_future.ffi_free(); + // Even though rust_future is still alive, the channel shouldn't be + assert!(Arc::strong_count(&rust_future2) > 0); + assert_eq!(channel_weak.strong_count(), 0); + assert!(channel_weak.upgrade().is_none()); } - // Test trying to create a RustFuture before the executor is shutdown. + // If `free` is called with a continuation still stored, we should call it them then. // - // The main thing we're testing is that we correctly drop the Future in this case + // This shouldn't happen in practice, but it seems like good defensive programming #[test] - fn test_executor_shutdown() { - let eventloop = MockEventLoop::new(); - eventloop.shutdown(); - let test_env = TestFutureEnvironment::new(&eventloop); - let weak_ref = test_env.rust_future_weak(); - // When we wake the future, it should try to schedule a callback and fail. This should - // cause the future to be dropped - test_env.wake(); - drop(test_env); - assert!(weak_ref.upgrade().is_none()); - } - - // Similar run a similar test to the last, but simulate an executor shutdown after the future was - // scheduled, but before the callback is called. - #[test] - fn test_executor_shutdown_after_schedule() { - let eventloop = MockEventLoop::new(); - let test_env = TestFutureEnvironment::new(&eventloop); - let weak_ref = test_env.rust_future_weak(); - test_env.complete_future(Ok(true)); - test_env.wake(); - eventloop.shutdown(); - eventloop.run_all_calls(); - - // Test that the foreign async side wasn't completed. Even though we could have - // driven the future to completion, we shouldn't have since the executor was shutdown - assert!(test_env.foreign_result.is_none()); - // Also test that we've dropped all references to the future - drop(test_env); - assert!(weak_ref.upgrade().is_none()); + fn test_complete_with_stored_continuation() { + let (_sender, rust_future) = channel(); + + let continuation_result = poll(&rust_future); + rust_future.ffi_free(); + assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); } } diff --git a/uniffi_core/src/ffi_converter_impls.rs b/uniffi_core/src/ffi_converter_impls.rs index 789014c9f3..a84a24e49d 100644 --- a/uniffi_core/src/ffi_converter_impls.rs +++ b/uniffi_core/src/ffi_converter_impls.rs @@ -23,8 +23,8 @@ /// "UT" means an abitrary `UniFfiTag` type. use crate::{ check_remaining, ffi_converter_default_return, ffi_converter_rust_buffer_lift_and_lower, - lower_into_rust_buffer, metadata, try_lift_from_rust_buffer, FfiConverter, FutureCallback, - MetadataBuffer, Result, RustBuffer, RustCallStatus, UnexpectedUniFFICallbackError, + lower_into_rust_buffer, metadata, try_lift_from_rust_buffer, FfiConverter, MetadataBuffer, + Result, RustBuffer, UnexpectedUniFFICallbackError, }; use anyhow::bail; use bytes::buf::{Buf, BufMut}; @@ -120,9 +120,6 @@ unsafe impl FfiConverter for () { type FfiType = (); // Returning `()` is FFI-safe, since it gets translated into a void return type ReturnType = (); - // However, we can't use `FutureCallback<()>` since passing `()` as an argument is not - // FFI-safe. So we used an arbitrary non-ZST type instead. - type FutureCallback = FutureCallback; fn try_lift(_: Self::FfiType) -> Result<()> { Ok(()) @@ -140,15 +137,6 @@ unsafe impl FfiConverter for () { Ok(()) } - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - _return_value: (), - call_status: RustCallStatus, - ) { - callback(callback_data, 0, call_status) - } - const TYPE_ID_META: MetadataBuffer = MetadataBuffer::from_code(metadata::codes::TYPE_UNIT); } @@ -427,7 +415,7 @@ where .concat(V::TYPE_ID_META); } -/// FFI support for ForeignSchedulers +/// FFI support for [ForeignExecutor] /// /// These are passed over the FFI as opaque pointer-sized types representing the foreign executor. /// The foreign bindings may use an actual pointer to the executor object, or a usized integer @@ -481,7 +469,6 @@ where { type FfiType = (); // Placeholder while lower/lift/serializing is unimplemented type ReturnType = R::ReturnType; - type FutureCallback = R::FutureCallback; fn try_lift(_: Self::FfiType) -> Result { unimplemented!("try_lift"); @@ -520,15 +507,6 @@ where Err(E::handle_callback_unexpected_error(e)) } - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: RustCallStatus, - ) { - R::invoke_future_callback(callback, callback_data, return_value, call_status) - } - const TYPE_ID_META: MetadataBuffer = MetadataBuffer::from_code(metadata::codes::TYPE_RESULT) .concat(R::TYPE_ID_META) .concat(E::TYPE_ID_META); diff --git a/uniffi_core/src/ffi_converter_traits.rs b/uniffi_core/src/ffi_converter_traits.rs index 068484f069..47e66a07d7 100644 --- a/uniffi_core/src/ffi_converter_traits.rs +++ b/uniffi_core/src/ffi_converter_traits.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use crate::{ - try_lift_from_rust_buffer, FfiDefault, MetadataBuffer, Result, RustBuffer, RustCallStatus, + try_lift_from_rust_buffer, FfiDefault, MetadataBuffer, Result, RustBuffer, UnexpectedUniFFICallbackError, }; @@ -67,12 +67,6 @@ pub unsafe trait FfiConverter: Sized { /// This is usually the same as `FfiType`, but `Result<>` has specialized handling. type ReturnType: FfiDefault; - /// The `FutureCallback` type used for async functions - /// - /// This is almost always `FutureCallback`. The one exception is the - /// unit type, see that `FfiConverter` impl for details. - type FutureCallback: Copy; - /// Lower a rust value of the target type, into an FFI value of type Self::FfiType. /// /// This trait method is used for sending data from rust to the foreign language code, @@ -151,14 +145,6 @@ pub unsafe trait FfiConverter: Sized { /// from it (but will not mutate the actual contents of the slice). fn try_read(buf: &mut &[u8]) -> Result; - /// Invoke a `FutureCallback` to complete a async call - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: RustCallStatus, - ); - /// Type ID metadata, serialized into a [MetadataBuffer] const TYPE_ID_META: MetadataBuffer; } @@ -177,7 +163,6 @@ pub unsafe trait FfiConverter: Sized { pub unsafe trait FfiConverterArc: Send + Sync { type FfiType; type ReturnType: FfiDefault; - type FutureCallback: Copy; fn lower(obj: Arc) -> Self::FfiType; fn lower_return(obj: Arc) -> Result; @@ -193,12 +178,7 @@ pub unsafe trait FfiConverterArc: Send + Sync { } fn write(obj: Arc, buf: &mut Vec); fn try_read(buf: &mut &[u8]) -> Result>; - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: RustCallStatus, - ); + const TYPE_ID_META: MetadataBuffer; } @@ -208,7 +188,6 @@ where { type FfiType = T::FfiType; type ReturnType = T::ReturnType; - type FutureCallback = T::FutureCallback; fn lower(obj: Self) -> Self::FfiType { T::lower(obj) @@ -242,14 +221,5 @@ where T::try_read(buf) } - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: RustCallStatus, - ) { - T::invoke_future_callback(callback, callback_data, return_value, call_status) - } - const TYPE_ID_META: MetadataBuffer = T::TYPE_ID_META; } diff --git a/uniffi_core/src/lib.rs b/uniffi_core/src/lib.rs index f5e4a26cbb..d81f13ffdc 100644 --- a/uniffi_core/src/lib.rs +++ b/uniffi_core/src/lib.rs @@ -184,20 +184,10 @@ pub fn try_lift_from_rust_buffer, UT>(v: RustBuffer) -> Resu macro_rules! ffi_converter_default_return { ($uniffi_tag:ty) => { type ReturnType = >::FfiType; - type FutureCallback = $crate::FutureCallback; fn lower_return(v: Self) -> ::std::result::Result { Ok(>::lower(v)) } - - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: $crate::RustCallStatus, - ) { - callback(callback_data, return_value, call_status); - } }; } @@ -264,7 +254,6 @@ macro_rules! do_ffi_converter_forward { unsafe impl $crate::$trait<$new_impl_tag> for $T { type FfiType = <$T as $crate::$trait<$existing_impl_tag>>::FfiType; type ReturnType = <$T as $crate::$trait<$existing_impl_tag>>::FfiType; - type FutureCallback = <$T as $crate::$trait<$existing_impl_tag>>::FutureCallback; fn lower(obj: $rust_type) -> Self::FfiType { <$T as $crate::$trait<$existing_impl_tag>>::lower(obj) @@ -288,20 +277,6 @@ macro_rules! do_ffi_converter_forward { <$T as $crate::$trait<$existing_impl_tag>>::try_read(buf) } - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: $crate::RustCallStatus, - ) { - <$T as $crate::$trait<$existing_impl_tag>>::invoke_future_callback( - callback, - callback_data, - return_value, - call_status, - ) - } - const TYPE_ID_META: ::uniffi::MetadataBuffer = <$T as $crate::$trait<$existing_impl_tag>>::TYPE_ID_META; } diff --git a/uniffi_macros/src/export.rs b/uniffi_macros/src/export.rs index c021e85968..e0aa3105f4 100644 --- a/uniffi_macros/src/export.rs +++ b/uniffi_macros/src/export.rs @@ -187,7 +187,6 @@ pub(crate) fn ffi_converter_trait_impl(trait_ident: &Ident, tag: Option<&Path>) unsafe #impl_spec { type FfiType = *const ::std::os::raw::c_void; type ReturnType = Self::FfiType; - type FutureCallback = ::uniffi::FutureCallback; fn lower(obj: ::std::sync::Arc) -> Self::FfiType { ::std::boxed::Box::into_raw(::std::boxed::Box::new(obj)) as *const ::std::os::raw::c_void @@ -218,15 +217,6 @@ pub(crate) fn ffi_converter_trait_impl(trait_ident: &Ident, tag: Option<&Path>) Ok(>::lower(v)) } - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: ::uniffi::RustCallStatus, - ) { - callback(callback_data, return_value, call_status); - } - const TYPE_ID_META: ::uniffi::MetadataBuffer = ::uniffi::MetadataBuffer::from_code(::uniffi::metadata::codes::TYPE_INTERFACE) .concat_str(#mod_path) .concat_str(#name) diff --git a/uniffi_macros/src/export/scaffolding.rs b/uniffi_macros/src/export/scaffolding.rs index addc76752c..2c9598e67c 100644 --- a/uniffi_macros/src/export/scaffolding.rs +++ b/uniffi_macros/src/export/scaffolding.rs @@ -188,25 +188,10 @@ fn gen_ffi_function( quote! { #[doc(hidden)] #[no_mangle] - pub extern "C" fn #ffi_ident( - #(#params,)* - uniffi_executor_handle: ::uniffi::ForeignExecutorHandle, - uniffi_callback: <#return_ty as ::uniffi::FfiConverter>::FutureCallback, - uniffi_callback_data: *const (), - uniffi_call_status: &mut ::uniffi::RustCallStatus, - ) { + pub extern "C" fn #ffi_ident(#(#params,)*) -> ::uniffi::RustFutureHandle { ::uniffi::deps::log::debug!(#name); - ::uniffi::rust_call(uniffi_call_status, || { - #pre_fn_call; - let uniffi_rust_future = ::uniffi::RustFuture::<_, #return_ty, crate::UniFfiTag>::new( - #future_expr, - uniffi_executor_handle, - uniffi_callback, - uniffi_callback_data - ); - uniffi_rust_future.wake(); - Ok(()) - }); + #pre_fn_call; + ::uniffi::rust_future_new(#future_expr, crate::UniFfiTag) } } }) diff --git a/uniffi_macros/src/object.rs b/uniffi_macros/src/object.rs index d7f0307c64..2158275cf9 100644 --- a/uniffi_macros/src/object.rs +++ b/uniffi_macros/src/object.rs @@ -74,7 +74,6 @@ pub(crate) fn interface_impl(ident: &Ident, tag: Option<&Path>) -> TokenStream { // Don't use a pointer to as that requires a `pub ` type FfiType = *const ::std::os::raw::c_void; type ReturnType = *const ::std::os::raw::c_void; - type FutureCallback = ::uniffi::FutureCallback; /// When lowering, we have an owned `Arc` and we transfer that ownership /// to the foreign-language code, "leaking" it out of Rust's ownership system @@ -130,15 +129,6 @@ pub(crate) fn interface_impl(ident: &Ident, tag: Option<&Path>) -> TokenStream { Ok(>::lower(v)) } - fn invoke_future_callback( - callback: Self::FutureCallback, - callback_data: *const (), - return_value: Self::ReturnType, - call_status: ::uniffi::RustCallStatus, - ) { - callback(callback_data, return_value, call_status); - } - const TYPE_ID_META: ::uniffi::MetadataBuffer = ::uniffi::MetadataBuffer::from_code(::uniffi::metadata::codes::TYPE_INTERFACE) .concat_str(#mod_path) .concat_str(#name) diff --git a/uniffi_macros/src/setup_scaffolding.rs b/uniffi_macros/src/setup_scaffolding.rs index ce9bb534b7..e499d9a51d 100644 --- a/uniffi_macros/src/setup_scaffolding.rs +++ b/uniffi_macros/src/setup_scaffolding.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use proc_macro2::TokenStream; +use proc_macro2::{Ident, TokenStream}; use quote::{format_ident, quote}; use syn::Result; @@ -22,6 +22,69 @@ pub fn setup_scaffolding(namespace: String) -> Result { let reexport_hack_ident = format_ident!("{namespace}_uniffi_reexport_hack"); let ffi_foreign_executor_callback_set_ident = format_ident!("ffi_{namespace}_foreign_executor_callback_set"); + let ffi_rust_future_poll = format_ident!("ffi_{namespace}_rust_future_poll"); + let ffi_rust_future_cancel = format_ident!("ffi_{namespace}_rust_future_cancel"); + let ffi_rust_future_free = format_ident!("ffi_{namespace}_rust_future_free"); + + let (ffi_rust_future_complete_types, ffi_rust_future_complete_names): ( + Vec, + Vec, + ) = [ + ( + quote! { u8 }, + format_ident!("ffi_{namespace}_rust_future_complete_u8"), + ), + ( + quote! { i8 }, + format_ident!("ffi_{namespace}_rust_future_complete_i8"), + ), + ( + quote! { u16 }, + format_ident!("ffi_{namespace}_rust_future_complete_u16"), + ), + ( + quote! { i16 }, + format_ident!("ffi_{namespace}_rust_future_complete_i16"), + ), + ( + quote! { u32 }, + format_ident!("ffi_{namespace}_rust_future_complete_u32"), + ), + ( + quote! { i32 }, + format_ident!("ffi_{namespace}_rust_future_complete_i32"), + ), + ( + quote! { u64 }, + format_ident!("ffi_{namespace}_rust_future_complete_u64"), + ), + ( + quote! { i64 }, + format_ident!("ffi_{namespace}_rust_future_complete_i64"), + ), + ( + quote! { f32 }, + format_ident!("ffi_{namespace}_rust_future_complete_f32"), + ), + ( + quote! { f64 }, + format_ident!("ffi_{namespace}_rust_future_complete_f64"), + ), + ( + quote! { *const ::std::ffi::c_void }, + format_ident!("ffi_{namespace}_rust_future_complete_pointer"), + ), + ( + quote! { ::uniffi::RustBuffer }, + format_ident!("ffi_{namespace}_rust_future_complete_rust_buffer"), + ), + ( + quote! { () }, + format_ident!("ffi_{namespace}_rust_future_complete_void"), + ), + ] + .into_iter() + .unzip(); Ok(quote! { // Unit struct to parameterize the FfiConverter trait. @@ -93,6 +156,43 @@ pub fn setup_scaffolding(namespace: String) -> Result { uniffi::ffi::foreign_executor_callback_set(callback) } + #[allow(clippy::missing_safety_doc, missing_docs)] + #[doc(hidden)] + #[no_mangle] + pub unsafe extern "C" fn #ffi_rust_future_poll( + handle: ::uniffi::RustFutureHandle, + continuation: ::uniffi::RustFutureContinuation, + data: *const () + ) { + ::uniffi::ffi::rust_future_poll(handle, continuation, data); + } + + #[allow(clippy::missing_safety_doc, missing_docs)] + #[doc(hidden)] + #[no_mangle] + pub unsafe extern "C" fn #ffi_rust_future_cancel(handle: ::uniffi::RustFutureHandle) { + ::uniffi::ffi::rust_future_cancel(handle) + } + + #( + #[allow(clippy::missing_safety_doc, missing_docs)] + #[doc(hidden)] + #[no_mangle] + pub unsafe extern "C" fn #ffi_rust_future_complete_names( + handle: ::uniffi::RustFutureHandle, + out_status: &mut ::uniffi::RustCallStatus + ) -> #ffi_rust_future_complete_types { + ::uniffi::ffi::rust_future_complete(handle, out_status) + } + )* + + #[allow(clippy::missing_safety_doc, missing_docs)] + #[doc(hidden)] + #[no_mangle] + pub unsafe extern "C" fn #ffi_rust_future_free(handle: ::uniffi::RustFutureHandle) { + ::uniffi::ffi::rust_future_free(handle) + } + // Code to re-export the UniFFI scaffolding functions. // // Rust won't always re-export the functions from dependencies diff --git a/uniffi_meta/src/lib.rs b/uniffi_meta/src/lib.rs index a55f4ad613..f2ec0fbb4b 100644 --- a/uniffi_meta/src/lib.rs +++ b/uniffi_meta/src/lib.rs @@ -23,7 +23,7 @@ mod metadata; // `docs/uniffi-versioning.md` for details. // // Once we get to 1.0, then we'll need to update the scheme to something like 100 + major_version -pub const UNIFFI_CONTRACT_VERSION: u32 = 23; +pub const UNIFFI_CONTRACT_VERSION: u32 = 24; /// Similar to std::hash::Hash. ///