diff --git a/src/inspector_agent.cc b/src/inspector_agent.cc index 8ef95305a6f3c0..d8233f41dc2de9 100644 --- a/src/inspector_agent.cc +++ b/src/inspector_agent.cc @@ -17,6 +17,7 @@ #include "libplatform/libplatform.h" #include +#include #include // We need pid to use as ID with Chrome @@ -31,6 +32,9 @@ namespace node { namespace { +const char TAG_CONNECT[] = "#connect"; +const char TAG_DISCONNECT[] = "#disconnect"; + const char DEVTOOLS_PATH[] = "/node"; const char DEVTOOLS_HASH[] = "521e5b7e2b7cc66b4006a8a54cb9c4e57494a5ef"; @@ -154,7 +158,6 @@ bool RespondToGet(inspector_socket_t* socket, const char* path, int port) { namespace inspector { using blink::protocol::DictionaryValue; -using blink::protocol::String16; class AgentImpl { public: @@ -171,24 +174,27 @@ class AgentImpl { void WaitForDisconnect(); private: + using MessageQueue = std::vector>; + static void ThreadCbIO(void* agent); static void OnSocketConnectionIO(uv_stream_t* server, int status); static bool OnInspectorHandshakeIO(inspector_socket_t* socket, enum inspector_handshake_event state, const char* path); - static void OnRemoteDataIO(uv_stream_t* stream, ssize_t read, - const uv_buf_t* b); static void WriteCbIO(uv_async_t* async); void WorkerRunIO(); void OnInspectorConnectionIO(inspector_socket_t* socket); - void PushPendingMessage(std::vector* queue, - const std::string& message); - void SwapBehindLock(std::vector AgentImpl::*queue, - std::vector* output); + void OnRemoteDataIO(inspector_socket_t* stream, ssize_t read, + const uv_buf_t* b); void PostMessages(); void SetConnected(bool connected); - void Write(const std::string& message); + void DispatchMessages(); + void Write(int session_id, const String16& message); + void AppendMessage(MessageQueue* vector, int session_id, + const String16& message); + void SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2); + void PostIncomingMessage(const String16& message); uv_sem_t start_sem_; ConditionVariable pause_cond_; @@ -208,19 +214,28 @@ class AgentImpl { inspector_socket_t* client_socket_; blink::V8Inspector* inspector_; v8::Platform* platform_; - std::vector message_queue_; - std::vector outgoing_message_queue_; + MessageQueue incoming_message_queue_; + MessageQueue outgoing_message_queue_; bool dispatching_messages_; + int frontend_session_id_; + int backend_session_id_; friend class ChannelImpl; friend class DispatchOnInspectorBackendTask; friend class SetConnectedTask; friend class V8NodeInspector; friend void InterruptCallback(v8::Isolate*, void* agent); + friend void DataCallback(uv_stream_t* stream, ssize_t read, + const uv_buf_t* buf); }; void InterruptCallback(v8::Isolate*, void* agent) { - static_cast(agent)->PostMessages(); + static_cast(agent)->DispatchMessages(); +} + +void DataCallback(uv_stream_t* stream, ssize_t read, const uv_buf_t* buf) { + inspector_socket_t* socket = static_cast(stream->data); + static_cast(socket->data)->OnRemoteDataIO(socket, read, buf); } class DispatchOnInspectorBackendTask : public v8::Task { @@ -228,7 +243,7 @@ class DispatchOnInspectorBackendTask : public v8::Task { explicit DispatchOnInspectorBackendTask(AgentImpl* agent) : agent_(agent) {} void Run() override { - agent_->PostMessages(); + agent_->DispatchMessages(); } private: @@ -251,27 +266,12 @@ class ChannelImpl final : public blink::protocol::FrontendChannel { void flushProtocolNotifications() override { } void sendMessageToFrontend(const String16& message) { - agent_->Write(message.utf8()); + agent_->Write(agent_->frontend_session_id_, message); } AgentImpl* const agent_; }; -class SetConnectedTask : public v8::Task { - public: - SetConnectedTask(AgentImpl* agent, bool connected) - : agent_(agent), - connected_(connected) {} - - void Run() override { - agent_->SetConnected(connected_); - } - - private: - AgentImpl* agent_; - bool connected_; -}; - class V8NodeInspector : public blink::V8Inspector { public: V8NodeInspector(AgentImpl* agent, node::Environment* env, @@ -320,7 +320,9 @@ AgentImpl::AgentImpl(Environment* env) : port_(0), client_socket_(nullptr), inspector_(nullptr), platform_(nullptr), - dispatching_messages_(false) { + dispatching_messages_(false), + frontend_session_id_(0), + backend_session_id_(0) { CHECK_EQ(0, uv_sem_init(&start_sem_, 0)); memset(&data_written_, 0, sizeof(data_written_)); memset(&io_thread_req_, 0, sizeof(io_thread_req_)); @@ -355,10 +357,7 @@ void AgentImpl::Start(v8::Platform* platform, int port, bool wait) { uv_sem_wait(&start_sem_); if (wait) { - // Flush messages in case of wait to connect, see OnRemoteDataIO on how it - // should be fixed. - SetConnected(true); - PostMessages(); + DispatchMessages(); } } @@ -424,55 +423,39 @@ bool AgentImpl::OnInspectorHandshakeIO(inspector_socket_t* socket, } } -// static -void AgentImpl::OnRemoteDataIO(uv_stream_t* stream, - ssize_t read, - const uv_buf_t* b) { - inspector_socket_t* socket = static_cast(stream->data); - AgentImpl* agent = static_cast(socket->data); - Mutex::ScopedLock scoped_lock(agent->pause_lock_); +void AgentImpl::OnRemoteDataIO(inspector_socket_t* socket, + ssize_t read, + const uv_buf_t* buf) { + Mutex::ScopedLock scoped_lock(pause_lock_); if (read > 0) { - std::string str(b->base, read); - agent->PushPendingMessage(&agent->message_queue_, str); - free(b->base); - + String16 str = String16::fromUTF8(buf->base, read); + PostIncomingMessage(str); // TODO(pfeldman): Instead of blocking execution while debugger // engages, node should wait for the run callback from the remote client // and initiate its startup. This is a change to node.cc that should be // upstreamed separately. - if (agent->wait_ && str.find("\"Runtime.run\"") != std::string::npos) { - agent->wait_ = false; - uv_sem_post(&agent->start_sem_); + if (wait_ && str.find("\"Runtime.run\"") != std::string::npos) { + wait_ = false; + uv_sem_post(&start_sem_); } - agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(), - new DispatchOnInspectorBackendTask(agent)); - agent->parent_env_->isolate() - ->RequestInterrupt(InterruptCallback, agent); - uv_async_send(&agent->data_written_); + platform_->CallOnForegroundThread(parent_env_->isolate(), + new DispatchOnInspectorBackendTask(this)); + parent_env_->isolate()->RequestInterrupt(InterruptCallback, this); + uv_async_send(&data_written_); } else if (read <= 0) { // EOF - if (agent->client_socket_ == socket) { - agent->client_socket_ = nullptr; - agent->platform_->CallOnForegroundThread(agent->parent_env_->isolate(), - new SetConnectedTask(agent, false)); - uv_async_send(&agent->data_written_); + if (client_socket_ == socket) { + String16 message(TAG_DISCONNECT, sizeof(TAG_DISCONNECT) - 1); + client_socket_ = nullptr; + PostIncomingMessage(message); } DisconnectAndDisposeIO(socket); } - agent->pause_cond_.Broadcast(scoped_lock); -} - -void AgentImpl::PushPendingMessage(std::vector* queue, - const std::string& message) { - Mutex::ScopedLock scoped_lock(queue_lock_); - queue->push_back(message); -} - -void AgentImpl::SwapBehindLock(std::vector AgentImpl::*queue, - std::vector* output) { - Mutex::ScopedLock scoped_lock(queue_lock_); - (this->*queue).swap(*output); + if (buf) { + free(buf->base); + } + pause_cond_.Broadcast(scoped_lock); } // static @@ -480,11 +463,14 @@ void AgentImpl::WriteCbIO(uv_async_t* async) { AgentImpl* agent = static_cast(async->data); inspector_socket_t* socket = agent->client_socket_; if (socket) { - std::vector outgoing_messages; - agent->SwapBehindLock(&AgentImpl::outgoing_message_queue_, - &outgoing_messages); - for (auto const& message : outgoing_messages) - inspector_write(socket, message.c_str(), message.length()); + MessageQueue outgoing_messages; + agent->SwapBehindLock(&agent->outgoing_message_queue_, &outgoing_messages); + for (const MessageQueue::value_type& outgoing : outgoing_messages) { + if (outgoing.first == agent->frontend_session_id_) { + std::string message = outgoing.second.utf8(); + inspector_write(socket, message.c_str(), message.length()); + } + } } } @@ -518,49 +504,70 @@ void AgentImpl::WorkerRunIO() { uv_run(&child_loop_, UV_RUN_DEFAULT); } +void AgentImpl::AppendMessage(MessageQueue* queue, int session_id, + const String16& message) { + Mutex::ScopedLock scoped_lock(queue_lock_); + queue->push_back(std::make_pair(session_id, message)); +} + +void AgentImpl::SwapBehindLock(MessageQueue* vector1, MessageQueue* vector2) { + Mutex::ScopedLock scoped_lock(queue_lock_); + vector1->swap(*vector2); +} + +void AgentImpl::PostIncomingMessage(const String16& message) { + AppendMessage(&incoming_message_queue_, frontend_session_id_, message); + v8::Isolate* isolate = parent_env_->isolate(); + platform_->CallOnForegroundThread(isolate, + new DispatchOnInspectorBackendTask(this)); + isolate->RequestInterrupt(InterruptCallback, this); + uv_async_send(&data_written_); +} + void AgentImpl::OnInspectorConnectionIO(inspector_socket_t* socket) { if (client_socket_) { DisconnectAndDisposeIO(socket); return; } client_socket_ = socket; - inspector_read_start(socket, OnBufferAlloc, AgentImpl::OnRemoteDataIO); - platform_->CallOnForegroundThread(parent_env_->isolate(), - new SetConnectedTask(this, true)); + inspector_read_start(socket, OnBufferAlloc, DataCallback); + frontend_session_id_++; + PostIncomingMessage(String16(TAG_CONNECT, sizeof(TAG_CONNECT) - 1)); } -void AgentImpl::PostMessages() { +void AgentImpl::DispatchMessages() { if (dispatching_messages_) return; dispatching_messages_ = true; - std::vector messages; - SwapBehindLock(&AgentImpl::message_queue_, &messages); - for (auto const& message : messages) - inspector_->dispatchMessageFromFrontend( - String16::fromUTF8(message.c_str(), message.length())); + MessageQueue tasks; + SwapBehindLock(&incoming_message_queue_, &tasks); + for (const MessageQueue::value_type& pair : tasks) { + const String16& message = pair.second; + if (message == TAG_CONNECT) { + CHECK_EQ(false, connected_); + backend_session_id_++; + connected_ = true; + fprintf(stderr, "Debugger attached.\n"); + inspector_->connectFrontend(new ChannelImpl(this)); + } else if (message == TAG_DISCONNECT) { + CHECK(connected_); + connected_ = false; + if (!shutting_down_) + PrintDebuggerReadyMessage(port_); + inspector_->quitMessageLoopOnPause(); + inspector_->disconnectFrontend(); + } else { + inspector_->dispatchMessageFromFrontend(message); + } + } uv_async_send(&data_written_); dispatching_messages_ = false; } -void AgentImpl::SetConnected(bool connected) { - if (connected_ == connected) - return; - - connected_ = connected; - if (connected) { - fprintf(stderr, "Debugger attached.\n"); - inspector_->connectFrontend(new ChannelImpl(this)); - } else { - if (!shutting_down_) - PrintDebuggerReadyMessage(port_); - inspector_->quitMessageLoopOnPause(); - inspector_->disconnectFrontend(); - } -} - -void AgentImpl::Write(const std::string& message) { - PushPendingMessage(&outgoing_message_queue_, message); - ASSERT_EQ(0, uv_async_send(&io_thread_req_)); +void AgentImpl::Write(int session_id, const String16& message) { + AppendMessage(&outgoing_message_queue_, session_id, message); + int err = uv_async_send(&io_thread_req_); + CHECK_EQ(0, err); } // Exported class Agent