Skip to content

Commit

Permalink
[browser][MT] WebSocket thread affinity & server initiated close (#96618
Browse files Browse the repository at this point in the history
)

Co-authored-by: Marek Fišera <mara@neptuo.com>
Co-authored-by: Katelyn Gadd <kg@luminance.org>
  • Loading branch information
3 people authored Jan 16, 2024
1 parent 1c860e2 commit e047cce
Show file tree
Hide file tree
Showing 10 changed files with 664 additions and 430 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,15 @@ internal static partial class WebSocketValidate
internal const int MaxDeflateWindowBits = 15;

internal const int MaxControlFramePayloadLength = 123;
#if TARGET_BROWSER
private const int ValidCloseStatusCodesFrom = 3000;
private const int ValidCloseStatusCodesTo = 4999;
#else
private const int CloseStatusCodeAbort = 1006;
private const int CloseStatusCodeFailedTLSHandshake = 1015;
private const int InvalidCloseStatusCodesFrom = 0;
private const int InvalidCloseStatusCodesTo = 999;
#endif

// [0x21, 0x7E] except separators "()<>@,;:\\\"/[]?={} ".
private static readonly SearchValues<char> s_validSubprotocolChars =
Expand Down Expand Up @@ -84,11 +89,15 @@ internal static void ValidateCloseStatus(WebSocketCloseStatus closeStatus, strin
}

int closeStatusCode = (int)closeStatus;

#if TARGET_BROWSER
// as defined in https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close#code
if (closeStatus != WebSocketCloseStatus.NormalClosure && (closeStatusCode < ValidCloseStatusCodesFrom || closeStatusCode > ValidCloseStatusCodesTo))
#else
if ((closeStatusCode >= InvalidCloseStatusCodesFrom &&
closeStatusCode <= InvalidCloseStatusCodesTo) ||
closeStatusCode == CloseStatusCodeAbort ||
closeStatusCode == CloseStatusCodeFailedTLSHandshake)
#endif
{
// CloseStatus 1006 means Aborted - this will never appear on the wire and is reflected by calling WebSocket.Abort
throw new ArgumentException(SR.Format(SR.net_WebSockets_InvalidCloseStatusCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ public static async Task InvokeAsync(HttpContext context)

if (context.Request.QueryString.HasValue && context.Request.QueryString.Value.Contains("delay10sec"))
{
Thread.Sleep(10000);
await Task.Delay(10000);
}
else if (context.Request.QueryString.HasValue && context.Request.QueryString.Value.Contains("delay20sec"))
{
Thread.Sleep(20000);
await Task.Delay(20000);
}

try
Expand Down Expand Up @@ -124,14 +124,15 @@ await socket.CloseAsync(
}

bool sendMessage = false;
string receivedMessage = null;
if (receiveResult.MessageType == WebSocketMessageType.Text)
{
string receivedMessage = Encoding.UTF8.GetString(receiveBuffer, 0, offset);
receivedMessage = Encoding.UTF8.GetString(receiveBuffer, 0, offset);
if (receivedMessage == ".close")
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, receivedMessage, CancellationToken.None);
}
if (receivedMessage == ".shutdown")
else if (receivedMessage == ".shutdown")
{
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, receivedMessage, CancellationToken.None);
}
Expand Down Expand Up @@ -161,6 +162,14 @@ await socket.SendAsync(
!replyWithPartialMessages,
CancellationToken.None);
}
if (receivedMessage == ".closeafter")
{
await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, receivedMessage, CancellationToken.None);
}
else if (receivedMessage == ".shutdownafter")
{
await socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, receivedMessage, CancellationToken.None);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,69 @@ internal static partial class BrowserInterop
{
public static string? GetProtocol(JSObject? webSocket)
{
if (webSocket == null || webSocket.IsDisposed) return null;
if (webSocket == null || webSocket.IsDisposed)
{
return null;
}

string? protocol = webSocket.GetPropertyAsString("protocol");
return protocol;
}

public static WebSocketCloseStatus? GetCloseStatus(JSObject? webSocket)
{
if (webSocket == null || webSocket.IsDisposed)
{
return null;
}
if (!webSocket.HasProperty("close_status"))
{
return null;
}

int status = webSocket.GetPropertyAsInt32("close_status");
return (WebSocketCloseStatus)status;
}

public static string? GetCloseStatusDescription(JSObject? webSocket)
{
if (webSocket == null || webSocket.IsDisposed)
{
return null;
}

string? description = webSocket.GetPropertyAsString("close_status_description");
return description;
}

public static int GetReadyState(JSObject? webSocket)
{
if (webSocket == null || webSocket.IsDisposed) return -1;
if (webSocket == null || webSocket.IsDisposed)
{
return -1;
}

int? readyState = webSocket.GetPropertyAsInt32("readyState");
if (!readyState.HasValue) return -1;
if (!readyState.HasValue)
{
return -1;
}

return readyState.Value;
}

[JSImport("INTERNAL.ws_wasm_create")]
public static partial JSObject WebSocketCreate(
string uri,
string?[]? subProtocols,
IntPtr responseStatusPtr,
[JSMarshalAs<JSType.Function<JSType.Number, JSType.String>>] Action<int, string> onClosed);
IntPtr responseStatusPtr);

public static unsafe JSObject UnsafeCreate(
string uri,
string?[]? subProtocols,
MemoryHandle responseHandle,
[JSMarshalAs<JSType.Function<JSType.Number, JSType.String>>] Action<int, string> onClosed)
MemoryHandle responseHandle)
{
return WebSocketCreate(uri, subProtocols, (IntPtr)responseHandle.Pointer, onClosed);
return WebSocketCreate(uri, subProtocols, (IntPtr)responseHandle.Pointer);
}

[JSImport("INTERNAL.ws_wasm_open")]
Expand All @@ -52,19 +88,9 @@ public static partial Task WebSocketOpen(
int messageType,
bool endOfMessage);

public static unsafe Task? UnsafeSendSync(JSObject jsWs, ArraySegment<byte> buffer, WebSocketMessageType messageType, bool endOfMessage)
public static unsafe Task? UnsafeSend(JSObject jsWs, MemoryHandle pinBuffer, int length, WebSocketMessageType messageType, bool endOfMessage)
{
if (buffer.Count == 0)
{
return WebSocketSend(jsWs, IntPtr.Zero, 0, (int)messageType, endOfMessage);
}

var span = buffer.AsSpan();
// we can do this because the bytes in the buffer are always consumed synchronously (not later with Task resolution)
fixed (void* spanPtr = span)
{
return WebSocketSend(jsWs, (IntPtr)spanPtr, buffer.Count, (int)messageType, endOfMessage);
}
return WebSocketSend(jsWs, (IntPtr)pinBuffer.Pointer, length, (int)messageType, endOfMessage);
}

[JSImport("INTERNAL.ws_wasm_receive")]
Expand All @@ -73,7 +99,7 @@ public static partial Task WebSocketOpen(
IntPtr bufferPtr,
int bufferLength);

public static unsafe Task? ReceiveUnsafeSync(JSObject jsWs, MemoryHandle pinBuffer, int length)
public static unsafe Task? ReceiveUnsafe(JSObject jsWs, MemoryHandle pinBuffer, int length)
{
return WebSocketReceive(jsWs, (IntPtr)pinBuffer.Pointer, length);
}
Expand Down
Loading

0 comments on commit e047cce

Please sign in to comment.