Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WASI] WasiEventLoop to keep Task alive #106633

Merged
merged 11 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable
using System;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ CancellationToken cancellationToken
await Task.WhenAll(
new Task<ITypes.IncomingResponse?>[]
{
SendRequestAsync(outgoingRequest),
SendRequestAsync(outgoingRequest, cancellationToken),
sendContent()
}
)
Expand Down Expand Up @@ -279,7 +279,8 @@ await Task.WhenAll(
}

private static async Task<ITypes.IncomingResponse?> SendRequestAsync(
ITypes.OutgoingRequest request
ITypes.OutgoingRequest request,
CancellationToken cancellationToken
)
{
ITypes.FutureIncomingResponse future;
Expand Down Expand Up @@ -314,7 +315,7 @@ ITypes.OutgoingRequest request
}
else
{
await RegisterWasiPollable(future.Subscribe()).ConfigureAwait(false);
await RegisterWasiPollable(future.Subscribe(), cancellationToken).ConfigureAwait(false);
}
}
}
Expand Down Expand Up @@ -461,7 +462,7 @@ private static async Task SendContentAsync(HttpContent? content, Stream stream)
}
}

private static Task RegisterWasiPollable(IPoll.Pollable pollable)
private static Task RegisterWasiPollable(IPoll.Pollable pollable, CancellationToken cancellationToken)
{
var handle = pollable.Handle;

Expand All @@ -470,12 +471,12 @@ private static Task RegisterWasiPollable(IPoll.Pollable pollable)
pollable.Handle = 0;
GC.SuppressFinalize(pollable);

return CallRegisterWasiPollableHandle((Thread)null!, handle);
return CallRegisterWasiPollableHandle((Thread)null!, handle, cancellationToken);

}

[UnsafeAccessor(UnsafeAccessorKind.StaticMethod, Name = "RegisterWasiPollableHandle")]
private static extern Task CallRegisterWasiPollableHandle(Thread t, int handle);
private static extern Task CallRegisterWasiPollableHandle(Thread t, int handle, CancellationToken cancellationToken);

private sealed class InputStream : Stream
{
Expand Down Expand Up @@ -562,7 +563,7 @@ CancellationToken cancellationToken
var buffer = result;
if (buffer.Length == 0)
{
await RegisterWasiPollable(stream.Subscribe())
await RegisterWasiPollable(stream.Subscribe(), cancellationToken)
.ConfigureAwait(false);
}
else
Expand Down Expand Up @@ -699,7 +700,7 @@ CancellationToken cancellationToken
var count = (int)stream.CheckWrite();
if (count == 0)
{
await RegisterWasiPollable(stream.Subscribe()).ConfigureAwait(false);
await RegisterWasiPollable(stream.Subscribe(), cancellationToken).ConfigureAwait(false);
}
else if (offset == limit)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Prerequisites:
# powershell
# tar
# [cargo](https://rustup.rs/)
$ProgressPreference = 'SilentlyContinue'
$ErrorActionPreference='Stop'
$scriptpath = $MyInvocation.MyCommand.Path
$dir = Split-Path $scriptpath

Push-Location $dir


cargo install --locked --no-default-features --features csharp --version 0.30.0 wit-bindgen-cli
Invoke-WebRequest -Uri https://github.com/WebAssembly/wasi-http/archive/refs/tags/v0.2.1.tar.gz -OutFile v0.2.1.tar.gz
tar xzf v0.2.1.tar.gz
cp world.wit wasi-http-0.2.1/wit/world.wit
wit-bindgen c-sharp -w wasi-http -r native-aot --internal --skip-support-files wasi-http-0.2.1/wit
rm -r wasi-http-0.2.1
rm v0.2.1.tar.gz

Pop-Location
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,9 @@ set -ex
# [cargo](https://rustup.rs/)
# [curl](https://curl.se/download.html)

cargo install --locked --no-default-features --features csharp --version 0.29.0 wit-bindgen-cli
cargo install --locked --no-default-features --features csharp --version 0.30.0 wit-bindgen-cli
curl -OL https://github.com/WebAssembly/wasi-http/archive/refs/tags/v0.2.1.tar.gz
tar xzf v0.2.1.tar.gz
cat >wasi-http-0.2.1/wit/world.wit <<EOF
world wasi-http {
import outgoing-handler;
}
EOF
wit-bindgen c-sharp -w wasi-http -r native-aot --internal wasi-http-0.2.1/wit
rm -r wasi-http-0.2.1 v0.2.1.tar.gz WasiHttpWorld_wasm_import_linkage_attribute.cs WasiHttpWorld_cabi_realloc.c WasiHttpWorld_component_type.o
cp world.wit wasi-http-0.2.1/wit/world.wit
wit-bindgen c-sharp -w wasi-http -r native-aot --internal --skip-support-files wasi-http-0.2.1/wit
rm -r wasi-http-0.2.1 v0.2.1.tar.gz WasiHttpWorld_wasm_import_linkage_attribute.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
world wasi-http {
import outgoing-handler;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public sealed partial class Thread
{
// these methods are temporarily accessed via UnsafeAccessor from generated code until we have it in public API, probably in WASI preview3 and promises
#if TARGET_WASI
internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handle)
internal static System.Threading.Tasks.Task RegisterWasiPollableHandle(int handle, CancellationToken cancellationToken)
{
return WasiEventLoop.RegisterWasiPollableHandle(handle);
return WasiEventLoop.RegisterWasiPollableHandle(handle, cancellationToken);
}

internal static int PollWasiEventLoopUntilResolved(Task<int> mainTask)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,72 +4,122 @@
using System.Collections.Generic;
using System.Threading.Tasks;
using WasiPollWorld.wit.imports.wasi.io.v0_2_1;
using Pollable = WasiPollWorld.wit.imports.wasi.io.v0_2_1.IPoll.Pollable;

namespace System.Threading
{
internal static class WasiEventLoop
{
private static List<WeakReference<TaskCompletionSource>> s_pollables = new();
// TODO: if the Pollable never resolves and and the Task is abandoned
Copy link
Contributor

@dicej dicej Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's my naive attempt at this. I'm sure yours is better, but one thing to make sure of is that we give all tasks a chance to run once any of them have been cancelled (see this comment). I can't tell if your code accounts for this.

FWIW, this is the code I've been using to test HttpClient timeouts. It's paired with this server code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My solution will cancel the Task synchronously when the CancellationTokenSource.Cancel is called because of the CancellationTokenRegistration callback.
And that could possibly trigger execution of other synchronous/inline continuations.
It seems to me, that synchronous cancellation is better. And it's enough to pass the test.

I added test similar to yours with some azure endpoint, which is not perfect, but I plan to fix it later.

// it will be leaked and stay in this list forever.
// it will also keep the Pollable handle alive and prevent it from being disposed
private static readonly List<PollableHolder> s_pollables = new();

internal static Task RegisterWasiPollableHandle(int handle)
internal static Task RegisterWasiPollableHandle(int handle, CancellationToken cancellationToken)
{
// note that this is duplicate of the original Pollable
// the original should be neutralized without disposing the handle
var pollableCpy = new IPoll.Pollable(new IPoll.Pollable.THandle(handle));
return RegisterWasiPollable(pollableCpy);
// the original should have been neutralized without disposing the handle
var pollableCpy = new Pollable(new Pollable.THandle(handle));
return RegisterWasiPollable(pollableCpy, cancellationToken);
}

internal static Task RegisterWasiPollable(IPoll.Pollable pollable)
internal static Task RegisterWasiPollable(Pollable pollable, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource(pollable);
var weakRef = new WeakReference<TaskCompletionSource>(tcs);
s_pollables.Add(weakRef);
return tcs.Task;
// this will register the pollable holder into s_pollables
var holder = new PollableHolder(pollable, cancellationToken);
s_pollables.Add(holder);
return holder.taskCompletionSource.Task;
}

// this is not thread safe
internal static void DispatchWasiEventLoop()
{
ThreadPoolWorkQueue.Dispatch();

if (s_pollables.Count > 0)
var holders = new List<PollableHolder>(s_pollables.Count);
var pending = new List<Pollable>(s_pollables.Count);
for (int i = 0; i < s_pollables.Count; i++)
{
var pollables = s_pollables;
s_pollables = new List<WeakReference<TaskCompletionSource>>(pollables.Count);
var arguments = new List<IPoll.Pollable>(pollables.Count);
var indexes = new List<int>(pollables.Count);
for (var i = 0; i < pollables.Count; i++)
var holder = s_pollables[i];
if (!holder.isDisposed)
{
var weakRef = pollables[i];
if (weakRef.TryGetTarget(out TaskCompletionSource? tcs))
{
var pollable = (IPoll.Pollable)tcs!.Task.AsyncState!;
arguments.Add(pollable);
indexes.Add(i);
}
holders.Add(holder);
pending.Add(holder.pollable);
}
}

// this is blocking until at least one pollable resolves
var readyIndexes = PollInterop.Poll(arguments);
s_pollables.Clear();

var ready = new bool[arguments.Count];
foreach (int readyIndex in readyIndexes)
if (pending.Count > 0)
{
var readyIndexes = PollInterop.Poll(pending);
for (int i = 0; i < readyIndexes.Length; i++)
{
ready[readyIndex] = true;
arguments[readyIndex].Dispose();
var weakRef = pollables[indexes[readyIndex]];
if (weakRef.TryGetTarget(out TaskCompletionSource? tcs))
{
tcs!.SetResult();
}
uint readyIndex = readyIndexes[i];
var holder = holders[(int)readyIndex];
holder.ResolveAndDispose();
}
for (var i = 0; i < arguments.Count; ++i)
for (int i = 0; i < holders.Count; i++)
{
if (!ready[i])
PollableHolder holder = holders[i];
if (!holder.isDisposed)
{
s_pollables.Add(pollables[indexes[i]]);
s_pollables.Add(holder);
}
}
}
}

private sealed class PollableHolder
{
public bool isDisposed;
public readonly Pollable pollable;
public readonly TaskCompletionSource taskCompletionSource;
public readonly CancellationTokenRegistration cancellationTokenRegistration;
public readonly CancellationToken cancellationToken;

public PollableHolder(Pollable pollable, CancellationToken cancellationToken)
{
this.pollable = pollable;
this.cancellationToken = cancellationToken;

// this means that taskCompletionSource.Task.AsyncState -> this;
// which means PollableHolder will be alive until the Task alive
taskCompletionSource = new TaskCompletionSource(this);

// static method is used to avoid allocating a delegate
cancellationTokenRegistration = cancellationToken.Register(CancelAndDispose, this);
}

public void ResolveAndDispose()
{
if (isDisposed)
{
return;
}

// no need to unregister the holder from s_pollables, when this is called
isDisposed = true;
taskCompletionSource.TrySetResult();
pollable.Dispose();
cancellationTokenRegistration.Dispose();
}

// for GC of abandoned Tasks or for cancellation
private static void CancelAndDispose(object? s)
{
PollableHolder self = (PollableHolder)s!;
if (self.isDisposed)
{
return;
}

// it will be removed from s_pollables on the next run
self.isDisposed = true;
self.taskCompletionSource.TrySetCanceled(self.cancellationToken);
self.pollable.Dispose();
self.cancellationTokenRegistration.Dispose();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable
using System;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Generated by `wit-bindgen` 0.29.0. DO NOT EDIT!
// Generated by `wit-bindgen` 0.30.0. DO NOT EDIT!
// <auto-generated />
#nullable enable

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Prerequisites:
# powershell
# tar
# [cargo](https://rustup.rs/)
$ProgressPreference = 'SilentlyContinue'
$ErrorActionPreference='Stop'
$scriptpath = $MyInvocation.MyCommand.Path
$dir = Split-Path $scriptpath

Push-Location $dir


cargo install --locked --no-default-features --features csharp --version 0.30.0 wit-bindgen-cli
Invoke-WebRequest -Uri https://github.com/WebAssembly/wasi-http/archive/refs/tags/v0.2.1.tar.gz -OutFile v0.2.1.tar.gz
tar xzf v0.2.1.tar.gz
cp world.wit wasi-http-0.2.1/wit/world.wit
wit-bindgen c-sharp -w wasi-poll -r native-aot --internal --skip-support-files wasi-http-0.2.1/wit
rm -r wasi-http-0.2.1
rm v0.2.1.tar.gz

Pop-Location
Loading
Loading