Skip to content

Commit

Permalink
Solve race conditions in Connection response processing
Browse files Browse the repository at this point in the history
  • Loading branch information
ggeurts committed Sep 4, 2018
1 parent 00ef7b1 commit f6376a2
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 113 deletions.
1 change: 0 additions & 1 deletion lib/PuppeteerSharp/Browser.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public async Task<string> GetUserAgentAsync()

private async Task CloseCoreAsync()
{
Connection.StopReading();
try
{
try
Expand Down
65 changes: 32 additions & 33 deletions lib/PuppeteerSharp/CDPSession.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Collections.Generic;
using Newtonsoft.Json.Linq;
using Microsoft.Extensions.Logging;
using PuppeteerSharp.Helpers;

namespace PuppeteerSharp
{
Expand Down Expand Up @@ -43,16 +43,14 @@ internal CDPSession(IConnection connection, TargetType targetType, string sessio
TargetType = targetType;
SessionId = sessionId;

_callbacks = new Dictionary<int, MessageTask>();
_logger = Connection.LoggerFactory.CreateLogger<CDPSession>();
_sessions = new Dictionary<string, CDPSession>();
}

#region Private Members
private int _lastId;
private readonly Dictionary<int, MessageTask> _callbacks;
private readonly ConcurrentDictionary<int, MessageTask> _callbacks = new ConcurrentDictionary<int, MessageTask>();
private readonly ConcurrentDictionary<string, CDPSession> _sessions = new ConcurrentDictionary<string, CDPSession>();
private readonly ILogger _logger;
private readonly Dictionary<string, CDPSession> _sessions;
#endregion

#region Properties
Expand All @@ -79,11 +77,12 @@ internal CDPSession(IConnection connection, TargetType targetType, string sessio
/// Occurs when tracing is completed.
/// </summary>
public event EventHandler<TracingCompleteEventArgs> TracingComplete;

/// <summary>
/// Gets or sets a value indicating whether this <see cref="CDPSession"/> is closed.
/// </summary>
/// <value><c>true</c> if is closed; otherwise, <c>false</c>.</value>
public bool IsClosed { get; internal set; }
public bool IsClosed => Connection == null;

/// <summary>
/// Gets the logger factory.
Expand Down Expand Up @@ -111,6 +110,7 @@ internal async Task<dynamic> SendAsync(string method, bool rawContent, dynamic a
{
throw new Exception($"Protocol error ({method}): Session closed. Most likely the {TargetType} has been closed.");
}

var id = ++_lastId;
var message = JsonConvert.SerializeObject(new Dictionary<string, object>
{
Expand All @@ -126,8 +126,7 @@ internal async Task<dynamic> SendAsync(string method, bool rawContent, dynamic a
Method = method,
RawContent = rawContent
};

_callbacks[id] = callback;
_callbacks.TryAdd(id, callback);

try
{
Expand All @@ -139,10 +138,9 @@ internal async Task<dynamic> SendAsync(string method, bool rawContent, dynamic a
}
catch (Exception ex)
{
if (_callbacks.ContainsKey(id))
if (_callbacks.TryRemove(id, out _))
{
_callbacks.Remove(id);
callback.TaskWrapper.SetException(new MessageException(ex.Message, ex));
callback.TaskWrapper.TrySetException(new MessageException(ex.Message, ex));
}
}

Expand All @@ -163,30 +161,28 @@ public Task DetachAsync()
internal void OnMessage(string message)
{
dynamic obj = JsonConvert.DeserializeObject(message);
var objAsJObject = obj as JObject;
var objAsJObject = (JObject)obj;

_logger.LogTrace("◀ Receive {Message}", message);

if (objAsJObject["id"] != null && _callbacks.ContainsKey((int)obj.id))
var id = (int?)objAsJObject["id"];
if (id.HasValue && _callbacks.TryRemove(id.Value, out var callback))
{
var callback = _callbacks[(int)obj.id];
_callbacks.Remove((int)obj.id);

if (objAsJObject["error"] != null)
{
callback.TaskWrapper.SetException(new MessageException(
callback.TaskWrapper.TrySetException(new MessageException(
$"Protocol error ({ callback.Method }): {obj.error.message} {obj.error.data}"
));
}
else
{
if (callback.RawContent)
{
callback.TaskWrapper.SetResult(JsonConvert.SerializeObject(obj.result));
callback.TaskWrapper.TrySetResult(JsonConvert.SerializeObject(obj.result));
}
else
{
callback.TaskWrapper.SetResult(obj.result);
callback.TaskWrapper.TrySetResult(obj.result);
}
}
}
Expand All @@ -201,19 +197,16 @@ internal void OnMessage(string message)
}
else if (obj.method == "Target.receivedMessageFromTarget")
{
var session = _sessions.GetValueOrDefault(objAsJObject["params"]["sessionId"].ToString());
if (session != null)
if (_sessions.TryGetValue(objAsJObject["params"]["sessionId"].ToString(), out var session))
{
session.OnMessage(objAsJObject["params"]["message"].ToString());
}
}
else if (obj.method == "Target.detachedFromTarget")
{
var session = _sessions.GetValueOrDefault(objAsJObject["params"]["sessionId"].ToString());
if (!(session?.IsClosed ?? true))
if (_sessions.TryRemove(objAsJObject["params"]["sessionId"].ToString(), out var session) && !session.IsClosed)
{
session.OnClosed();
_sessions.Remove(objAsJObject["params"]["sessionId"].ToString());
}
}

Expand All @@ -227,24 +220,30 @@ internal void OnMessage(string message)

internal void OnClosed()
{
IsClosed = true;
foreach (var callback in _callbacks.Values)
if (Connection == null)
{
callback.TaskWrapper.SetException(new TargetClosedException(
$"Protocol error({callback.Method}): Target closed."
));
return;
}
_callbacks.Clear();
Connection = null;

foreach (var entry in _callbacks)
{
if (_callbacks.TryRemove(entry.Key, out _))
{
entry.Value.TaskWrapper.TrySetException(
new TargetClosedException($"Protocol error({entry.Value.Method}): Target closed."));
}
}
}

internal CDPSession CreateSession(TargetType targetType, string sessionId)
{
var session = new CDPSession(this, targetType, sessionId);
_sessions[sessionId] = session;
_sessions.TryAdd(sessionId, session);
return session;
}
#endregion

#region IConnection
ILoggerFactory IConnection.LoggerFactory => LoggerFactory;
bool IConnection.IsClosed => IsClosed;
Expand Down
Loading

0 comments on commit f6376a2

Please sign in to comment.