-
Notifications
You must be signed in to change notification settings - Fork 30k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
inspector: Put connect and disconnect events into message queue #7271
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
#include "libplatform/libplatform.h" | ||
|
||
#include <string.h> | ||
#include <utility> | ||
#include <vector> | ||
|
||
// We need pid to use as ID with Chrome | ||
|
@@ -31,6 +32,9 @@ | |
namespace node { | ||
namespace { | ||
|
||
const char TAG_CONNECT[] = "#connect"; | ||
const char TAG_DISCONNECT[] = "#disconnect"; | ||
|
||
const char DEVTOOLS_PATH[] = "/node"; | ||
const char DEVTOOLS_HASH[] = "521e5b7e2b7cc66b4006a8a54cb9c4e57494a5ef"; | ||
|
||
|
@@ -154,7 +158,6 @@ bool RespondToGet(inspector_socket_t* socket, const char* path, int port) { | |
namespace inspector { | ||
|
||
using blink::protocol::DictionaryValue; | ||
using blink::protocol::String16; | ||
|
||
class AgentImpl { | ||
public: | ||
|
@@ -171,24 +174,27 @@ class AgentImpl { | |
void WaitForDisconnect(); | ||
|
||
private: | ||
using MessageQueue = std::vector<std::pair<int, String16>>; | ||
|
||
static void ThreadCbIO(void* agent); | ||
static void OnSocketConnectionIO(uv_stream_t* server, int status); | ||
static bool OnInspectorHandshakeIO(inspector_socket_t* socket, | ||
enum inspector_handshake_event state, | ||
const char* path); | ||
static void OnRemoteDataIO(uv_stream_t* stream, ssize_t read, | ||
const uv_buf_t* b); | ||
static void WriteCbIO(uv_async_t* async); | ||
|
||
void WorkerRunIO(); | ||
void OnInspectorConnectionIO(inspector_socket_t* socket); | ||
void PushPendingMessage(std::vector<std::string>* queue, | ||
const std::string& message); | ||
void SwapBehindLock(std::vector<std::string> AgentImpl::*queue, | ||
std::vector<std::string>* output); | ||
void OnRemoteDataIO(inspector_socket_t* stream, ssize_t read, | ||
const uv_buf_t* b); | ||
void PostMessages(); | ||
void SetConnected(bool connected); | ||
void Write(const std::string& message); | ||
void DispatchMessages(); | ||
void Write(int session_id, const String16& message); | ||
void AppendMessage(MessageQueue* vector, int session_id, | ||
const String16& message); | ||
void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2); | ||
void PostIncomingMessage(const String16& message); | ||
|
||
uv_sem_t start_sem_; | ||
ConditionVariable pause_cond_; | ||
|
@@ -208,27 +214,36 @@ class AgentImpl { | |
inspector_socket_t* client_socket_; | ||
blink::V8Inspector* inspector_; | ||
v8::Platform* platform_; | ||
std::vector<std::string> message_queue_; | ||
std::vector<std::string> outgoing_message_queue_; | ||
MessageQueue incoming_message_queue_; | ||
MessageQueue outgoing_message_queue_; | ||
bool dispatching_messages_; | ||
int frontend_session_id_; | ||
int backend_session_id_; | ||
|
||
friend class ChannelImpl; | ||
friend class DispatchOnInspectorBackendTask; | ||
friend class SetConnectedTask; | ||
friend class V8NodeInspector; | ||
friend void InterruptCallback(v8::Isolate*, void* agent); | ||
friend void DataCallback(uv_stream_t* stream, ssize_t read, | ||
const uv_buf_t* buf); | ||
}; | ||
|
||
void InterruptCallback(v8::Isolate*, void* agent) { | ||
static_cast<AgentImpl*>(agent)->PostMessages(); | ||
static_cast<AgentImpl*>(agent)->DispatchMessages(); | ||
} | ||
|
||
void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) { | ||
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data); | ||
static_cast<AgentImpl*>(socket->data)->OnRemoteDataIO(socket, read, buf); | ||
} | ||
|
||
class DispatchOnInspectorBackendTask : public v8::Task { | ||
public: | ||
explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {} | ||
|
||
void Run() override { | ||
agent_->PostMessages(); | ||
agent_->DispatchMessages(); | ||
} | ||
|
||
private: | ||
|
@@ -251,27 +266,12 @@ class ChannelImpl final : public blink::protocol::FrontendChannel { | |
void flushProtocolNotifications() override { } | ||
|
||
void sendMessageToFrontend(const String16& message) { | ||
agent_->Write(message.utf8()); | ||
agent_->Write(agent_->frontend_session_id_, message); | ||
} | ||
|
||
AgentImpl* const agent_; | ||
}; | ||
|
||
class SetConnectedTask : public v8::Task { | ||
public: | ||
SetConnectedTask(AgentImpl* agent, bool connected) | ||
: agent_(agent), | ||
connected_(connected) {} | ||
|
||
void Run() override { | ||
agent_->SetConnected(connected_); | ||
} | ||
|
||
private: | ||
AgentImpl* agent_; | ||
bool connected_; | ||
}; | ||
|
||
class V8NodeInspector : public blink::V8Inspector { | ||
public: | ||
V8NodeInspector(AgentImpl* agent, node::Environment* env, | ||
|
@@ -320,7 +320,9 @@ AgentImpl::AgentImpl(Environment* env) : port_(0), | |
client_socket_(nullptr), | ||
inspector_(nullptr), | ||
platform_(nullptr), | ||
dispatching_messages_(false) { | ||
dispatching_messages_(false), | ||
frontend_session_id_(0), | ||
backend_session_id_(0) { | ||
CHECK_EQ(0, uv_sem_init(&start_sem_, 0)); | ||
memset(&data_written_, 0, sizeof(data_written_)); | ||
memset(&io_thread_req_, 0, sizeof(io_thread_req_)); | ||
|
@@ -355,10 +357,7 @@ void AgentImpl::Start(v8::Platform* platform, int port, bool wait) { | |
uv_sem_wait(&start_sem_); | ||
|
||
if (wait) { | ||
// Flush messages in case of wait to connect, see OnRemoteDataIO on how it | ||
// should be fixed. | ||
SetConnected(true); | ||
PostMessages(); | ||
DispatchMessages(); | ||
} | ||
} | ||
|
||
|
@@ -424,67 +423,54 @@ bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket, | |
} | ||
} | ||
|
||
// static | ||
void AgentImpl::OnRemoteDataIO(uv_stream_t* stream, | ||
ssize_t read, | ||
const uv_buf_t* b) { | ||
inspector_socket_t* socket = static_cast<inspector_socket_t*>(stream->data); | ||
AgentImpl* agent = static_cast<AgentImpl*>(socket->data); | ||
Mutex::ScopedLock scoped_lock(agent->pause_lock_); | ||
void AgentImpl::OnRemoteDataIO(inspector_socket_t* socket, | ||
ssize_t read, | ||
const uv_buf_t* buf) { | ||
Mutex::ScopedLock scoped_lock(pause_lock_); | ||
if (read > 0) { | ||
std::string str(b->base, read); | ||
agent->PushPendingMessage(&agent->message_queue_, str); | ||
free(b->base); | ||
|
||
String16 str = String16::fromUTF8(buf->base, read); | ||
PostIncomingMessage(str); | ||
// TODO(pfeldman): Instead of blocking execution while debugger | ||
// engages, node should wait for the run callback from the remote client | ||
// and initiate its startup. This is a change to node.cc that should be | ||
// upstreamed separately. | ||
if (agent->wait_ && str.find("\"Runtime.run\"") != std::string::npos) { | ||
agent->wait_ = false; | ||
uv_sem_post(&agent->start_sem_); | ||
if (wait_ && str.find("\"Runtime.run\"") != std::string::npos) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question: this assumes that the message doesn't get spread across TCP packets, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This message comes over WebSocket protocol. Current implementation only reports full frames. |
||
wait_ = false; | ||
uv_sem_post(&start_sem_); | ||
} | ||
|
||
agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(), | ||
new DispatchOnInspectorBackendTask(agent)); | ||
agent->parent_env_->isolate() | ||
->RequestInterrupt(InterruptCallback, agent); | ||
uv_async_send(&agent->data_written_); | ||
platform_->CallOnForegroundThread(parent_env_->isolate(), | ||
new DispatchOnInspectorBackendTask(this)); | ||
parent_env_->isolate()->RequestInterrupt(InterruptCallback, this); | ||
uv_async_send(&data_written_); | ||
} else if (read <= 0) { | ||
// EOF | ||
if (agent->client_socket_ == socket) { | ||
agent->client_socket_ = nullptr; | ||
agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(), | ||
new SetConnectedTask(agent, false)); | ||
uv_async_send(&agent->data_written_); | ||
if (client_socket_ == socket) { | ||
String16 message(TAG_DISCONNECT, sizeof(TAG_DISCONNECT) - 1); | ||
client_socket_ = nullptr; | ||
PostIncomingMessage(message); | ||
} | ||
DisconnectAndDisposeIO(socket); | ||
} | ||
agent->pause_cond_.Broadcast(scoped_lock); | ||
} | ||
|
||
void AgentImpl::PushPendingMessage(std::vector<std::string>* queue, | ||
const std::string& message) { | ||
Mutex::ScopedLock scoped_lock(queue_lock_); | ||
queue->push_back(message); | ||
} | ||
|
||
void AgentImpl::SwapBehindLock(std::vector<std::string> AgentImpl::*queue, | ||
std::vector<std::string>* output) { | ||
Mutex::ScopedLock scoped_lock(queue_lock_); | ||
(this->*queue).swap(*output); | ||
if (buf) { | ||
free(buf->base); | ||
} | ||
pause_cond_.Broadcast(scoped_lock); | ||
} | ||
|
||
// static | ||
void AgentImpl::WriteCbIO(uv_async_t* async) { | ||
AgentImpl* agent = static_cast<AgentImpl*>(async->data); | ||
inspector_socket_t* socket = agent->client_socket_; | ||
if (socket) { | ||
std::vector<std::string> outgoing_messages; | ||
agent->SwapBehindLock(&AgentImpl::outgoing_message_queue_, | ||
&outgoing_messages); | ||
for (auto const& message : outgoing_messages) | ||
inspector_write(socket, message.c_str(), message.length()); | ||
MessageQueue outgoing_messages; | ||
agent->SwapBehindLock(&agent->outgoing_message_queue_, &outgoing_messages); | ||
for (const MessageQueue::value_type& outgoing : outgoing_messages) { | ||
if (outgoing.first == agent->frontend_session_id_) { | ||
std::string message = outgoing.second.utf8(); | ||
inspector_write(socket, message.c_str(), message.length()); | ||
} | ||
} | ||
} | ||
} | ||
|
||
|
@@ -518,49 +504,70 @@ void AgentImpl::WorkerRunIO() { | |
uv_run(&child_loop_, UV_RUN_DEFAULT); | ||
} | ||
|
||
void AgentImpl::AppendMessage(MessageQueue* queue, int session_id, | ||
const String16& message) { | ||
Mutex::ScopedLock scoped_lock(queue_lock_); | ||
queue->push_back(std::make_pair(session_id, message)); | ||
} | ||
|
||
void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) { | ||
Mutex::ScopedLock scoped_lock(queue_lock_); | ||
vector1->swap(*vector2); | ||
} | ||
|
||
void AgentImpl::PostIncomingMessage(const String16& message) { | ||
AppendMessage(&incoming_message_queue_, frontend_session_id_, message); | ||
v8::Isolate* isolate = parent_env_->isolate(); | ||
platform_->CallOnForegroundThread(isolate, | ||
new DispatchOnInspectorBackendTask(this)); | ||
isolate->RequestInterrupt(InterruptCallback, this); | ||
uv_async_send(&data_written_); | ||
} | ||
|
||
void AgentImpl::OnInspectorConnectionIO(inspector_socket_t* socket) { | ||
if (client_socket_) { | ||
DisconnectAndDisposeIO(socket); | ||
return; | ||
} | ||
client_socket_ = socket; | ||
inspector_read_start(socket, OnBufferAlloc, AgentImpl::OnRemoteDataIO); | ||
platform_->CallOnForegroundThread(parent_env_->isolate(), | ||
new SetConnectedTask(this, true)); | ||
inspector_read_start(socket, OnBufferAlloc, DataCallback); | ||
frontend_session_id_++; | ||
PostIncomingMessage(String16(TAG_CONNECT, sizeof(TAG_CONNECT) - 1)); | ||
} | ||
|
||
void AgentImpl::PostMessages() { | ||
void AgentImpl::DispatchMessages() { | ||
if (dispatching_messages_) | ||
return; | ||
dispatching_messages_ = true; | ||
std::vector<std::string> messages; | ||
SwapBehindLock(&AgentImpl::message_queue_, &messages); | ||
for (auto const& message : messages) | ||
inspector_->dispatchMessageFromFrontend( | ||
String16::fromUTF8(message.c_str(), message.length())); | ||
MessageQueue tasks; | ||
SwapBehindLock(&incoming_message_queue_, &tasks); | ||
for (const MessageQueue::value_type& pair : tasks) { | ||
const String16& message = pair.second; | ||
if (message == TAG_CONNECT) { | ||
CHECK_EQ(false, connected_); | ||
backend_session_id_++; | ||
connected_ = true; | ||
fprintf(stderr, "Debugger attached.\n"); | ||
inspector_->connectFrontend(new ChannelImpl(this)); | ||
} else if (message == TAG_DISCONNECT) { | ||
CHECK(connected_); | ||
connected_ = false; | ||
if (!shutting_down_) | ||
PrintDebuggerReadyMessage(port_); | ||
inspector_->quitMessageLoopOnPause(); | ||
inspector_->disconnectFrontend(); | ||
} else { | ||
inspector_->dispatchMessageFromFrontend(message); | ||
} | ||
} | ||
uv_async_send(&data_written_); | ||
dispatching_messages_ = false; | ||
} | ||
|
||
void AgentImpl::SetConnected(bool connected) { | ||
if (connected_ == connected) | ||
return; | ||
|
||
connected_ = connected; | ||
if (connected) { | ||
fprintf(stderr, "Debugger attached.\n"); | ||
inspector_->connectFrontend(new ChannelImpl(this)); | ||
} else { | ||
if (!shutting_down_) | ||
PrintDebuggerReadyMessage(port_); | ||
inspector_->quitMessageLoopOnPause(); | ||
inspector_->disconnectFrontend(); | ||
} | ||
} | ||
|
||
void AgentImpl::Write(const std::string& message) { | ||
PushPendingMessage(&outgoing_message_queue_, message); | ||
ASSERT_EQ(0, uv_async_send(&io_thread_req_)); | ||
void AgentImpl::Write(int session_id, const String16& message) { | ||
AppendMessage(&outgoing_message_queue_, session_id, message); | ||
int err = uv_async_send(&io_thread_req_); | ||
CHECK_EQ(0, err); | ||
} | ||
|
||
// Exported class Agent | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: why is
socket->data
avoid*
when it always seems to be anAgentImpl*
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just trying to follow the libuv conventions. Technically, inspector_socket.{h,cc} is a partial implementation of the WebSockets protocol - so it can either be removed (if a better option is introduced into Node.js) or extended if other clients within Node.js need the protocol.