Skip to content

Commit

Permalink
Addresses deadlock, thread-safety in mixed sync/async apps (#1549)
Browse files Browse the repository at this point in the history
* Fixed deadlock, thread-safety in mixed sync/async apps

where if a continuation from CDP.SendAsync in returns calls CDP.SendAsync but uses .GetResult(), then deadlock.
CDP event messages are still delivered like before, since their order must be preserved. There's potential for deadlock here, but this is still the caller's responsibility to handle events correctly.

* Using expr body like before

* Reverted queue change, added test

* Fixed race in test

* Reverted if branch change

* Reverted change to test

* Reverted dropped unused using

* Added lost BOM from my previous commits to this

* Refactored new SendAsync response handling into its own place

* Possible solution for ensuring async tasks are done

* Addressed false positive disposal warning

* Simplified new task queue

Important part is really only DrainAsync

* Added EnqueueSendAsyncResponses option

similar to EnqueueTransportMessages, to at least provide a way to turn this new async behavior off

* Implemented handling for new EnqueueSendAsyncResponses option

* Temp disabling EnqueueSendAsyncResponses for test run

* Reverted temp change

* Temp reverting CSSCoverage changes

which could be causing tests to hang

* Reverted previous commit

* Ensuring canceled task is handled

* Fixed null ref founding during local test run

* Attempting to roll back regression causing deadlock

* Reverted previous, only logging unhandled exception

* Reverted cancel token param

* Using alternate dispose impl

* Reverted accidental whitespace and BOM changes

* Renames, code cleanup, defaulting new async option to false

* Consolidated into single ConcurrentDictionary

* Renamed test var to match renamed option name

* Modified test to more closely match what was reported in #1354

* Added ENQUEUE_ASYNC_MESSAGES variation

* Second attempt on appveyor matrix

* Ignoring test unless required option is set

* Update lib/PuppeteerSharp/CDPSession.cs

* Update appveyor.yml

Co-authored-by: Jeff Peirson <Jeff.Peirson@a2ius.com>
Co-authored-by: Darío Kondratiuk <dariokondratiuk@gmail.com>
  • Loading branch information
3 people authored Nov 8, 2020
1 parent 616425c commit d5044c6
Show file tree
Hide file tree
Showing 12 changed files with 317 additions and 54 deletions.
10 changes: 9 additions & 1 deletion appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ environment:
matrix:
- framework: net48
- framework: netcoreapp2.2
- framework: net48
ENQUEUE_ASYNC_MESSAGES: true
- framework: netcoreapp2.2
ENQUEUE_ASYNC_MESSAGES: true
- framework: net48
ENQUEUE_ASYNC_MESSAGES: false
- framework: netcoreapp2.2
ENQUEUE_ASYNC_MESSAGES: false
install:
- ps: >-
if($env:APPVEYOR_REPO_TAG -eq 'True' -And $env:framework -eq 'netcoreapp2.0') {
Expand Down Expand Up @@ -39,4 +47,4 @@ on_success:
c:\projects\puppeteer-sharp\appveyor\GenerateDocs.ps1
cache:
- $HOME/.nuget/packages
- $HOME/.nuget/packages
53 changes: 53 additions & 0 deletions lib/PuppeteerSharp.Tests/Issues/Issue1354.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System.Threading.Tasks;
using Xunit;
using Xunit.Abstractions;

namespace PuppeteerSharp.Tests.Issues
{
[Collection(TestConstants.TestFixtureCollectionName)]
public class Issue1354 : PuppeteerPageBaseTest
{
public Issue1354(ITestOutputHelper output) : base(output)
{
}

[Fact(Timeout = 30000)]
public async Task ShouldAllowSyncClose()
{
var options = TestConstants.DefaultBrowserOptions();
if (!options.EnqueueAsyncMessages)
{
// This test won't pass unless this option is set to true.
return;
}

using (var browser = await Puppeteer.LaunchAsync(options).ConfigureAwait(false))
{
// In issue #1354, this line hangs forever
browser.CloseAsync().Wait();
}
}

[Fact(Timeout = 30000)]
public async Task ShouldAllowSyncPageMethod()
{
var options = TestConstants.DefaultBrowserOptions();
if (!options.EnqueueAsyncMessages)
{
return;
}

using (var browser = await Puppeteer.LaunchAsync(options))
{
// Async low-level use
await using var page = await browser.NewPageAsync().ConfigureAwait(false);
await page.GoToAsync("http://ipecho.net/plain", WaitUntilNavigation.DOMContentLoaded).ConfigureAwait(false);
await page.SetContentAsync("<html><body>REPLACED</body></html>").ConfigureAwait(false);

// Deep inside an existing mostly sync app...
var content = page.GetContentAsync().ConfigureAwait(false).GetAwaiter().GetResult();
Assert.Contains("REPLACE", content);
}
}
}
}
5 changes: 3 additions & 2 deletions lib/PuppeteerSharp.Tests/TestConstants.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
Expand Down Expand Up @@ -43,6 +43,7 @@ public static class TestConstants
{
SlowMo = Convert.ToInt32(Environment.GetEnvironmentVariable("SLOW_MO")),
Headless = Convert.ToBoolean(Environment.GetEnvironmentVariable("HEADLESS") ?? "true"),
EnqueueAsyncMessages = Convert.ToBoolean(Environment.GetEnvironmentVariable("ENQUEUE_ASYNC_MESSAGES") ?? "false"),
Timeout = 0,
LogProcess = true,
#if NETCOREAPP
Expand Down Expand Up @@ -70,4 +71,4 @@ public static void SetupLogging(ITestOutputHelper output)
}
}
}
}
}
15 changes: 4 additions & 11 deletions lib/PuppeteerSharp/CDPSession.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -118,7 +118,7 @@ public async Task<T> SendAsync<T>(string method, object args = null)
/// If <c>false</c> the task will be considered complete after sending the message to Chromium.
/// </param>
/// <returns>The task.</returns>
/// <exception cref="PuppeteerSharp.PuppeteerException"></exception>
/// <exception cref="PuppeteerSharp.PuppeteerException"></exception>
public async Task<JObject> SendAsync(string method, object args = null, bool waitForCallback = true)
{
if (Connection == null)
Expand Down Expand Up @@ -186,14 +186,7 @@ internal void OnMessage(ConnectionResponse obj)

if (id.HasValue && _callbacks.TryRemove(id.Value, out var callback))
{
if (obj.Error != null)
{
callback.TaskWrapper.TrySetException(new MessageException(callback, obj.Error));
}
else
{
callback.TaskWrapper.TrySetResult(obj.Result);
}
Connection.MessageQueue.Enqueue(callback, obj);
}
else
{
Expand Down Expand Up @@ -232,4 +225,4 @@ internal void Close(string closeReason)

#endregion
}
}
}
12 changes: 12 additions & 0 deletions lib/PuppeteerSharp/ConnectOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,17 @@ public class ConnectOptions : IBrowserOptions, IConnectionOptions
/// Setting this to <c>false</c> proved to work in .NET Core but it tends to fail on .NET Framework.
/// </remarks>
public bool EnqueueTransportMessages { get; set; } = true;

/// <summary>
/// Affects how responses to <see cref="CDPSession.SendAsync"/> are returned to the caller. If <c>true</c> (default), the
/// response is delivered to the caller on its own thread; otherwise, the response is delivered the same way <see cref="CDPSession.MessageReceived"/>
/// events are raised.
/// </summary>
/// <remarks>
/// This should normally be set to <c>true</c> to support applications that aren't <c>async</c> "all the way up"; i.e., the application
/// has legacy code that is not async which makes calls into PuppeteerSharp. If you experience issues, or your application is not mixed sync/async use, you
/// can set this to <c>false</c> (default).
/// </remarks>
public bool EnqueueAsyncMessages { get; set; }
}
}
19 changes: 8 additions & 11 deletions lib/PuppeteerSharp/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class Connection : IDisposable
private readonly ILogger _logger;
private TaskQueue _callbackQueue = new TaskQueue();

internal Connection(string url, int delay, IConnectionTransport transport, ILoggerFactory loggerFactory = null)
internal Connection(string url, int delay, bool enqueueAsyncMessages, IConnectionTransport transport, ILoggerFactory loggerFactory = null)
{
LoggerFactory = loggerFactory ?? new LoggerFactory();
Url = url;
Expand All @@ -34,6 +34,7 @@ internal Connection(string url, int delay, IConnectionTransport transport, ILogg
Transport.Closed += Transport_Closed;
_callbacks = new ConcurrentDictionary<int, MessageTask>();
_sessions = new ConcurrentDictionary<string, CDPSession>();
MessageQueue = new AsyncMessageQueue(enqueueAsyncMessages, _logger);
_asyncSessions = new AsyncDictionaryHelper<string, CDPSession>(_sessions, "Session {0} not found");
}

Expand Down Expand Up @@ -85,6 +86,8 @@ internal Connection(string url, int delay, IConnectionTransport transport, ILogg
/// <value>The logger factory.</value>
public ILoggerFactory LoggerFactory { get; }

internal AsyncMessageQueue MessageQueue { get; }

#endregion

#region Public Methods
Expand Down Expand Up @@ -176,6 +179,7 @@ internal void Close(string closeReason)
}

_callbacks.Clear();
MessageQueue.Dispose();
}

internal static Connection FromSession(CDPSession session) => session.Connection;
Expand Down Expand Up @@ -245,22 +249,15 @@ private void ProcessIncomingMessage(ConnectionResponse obj)
if (!string.IsNullOrEmpty(obj.SessionId))
{
var session = GetSession(obj.SessionId);
session.OnMessage(obj);
session?.OnMessage(obj);
}
else if (obj.Id.HasValue)
{
// If we get the object we are waiting for we return if
// if not we add this to the list, sooner or later some one will come for it
if (_callbacks.TryRemove(obj.Id.Value, out var callback))
{
if (obj.Error != null)
{
callback.TaskWrapper.TrySetException(new MessageException(callback, obj.Error));
}
else
{
callback.TaskWrapper.TrySetResult(obj.Result);
}
MessageQueue.Enqueue(callback, obj);
}
}
else
Expand Down Expand Up @@ -296,7 +293,7 @@ internal static async Task<Connection> Create(string url, IConnectionOptions con
transport = await transportFactory(new Uri(url), connectionOptions, cancellationToken).ConfigureAwait(false);
}

return new Connection(url, connectionOptions.SlowMo, transport, loggerFactory);
return new Connection(url, connectionOptions.SlowMo, connectionOptions.EnqueueAsyncMessages, transport, loggerFactory);
}

/// <inheritdoc />
Expand Down
21 changes: 9 additions & 12 deletions lib/PuppeteerSharp/FrameManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace PuppeteerSharp
{
internal class FrameManager
{
private readonly Dictionary<int, ExecutionContext> _contextIdToContext;
private readonly ConcurrentDictionary<int, ExecutionContext> _contextIdToContext;
private bool _ensureNewDocumentNavigation;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<string, Frame> _frames;
Expand All @@ -27,7 +27,7 @@ private FrameManager(CDPSession client, Page page, bool ignoreHTTPSErrors, Timeo
Client = client;
Page = page;
_frames = new ConcurrentDictionary<string, Frame>();
_contextIdToContext = new Dictionary<int, ExecutionContext>();
_contextIdToContext = new ConcurrentDictionary<int, ExecutionContext>();
_logger = Client.Connection.LoggerFactory.CreateLogger<FrameManager>();
NetworkManager = new NetworkManager(client, ignoreHTTPSErrors, this);
TimeoutSettings = timeoutSettings;
Expand Down Expand Up @@ -240,24 +240,21 @@ private void OnExecutionContextsCleared()
{
while (_contextIdToContext.Count > 0)
{
var contextItem = _contextIdToContext.ElementAt(0);
_contextIdToContext.Remove(contextItem.Key);

if (contextItem.Value.World != null)
int key0 = _contextIdToContext.Keys.ElementAtOrDefault(0);
if (_contextIdToContext.TryRemove(key0, out var context))
{
contextItem.Value.World.SetContext(null);
if (context.World != null)
{
context.World.SetContext(null);
}
}
}
}

private void OnExecutionContextDestroyed(int executionContextId)
{
_contextIdToContext.TryGetValue(executionContextId, out var context);

if (context != null)
if (_contextIdToContext.TryRemove(executionContextId, out var context))
{
_contextIdToContext.Remove(executionContextId);

if (context.World != null)
{
context.World.SetContext(null);
Expand Down
102 changes: 102 additions & 0 deletions lib/PuppeteerSharp/Helpers/AsyncMessageQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using PuppeteerSharp.Messaging;

namespace PuppeteerSharp.Helpers
{
/// <summary>
/// Provides an async queue for responses for <see cref="CDPSession.SendAsync"/>, so that responses can be handled
/// async without risk callers causing a deadlock.
/// </summary>
internal class AsyncMessageQueue : IDisposable
{
private bool _disposed;
private readonly List<MessageTask> _pendingTasks;
private readonly bool _enqueueAsyncMessages;
private readonly ILogger _logger;

public AsyncMessageQueue(bool enqueueAsyncMessages, ILogger logger = null)
{
_enqueueAsyncMessages = enqueueAsyncMessages;
_logger = logger ?? NullLogger.Instance;
_pendingTasks = new List<MessageTask>();
}

public void Enqueue(MessageTask callback, ConnectionResponse obj)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

if (!_enqueueAsyncMessages)
{
HandleAsyncMessage(callback, obj);
return;
}

// Keep a ref to this task until it completes. If it can't finish by the time we dispose this queue,
// then we'll find it and cancel it.
lock (_pendingTasks)
{
_pendingTasks.Add(callback);
}

var task = Task.Run(() => HandleAsyncMessage(callback, obj));

// Unhandled error handler
task.ContinueWith(t =>
{
_logger.LogError(t.Exception, "Failed to complete async handling of SendAsync for {callback}", callback.Method);
callback.TaskWrapper.TrySetException(t.Exception!); // t.Exception is available since this runs only on faulted
}, TaskContinuationOptions.OnlyOnFaulted);

// Always remove from the queue when done, regardless of outcome.
task.ContinueWith(_ =>
{
lock (_pendingTasks)
{
_pendingTasks.Remove(callback);
}
});
}

public void Dispose()
{
if (_disposed)
{
return;
}

// Ensure all tasks are finished since we're disposing now. Any pending tasks will be canceled.
MessageTask[] pendingTasks;
lock (_pendingTasks)
{
pendingTasks = _pendingTasks.ToArray();
_pendingTasks.Clear();
}

foreach (var pendingTask in pendingTasks)
{
pendingTask.TaskWrapper.TrySetCanceled();
}

_disposed = true;
}

private static void HandleAsyncMessage(MessageTask callback, ConnectionResponse obj)
{
if (obj.Error != null)
{
callback.TaskWrapper.TrySetException(new MessageException(callback, obj.Error));
}
else
{
callback.TaskWrapper.TrySetResult(obj.Result);
}
}
}
}
Loading

0 comments on commit d5044c6

Please sign in to comment.