From ed32089fbf17691ee12b9c7cf29e56a8b6b5a56e Mon Sep 17 00:00:00 2001 From: Orion Hodson Date: Wed, 10 May 2023 19:47:09 +0100 Subject: [PATCH 1/2] Support for breakpoint debugging (#371 / EW-7264) This change places the InspectorService in a separate thread that manages communication with over the websocket to the inspector. The change also adds support for runMessageLoopOnPause() and quitMessageLoopOnPause() to support breakpoints and debugger break statements. There is also refactoring of the CDP message handling code so it can be called with or without the isolate lock held. This requires workerd to run with the command-line flag -i to turn on inspector support. This change only works for single service configurations. Support for multi service configurations to follow. To try this out using samples/helloworld as an example: 1) Edit "samples/helloworld/worker.js" and add a debugger statement to the handle(request) method. 2) Open workerd in VSCode, select 'workerd with inspector enabled (dbg)' as the Run and Debug Target panel. Hit F5 to run and select `samples/helloworld/config.capnp` as the config to use. 3) Open devtools in Chrome using either: * https://devtools.devprod.cloudflare.dev/js_app?ws=localhost:9229/main * chrome:://inspect 4) On the command-line run, `curl http://localhost:8080/` 5) Devtools should break into the running worker. Bug: https://github.com/cloudflare/workerd/issues/371 Test: manual Test: existing internal ew tests do not break --- src/workerd/io/worker.c++ | 739 +++++++++++++++++++++------------- src/workerd/io/worker.h | 1 - src/workerd/server/server.c++ | 73 ++-- src/workerd/server/server.h | 7 +- 4 files changed, 511 insertions(+), 309 deletions(-) diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index d0acfe3c7af..3a8c016c5f6 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -449,37 +449,84 @@ public: // We're on a request-serving thread. auto& ioContext = IoContext::current(); timePoint = ioContext.now(); - } else KJ_IF_MAYBE(info, inspectorTimerInfo) { - if (info->threadId == getCurrentThreadId()) { - // We're on an inspector-serving thread. + } else { + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(info, lockedState->inspectorTimerInfo) { timePoint = info->timer.now() + info->timerOffset - kj::origin() + kj::UNIX_EPOCH; } + // We're at script startup time -- just return the Epoch. } + return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + } - // If we're on neither a request- nor inspector-serving thread, then we're at script startup - // time -- just return the Epoch. + void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { + auto lockedState = state.lockExclusive(); + lockedState->inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + } - return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + void setChannel(Worker::Isolate::InspectorChannelImpl& channel) { + auto lockedState = state.lockExclusive(); + // There is only one active inspector channel at a time in workerd. The teardown of any + // previous channel should have invalidated `lockedState->channel`. + KJ_REQUIRE(lockedState->channel == nullptr); + lockedState->channel = channel; } - // Nothing else. We ignore everything the inspector tells us, because we only care about the - // devtools inspector protocol, which is handled separately. + void resetChannel() { + auto lockedState = state.lockExclusive(); + lockedState->channel = {}; + } - void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { - // Helper for attachInspector(). - inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + void runMessageLoopOnPause(int contextGroupId) override { + // This method is called by v8 when a breakpoint or debugger statement is hit. This method + // processes debugger messages until `Debugger.resume()` is called, when v8 then calls + // `quitMessageLoopOnPause()`. + // + // This method is ultimately called from the `InspectorChannelImpl` and the isolate lock is + // held when this method is called. + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(channel, lockedState->channel) { + pauseIncomingMessages(*channel); + runMessageLoop = true; + do { + if (!dispatchOneMessageDuringPause(*channel)) { + break; + } + } while (runMessageLoop); + resumeIncomingMessages(*channel); + } + } + + void quitMessageLoopOnPause() override { + // This method is called by v8 to resume execution after a breakpoint is hit. + runMessageLoop = false; } private: + static void pauseIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel); + static void resumeIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel); + static bool dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel); + struct InspectorTimerInfo { kj::Timer& timer; kj::Duration timerOffset; uint64_t threadId; }; - kj::Maybe inspectorTimerInfo; - // The timer and offset for the inspector-serving thread. + bool runMessageLoop; + + struct State { + // State that may be set on a thread other than the isolate thread. + // These are typically set in attachInspector when an inspector connection is + // made. + kj::Maybe channel; + // Inspector channel to use to pump messages. + + kj::Maybe inspectorTimerInfo; + // The timer and offset for the inspector-serving thread. + }; + kj::MutexGuarded state; }; void setWebAssemblyModuleHasInstance(jsg::Lock& lock, v8::Local context); @@ -1635,6 +1682,7 @@ Worker::Lock::Lock(const Worker& constWorker, LockType lockType) impl(kj::heap(worker, lockType, stackScope)) { kj::requireOnStack(this, "Worker::Lock MUST be allocated on the stack."); } + Worker::Lock::~Lock() noexcept(false) { // const_cast OK because we hold -- nay, we *are* -- a lock on the script. auto& isolate = const_cast(worker.getIsolate()); @@ -2041,10 +2089,10 @@ private: class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel { public: - InspectorChannelImpl(kj::Own isolateParam, - kj::WebSocket& webSocket) - : webSocket(webSocket), - state(kj::heap(this, kj::mv(isolateParam))) {} + InspectorChannelImpl(kj::Own isolateParam, kj::WebSocket& webSocket) + : ioHandler(webSocket), state(kj::heap(this, kj::mv(isolateParam))) { + ioHandler.connect(*this); + } using InspectorLock = Worker::Lock::TakeSynchronously; // In preview sessions, synchronous locks are not an issue. We declare an alternate spelling of @@ -2052,7 +2100,8 @@ public: // locks. ~InspectorChannelImpl() noexcept try { - KJ_DEFER(outgoingQueueNotifier->clear()); + // Stop message pump. + ioHandler.disconnect(); // Delete session under lock. auto state = this->state.lockExclusive(); @@ -2060,9 +2109,7 @@ public: jsg::V8StackScope stackScope; Isolate::Impl::Lock recordedLock(*state->get()->isolate, InspectorLock(nullptr), stackScope); KJ_IF_MAYBE(p, state->get()->isolate->currentInspectorSession) { - if (p == this) { - const_cast(*state->get()->isolate).currentInspectorSession = nullptr;; - } + const_cast(*state->get()->isolate).disconnectInspector(); } state->get()->teardownUnderLock(); } catch (...) { @@ -2086,193 +2133,135 @@ public: void disconnect() { // Fake like the client requested close. This will cause outgoingLoop() to exit and everything // will be cleaned up. - receivedClose = true; - outgoingQueueNotifier->notify(); + ioHandler.disconnect(); } - kj::Promise outgoingLoop() { - return outgoingQueueNotifier->awaitNotification().then([this]() { - auto messages = kj::mv(*outgoingQueue.lockExclusive()); - auto promise = sendToWebsocket(messages).attach(kj::mv(messages)); + void dispatchProtocolMessage(kj::String message, + v8_inspector::V8InspectorSession& session, + Isolate& isolate, + jsg::V8StackScope& stackScope, + Isolate::Impl::Lock& recordedLock) { + capnp::MallocMessageBuilder messageBuilder; + auto cmd = messageBuilder.initRoot(); + getCdpJsonCodec().decode(message, cmd); - if (receivedClose) { - return promise.then([this]() { - return webSocket.close(1000, "client closed connection"); - }); - } else if (*state.lockShared() == nullptr) { - // Another connection superseded us, or the isolate died. - return promise.then([this]() { - // TODO(soon): What happens if the other side never hangs up? - return webSocket.disconnect(); - }); + switch (cmd.which()) { + case cdp::Command::UNKNOWN: { + break; } + case cdp::Command::NETWORK_ENABLE: { + setNetworkEnabled(true); + cmd.getNetworkEnable().initResult(); + break; + } + case cdp::Command::NETWORK_DISABLE: { + setNetworkEnabled(false); + cmd.getNetworkDisable().initResult(); + break; + } + case cdp::Command::NETWORK_GET_RESPONSE_BODY: { + auto err = cmd.getNetworkGetResponseBody().initError(); + err.setCode(-32600); + err.setMessage("Network.getResponseBody is not supported in this fork"); + break; + } + case cdp::Command::PROFILER_STOP: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + stopProfiling(**p, lock->v8Isolate, cmd); + } + break; + } + case cdp::Command::PROFILER_START: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + startProfiling(**p, lock->v8Isolate); + } + break; + } + case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); + setSamplingInterval(**p, interval); + } + break; + } + case cdp::Command::PROFILER_ENABLE: { + auto& lock = recordedLock.lock; + isolate.impl->profiler = kj::Own( + v8::CpuProfiler::New(lock->v8Isolate, v8::kDebugNaming, v8::kLazyLogging), + CpuProfilerDisposer::instance); + break; + } + case cdp::Command::TAKE_HEAP_SNAPSHOT: { + auto& lock = recordedLock.lock; + auto params = cmd.getTakeHeapSnapshot().getParams(); + takeHeapSnapshot(*lock, + params.getExposeInternals(), + params.getCaptureNumericValue()); + break; + } + } - return promise.then([this]() { return outgoingLoop(); }); - }); - } - - kj::Promise incomingLoop() { - return webSocket.receive().then([this](kj::WebSocket::Message&& message) -> kj::Promise { - KJ_SWITCH_ONEOF(message) { - KJ_CASE_ONEOF(text, kj::String) { - { - capnp::MallocMessageBuilder message; - auto cmd = message.initRoot(); - - getCdpJsonCodec().decode(text, cmd); - - switch (cmd.which()) { - case cdp::Command::UNKNOWN: { - break; - } - case cdp::Command::NETWORK_ENABLE: { - setNetworkEnabled(true); - cmd.getNetworkEnable().initResult(); - break; - } - case cdp::Command::NETWORK_DISABLE: { - setNetworkEnabled(false); - cmd.getNetworkDisable().initResult(); - break; - } - case cdp::Command::NETWORK_GET_RESPONSE_BODY: { - auto err = cmd.getNetworkGetResponseBody().initError(); - err.setCode(-32600); - err.setMessage("Network.getResponseBody is not supported in this fork"); - break; - } - case cdp::Command::PROFILER_STOP: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - stopProfiling(**p, lock.v8Isolate, cmd); - } - break; - } - - case cdp::Command::PROFILER_START: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - startProfiling(**p, lock.v8Isolate); - } - break; - } + if (!cmd.isUnknown()) { + sendNotification(cmd); + return; + } - case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); - setSamplingInterval(**p, interval); - } - break; - } - case cdp::Command::PROFILER_ENABLE: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - isolate.impl->profiler = kj::Own( - v8::CpuProfiler::New(lock.v8Isolate, v8::kDebugNaming, v8::kLazyLogging), - CpuProfilerDisposer::instance); - break; - } - case cdp::Command::TAKE_HEAP_SNAPSHOT: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - auto params = cmd.getTakeHeapSnapshot().getParams(); - takeHeapSnapshot(lock, - params.getExposeInternals(), - params.getCaptureNumericValue()); - break; - } - } + auto& lock = recordedLock.lock; - if (!cmd.isUnknown()) { - sendNotification(cmd); - return incomingLoop(); - } - } + // We have at times observed V8 bugs where the inspector queues a background task and + // then synchronously waits for it to complete, which would deadlock if background + // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not + // a big deal to just permit those background threads. + AllowV8BackgroundThreadsScope allowBackgroundThreads; - auto state = this->state.lockExclusive(); + kj::Maybe maybeLimitError; + { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + session.dispatchProtocolMessage(toStringView(message)); + } - // const_cast OK because we're going to lock it - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; + // Run microtasks in case the user made an async call. + if (maybeLimitError == nullptr) { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } else { + // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. + lock->v8Isolate->TerminateExecution(); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } - // We have at times observed V8 bugs where the inspector queues a background task and - // then synchronously waits for it to complete, which would deadlock if background - // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not - // a big deal to just permit those background threads. - AllowV8BackgroundThreadsScope allowBackgroundThreads; + KJ_IF_MAYBE(limitError, maybeLimitError) { + v8::HandleScope scope(lock->v8Isolate); - kj::Maybe maybeLimitError; - { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - state->get()->session->dispatchProtocolMessage(toStringView(text)); - } + // HACK: We want to print the error, but we need a context to do that. + // We don't know which contexts exist in this isolate, so I guess we have to + // create one. Ugh. + auto dummyContext = v8::Context::New(lock->v8Isolate); + auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); + inspector.contextCreated( + v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( + reinterpret_cast("Worker"), 6))); + sendExceptionToInspector(inspector, dummyContext, + jsg::extractTunneledExceptionDescription(limitError->getDescription())); + inspector.contextDestroyed(dummyContext); + } - // Run microtasks in case the user made an async call. - if (maybeLimitError == nullptr) { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } else { - // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. - lock.v8Isolate->TerminateExecution(); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } + if (recordedLock.checkInWithLimitEnforcer(isolate)) { + disconnect(); + } + } - KJ_IF_MAYBE(limitError, maybeLimitError) { - v8::HandleScope scope(lock.v8Isolate); - - // HACK: We want to print the error, but we need a context to do that. - // We don't know which contexts exist in this isolate, so I guess we have to - // create one. Ugh. - auto dummyContext = v8::Context::New(lock.v8Isolate); - auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); - inspector.contextCreated( - v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( - reinterpret_cast("Worker"), 6))); - sendExceptionToInspector(inspector, dummyContext, - jsg::extractTunneledExceptionDescription(limitError->getDescription())); - inspector.contextDestroyed(dummyContext); - } + kj::Promise messagePump() { + return ioHandler.messagePump(); + } - if (recordedLock.checkInWithLimitEnforcer(isolate)) { - disconnect(); - } - } - KJ_CASE_ONEOF(bytes, kj::Array) { - // ignore - } - KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - // all done - receivedClose = true; - outgoingQueueNotifier->notify(); - - // The outgoing loop will wake up and will exit out. It is exclusively joined with the - // incoming loop, so we'll be canceled there. We use NEVER_DONE here to make sure we - // don't inadvertently cancel the outgoing loop. - return kj::NEVER_DONE; - } - } - return incomingLoop(); - }); + void dispatchProtocolMessages(kj::ArrayPtr messages) { + auto lockedState = this->state.lockExclusive(); + for (auto& message : messages) { + dispatchProtocolMessage(lockedState, kj::mv(message)); + } } // --------------------------------------------------------------------------- @@ -2294,12 +2283,7 @@ public: } void sendNotification(kj::String message) { - outgoingQueue.lockExclusive()->add(kj::mv(message)); - outgoingQueueNotifier->notify(); - - // TODO(someday): Should we implement some sort of backpressure if the queue gets large? Will - // need to be careful about deadlock if so, since presumably the isolate is locked during - // these callbacks. + ioHandler.send(kj::mv(message)); } template @@ -2316,8 +2300,237 @@ public: // delay signaling the outgoing loop until this call? } + void pauseIncomingMessages(); + // Stops the delivery of CDP messages on the I/O worker thread. Called when the isolate hits a + // breakpoint or debugger statement. + + void resumeIncomingMessages(); + // Resumes delivery of CDP messages on the I/O worker thread. Called when execution resumes after + // a breakpoint or debugger statement. + + bool dispatchOneMessageDuringPause(); + // Dispatches one message whilst automatic CDP messages on the I/O worker thread is paused, called + // on the thread executing the isolate whilst execution is suspended due to a breakpoint or + // debugger statement. + private: - kj::WebSocket& webSocket; + class WebSocketIoHandler final { + // Class that manages the I/O for devtools connections. I/O is performed on the + // thread associated with the InspectorService (the thread that calls attachInspector). + // Most of the public API is intended for code running on the isolate thread, such as + // the InspectorChannelImpl and the InspectorClient. + public: + WebSocketIoHandler(kj::WebSocket& webSocket) + : webSocket(webSocket) { + // Assume we are being instantiated on the InspectorService thread, the thread that will do + // I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread. + outgoingQueueNotifier = kj::atomicRefcounted(kj::getCurrentThreadExecutor()); + } + + ~WebSocketIoHandler() noexcept(false) { + outgoingQueueNotifier->clear(); + } + + void connect(InspectorChannelImpl& inspectorChannel) { + // Sets the channel that messages are delivered to. + channel = inspectorChannel; + } + + void disconnect() { + channel = {}; + shutdown(); + } + + void pauseIncomingMessages() { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + if (lockedIncomingQueue->status == MessageQueue::Status::CLOSED) { + return; + } + lockedIncomingQueue->status = MessageQueue::Status::PAUSED; + } + + void resumeIncomingMessages() { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + if (lockedIncomingQueue->status == MessageQueue::Status::CLOSED) { + return; + } + lockedIncomingQueue->status = MessageQueue::Status::ACTIVE; + deliverProtocolMessages(channel, lockedIncomingQueue); + } + + kj::Maybe waitForMessage() { + // Blocked the current thread until a message arrives. This is intended + // for use in the InspectorClient when breakpoints are hit. The InspectorClient + // has to remain in runMessageLoopOnPause() but still receive CDP messages + // (e.g. resume). + return incomingQueue.when( + [](const MessageQueue& incomingQueue) { + return (incomingQueue.head < incomingQueue.messages.size() || + incomingQueue.status == MessageQueue::Status::CLOSED); + }, + [](MessageQueue& incomingQueue) -> kj::Maybe { + if (incomingQueue.status == MessageQueue::Status::CLOSED) return {}; + return pollMessage(incomingQueue); + }); + } + + kj::Promise messagePump() { + // Message pumping promise that should be evaluated on the InspectorService + // thread. + return incomingLoop().exclusiveJoin(outgoingLoop()); + } + + void send(kj::String message) { + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + if (lockedOutgoingQueue->status == MessageQueue::Status::CLOSED) return; + lockedOutgoingQueue->messages.add(kj::mv(message)); + outgoingQueueNotifier->notify(); + } + + private: + struct MessageQueue { + kj::Vector messages; + size_t head; + enum class Status { ACTIVE, PAUSED, CLOSED } status; + }; + + static kj::Maybe pollMessage(MessageQueue& messageQueue) { + if (messageQueue.head < messageQueue.messages.size()) { + kj::String message = kj::mv(messageQueue.messages[messageQueue.head++]); + if (messageQueue.head == messageQueue.messages.size()) { + messageQueue.head = 0; + messageQueue.messages.clear(); + } + return kj::mv(message); + } + return {}; + } + + static void deliverProtocolMessages(kj::Maybe& channel, + kj::Locked& incomingQueue) { + KJ_REQUIRE(incomingQueue->status == MessageQueue::Status::ACTIVE); + KJ_IF_MAYBE(c, channel) { + kj::ArrayPtr messages = incomingQueue->messages.slice( + incomingQueue->head, incomingQueue->messages.size()); + c->dispatchProtocolMessages(messages); + incomingQueue->messages.clear(); + incomingQueue->head = 0; + } + } + + void shutdown() { + // Drain incoming queue, the isolate thread may be waiting on it + // on will notice it is closed if woken without any messages to + // deliver in WebSocketIoWorker::waitForMessage(). + { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->head = 0; + lockedIncomingQueue->messages.clear(); + lockedIncomingQueue->status = MessageQueue::Status::CLOSED; + } + { + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + lockedOutgoingQueue->status = MessageQueue::Status::CLOSED; + } + // Wake any waiters since queue status fields have been updated. + outgoingQueueNotifier->notify(); + } + + kj::Promise incomingLoop() { + for (;;) { + auto message = co_await webSocket.receive(); + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(text, kj::String) { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->messages.add(kj::mv(text)); + if (lockedIncomingQueue->status == MessageQueue::Status::ACTIVE) { + deliverProtocolMessages(channel, lockedIncomingQueue); + } + } + KJ_CASE_ONEOF(blob, kj::Array){ + // Ignore. + } + KJ_CASE_ONEOF(close, kj::WebSocket::Close) { + shutdown(); + } + } + } + } + + kj::Promise outgoingLoop() { + for (;;) { + co_await outgoingQueueNotifier->awaitNotification(); + try { + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + auto messages = kj::mv(lockedOutgoingQueue->messages); + bool receivedClose = lockedOutgoingQueue->status == MessageQueue::Status::CLOSED; + lockedOutgoingQueue.release(); + co_await sendToWebSocket(kj::mv(messages)); + if (receivedClose) { + co_await webSocket.close(1000, "client closed connection"); + co_return; + } + } catch (kj::Exception& e) { + shutdown(); + throw; + } + } + } + + kj::Promise sendToWebSocket(kj::Vector messages) { + for (auto& message : messages) { + co_await webSocket.send(message); + } + } + + class XThreadNotifier final: public kj::AtomicRefcounted { + // Class encapsulating the ability to notify the inspector thread from other threads when + // messages are pushed to the outgoing queue. + // + // TODO(cleanup): This could be a lot simpler if only it were possible to cancel + // an executor.executeAsync() promise from an arbitrary thread. Then, if the inspector + // session was destroyed in its thread while a cross-thread notification was in-flight, it + // could cancel that notification directly. + public: + XThreadNotifier(const kj::Executor& executor) : executor(executor) { } + + void clear() { + // Must call in main thread before it drops its reference. + paf = nullptr; + } + + kj::Promise awaitNotification() { + return kj::mv(KJ_ASSERT_NONNULL(paf).promise).then([this]() { + paf = kj::newPromiseAndFulfiller(); + }); + } + + void notify() const { + executor.executeAsync([ref = kj::atomicAddRef(*this)]() { + KJ_IF_MAYBE(p, ref->paf) { + p->fulfiller->fulfill(); + } + }).detach([](kj::Exception&& exception) { + KJ_LOG(ERROR, exception); + }); + } + + private: + const kj::Executor& executor; + mutable kj::Maybe> paf = kj::newPromiseAndFulfiller(); + // Accessed only in notifier's owning thread. + }; + + kj::MutexGuarded incomingQueue; + kj::MutexGuarded outgoingQueue; + kj::Own outgoingQueueNotifier; + + kj::WebSocket& webSocket; // only accessed on the InspectorService thread. + std::atomic_bool receivedClose; // accessed on any thread (only transitions false -> true). + kj::Maybe channel; // only accessed on the isolate thread. + }; + + WebSocketIoHandler ioHandler; void takeHeapSnapshot(jsg::Lock& js, bool exposeInternals, bool captureNumericValue) { struct Activity: public v8::ActivityControl { @@ -2410,75 +2623,63 @@ private: kj::MutexGuarded> state; // Mutex ordering: You must lock this *before* locking the isolate. - class XThreadNotifier final: public kj::AtomicRefcounted { - // Class encapsulating the ability to notify the inspector thread from other threads when - // messages are pushed to the outgoing queue. - // - // TODO(cleanup): This could be a lot simpler if only it were possible to cancel - // an executor.executeAsync() promise from an arbitrary thread. Then, if the inspector - // session was destroyed in its thread while a cross-thread notification was in-flight, it - // could cancel that notification directly. - public: - void clear() { - // Must call in main thread before it drops its reference. - paf = nullptr; - } + volatile bool networkEnabled = false; + // Not under `state` lock due to lock ordering complications. - kj::Promise awaitNotification() { - return kj::mv(KJ_ASSERT_NONNULL(paf).promise).then([this]() { - paf = kj::newPromiseAndFulfiller(); - __atomic_store_n(&inFlight, false, __ATOMIC_RELAXED); - }); - } + void dispatchProtocolMessage( + kj::Locked>& lockedState, + kj::String message) { + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because we're going to lock it + Isolate& isolate = const_cast(*lockedState->get()->isolate); + jsg::V8StackScope stackScope; + Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); + dispatchProtocolMessage(kj::mv(message), session, isolate, stackScope, recordedLock); + } +}; - void notify() const { - // TODO(perf): Figure out why this commented-out optimization sometimes randomly misses - // messages, particularly under load. - // if (__atomic_exchange_n(&inFlight, true, __ATOMIC_RELAXED)) { - // // A notifciation is already in-flight, no need to send another one. - // } else { - executor.executeAsync([ref = kj::atomicAddRef(*this)]() { - KJ_IF_MAYBE(p, ref->paf) { - p->fulfiller->fulfill(); - } - }).detach([](kj::Exception&& exception) { - KJ_LOG(ERROR, exception); - }); - // } - } +void Worker::Isolate::InspectorChannelImpl::pauseIncomingMessages() { + ioHandler.pauseIncomingMessages(); +} - private: - const kj::Executor& executor = kj::getCurrentThreadExecutor(); +void Worker::Isolate::InspectorChannelImpl::resumeIncomingMessages() { + ioHandler.resumeIncomingMessages(); +} - mutable kj::Maybe> paf = kj::newPromiseAndFulfiller(); - // Accessed only in notifier's owning thread. +bool Worker::Isolate::InspectorChannelImpl::dispatchOneMessageDuringPause() { + auto maybeMessage = ioHandler.waitForMessage(); - mutable bool inFlight = false; - // Is a notification already in-flight? - }; + // We can be paused by either hitting a debugger statement in a script or from hitting + // a breakpoint or someone hit break. - kj::Own outgoingQueueNotifier = kj::atomicRefcounted(); - // Whenever another thread adds messages to the outgoing queue, it notifies the inspector - // connection thread using this. + KJ_IF_MAYBE(message, maybeMessage) { + auto lockedState = this->state.lockExclusive(); + // Received a message whilst script is running, probably in a breakpoint. + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because the IoContext has the lock. + Isolate& isolate = const_cast(*lockedState->get()->isolate); + Worker::Lock& workerLock = IoContext::current().getCurrentLock(); + Isolate::Impl::Lock& recordedLock = workerLock.impl->recordedLock; + jsg::V8StackScope stackScope; + dispatchProtocolMessage(kj::mv(*message), session, isolate, stackScope, recordedLock); + return true; + } else { + // No message from waitForMessage() implies the connection is broken. + return false; + } +} - kj::MutexGuarded> outgoingQueue; - bool receivedClose = false; +void Worker::InspectorClient::pauseIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel) { + channel.pauseIncomingMessages(); +} - volatile bool networkEnabled = false; - // Not under `state` lock due to lock ordering complications. +void Worker::InspectorClient::resumeIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel) { + channel.resumeIncomingMessages(); +} - kj::Promise sendToWebsocket(kj::ArrayPtr messages) { - if (messages.size() == 0) { - return kj::READY_NOW; - } else { - auto first = kj::mv(messages[0]); - auto rest = messages.slice(1, messages.size()); - return webSocket.send(first).attach(kj::mv(first)).then([this, rest]() mutable { - return sendToWebsocket(rest); - }); - } - } -}; +bool Worker::InspectorClient::dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel) { + return channel.dispatchOneMessageDuringPause(); +} kj::Promise Worker::Isolate::attachInspector( kj::Timer& timer, @@ -2514,12 +2715,12 @@ kj::Promise Worker::Isolate::attachInspector( // just not. lockedSelf.disconnectInspector(); - auto channel = kj::heap( - kj::atomicAddRef(*this), webSocket); - lockedSelf.currentInspectorSession = *channel; - lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset); + auto channel = kj::heap(kj::atomicAddRef(*this), webSocket); + lockedSelf.currentInspectorSession = *channel; + lockedSelf.impl->inspectorClient.setChannel(*channel); + // Send any queued notifications. { v8::HandleScope handleScope(lock.v8Isolate); @@ -2528,19 +2729,17 @@ kj::Promise Worker::Isolate::attachInspector( } lockedSelf.impl->queuedNotifications.clear(); } - - return channel->incomingLoop() - .exclusiveJoin(channel->outgoingLoop()) - .attach(kj::mv(channel)); + return channel->messagePump().attach(kj::mv(channel)); } void Worker::Isolate::disconnectInspector() { // If an inspector session is connected, proactively drop it, so as to force it to drop its // reference on the script, so that the script can be deleted. - KJ_IF_MAYBE(current, currentInspectorSession) { current->disconnect(); + currentInspectorSession = {}; } + impl->inspectorClient.resetChannel(); } void Worker::Isolate::logWarning(kj::StringPtr description, Lock& lock) { diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index ef72120c3f9..5028d6f9f1e 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -451,7 +451,6 @@ class Worker::Isolate: public kj::AtomicRefcounted { kj::Own traceAsyncContextKey; friend class Worker; - friend class IsolateChannelImpl; }; class Worker::ApiIsolate { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index fba6b3b38d8..c59e004e3a6 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1146,11 +1146,6 @@ private: friend class Registration; }; -kj::Own Server::makeInspectorService( - kj::HttpHeaderTable::Builder& headerTableBuilder) { - return kj::heap(timer, headerTableBuilder); -} - // ======================================================================================= class Server::WorkerService final: public Service, private kj::TaskSet::ErrorHandler, @@ -1969,6 +1964,7 @@ static kj::Maybe createBinding( "the schema?")); } +void startInspector(kj::StringPtr inspectorAddress, kj::StringPtr name, Worker::Isolate* isolate); kj::Own Server::makeWorker(kj::StringPtr name, config::Worker::Reader conf, capnp::List::Reader extensions) { @@ -2058,20 +2054,22 @@ kj::Own Server::makeWorker(kj::StringPtr name, config::Worker:: auto limitEnforcer = kj::heap(); auto api = kj::heap(globalContext->v8System, featureFlags.asReader(), *limitEnforcer, kj::atomicAddRef(*observer)); + auto inspectorPolicy = Worker::Isolate::InspectorPolicy::DISALLOW; + KJ_IF_MAYBE(inspector, inspectorOverride) { + // For workerd, if the inspector is enabled, it is always fully trusted. + inspectorPolicy = Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED; + } auto isolate = kj::atomicRefcounted( kj::mv(api), kj::mv(observer), name, kj::mv(limitEnforcer), - // For workerd, if the inspector is enabled, it is always fully trusted. - maybeInspectorService != nullptr ? - Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED : - Worker::Isolate::InspectorPolicy::DISALLOW); + inspectorPolicy); // If we are using the inspector, we need to register the Worker::Isolate // with the inspector service. - KJ_IF_MAYBE(inspector, maybeInspectorService) { - (*inspector)->registerIsolate(name, isolate.get()); + KJ_IF_MAYBE(inspector, inspectorOverride) { + startInspector(*inspector, name, isolate.get()); } auto script = isolate->newScript( @@ -2480,33 +2478,42 @@ void Server::startAlarmScheduler(config::Config::Reader config) { .attach(kj::mv(vfs)); } -void Server::startServices(jsg::V8System& v8System, config::Config::Reader config, - kj::HttpHeaderTable::Builder& headerTableBuilder, - kj::ForkedPromise& forkedDrainWhen) { - // --------------------------------------------------------------------------- - // Configure inspector. - static constexpr uint DEFAULT_PORT = 9229; - - static auto constexpr listen = [](kj::Network& network, - kj::StringPtr address, - InspectorService& inspectorService) - -> kj::Promise { - auto parsed = co_await network.parseAddress(address, DEFAULT_PORT); - auto listener = parsed->listen(); - KJ_LOG(INFO, "Inspector is listening"); - co_await inspectorService.listen(kj::mv(listener)); - }; +void startInspector(kj::StringPtr inspectorAddress, + Server::InspectorServiceIsolateRegistrar& registrar) { + // Configure and start the inspector socket. + kj::Thread thread([inspectorAddress, name, isolate](){ + kj::AsyncIoContext io = kj::setupAsyncIo(); + + kj::HttpHeaderTable::Builder headerTableBuilder; - KJ_IF_MAYBE(inspector, inspectorOverride) { // Create the special inspector service. - auto& inspectorService = *maybeInspectorService.emplace( - makeInspectorService(headerTableBuilder)); + kj::Own inspectorService(kj::heap(io.provider->getTimer(), headerTableBuilder)); + auto ownHeaderTable = headerTableBuilder.build(); + + inspectorService->registerIsolate(name, isolate); // Configure and start the inspector socket. - tasks.add(listen(network, *inspector, inspectorService) - .exclusiveJoin(forkedDrainWhen.addBranch())); - } + static constexpr uint DEFAULT_PORT = 9229; + + auto& network = io.provider->getNetwork(); + auto inspectorListener = network.parseAddress(inspectorAddress, DEFAULT_PORT) + .then([](kj::Own parsed) { + return parsed->listen(); + }); + + auto listen = inspectorListener.then([&inspectorService](kj::Own listener) mutable { + KJ_LOG(INFO, "Inspector is listening"); + return inspectorService->listen(kj::mv(listener)); + }); + kj::NEVER_DONE.wait(io.waitScope); + }); + thread.detach(); +} + +void Server::startServices(jsg::V8System& v8System, config::Config::Reader config, + kj::HttpHeaderTable::Builder& headerTableBuilder, + kj::ForkedPromise& forkedDrainWhen) { // --------------------------------------------------------------------------- // Configure services diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6234f4caab2..134fa7dc5f2 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -74,6 +74,8 @@ class Server: private kj::TaskSet::ErrorHandler { struct Ephemeral {}; using ActorConfig = kj::OneOf; + class InspectorService; + private: kj::Filesystem& fs; kj::Timer& timer; @@ -175,11 +177,6 @@ class Server: private kj::TaskSet::ErrorHandler { class WorkerEntrypointService; class HttpListener; - class InspectorService; - - kj::Maybe> maybeInspectorService; - kj::Own makeInspectorService(kj::HttpHeaderTable::Builder& headerTableBuilder); - void startServices(jsg::V8System& v8System, config::Config::Reader config, kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen); From 45a87799754eb755fe1bbd9bfcd9845d6cdc2a42 Mon Sep 17 00:00:00 2001 From: Orion Hodson Date: Mon, 3 Jul 2023 15:02:43 +0100 Subject: [PATCH 2/2] Add support for debugging multiple workers If there are multiple service workers in a config, expose them to devtools. This is largely just for Chrome devtools since wrangler does not expect to generate more than one worker per config that might need to be debugged. Test: manual using chrome devtools and https://bitbucket.cfdata.org/users/bcoll/repos/workerd-bus-error-10/browse --- src/workerd/server/server.c++ | 92 ++++++++++++++++++++++++++++++----- src/workerd/server/server.h | 2 + 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index c59e004e3a6..f3df0555273 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -969,6 +969,38 @@ kj::Own Server::makeDiskDirectoryService( // ======================================================================================= +class Server::InspectorServiceIsolateRegistrar final { + // This class exists to update the InspectorService's table of isolates when a config + // has multiple services. The InspectorService exists on the stack of it's own thread and + // initializes state that is bound to the thread, e.g. a http server and an event loop. + // This class provides a small thread-safe interface to the InspectorService so : + // mappings can be added after the InspectorService has started. + // + // The CloudFlare devtools only show the first service in workerd configuration. This service + // is always contains a users code. However, in packaging user code wrangler may add + // additional services that also have code. If using Chrome devtools to inspect a workerd, + // instance all services are visible and can be debugged. + +public: + InspectorServiceIsolateRegistrar() {} + ~InspectorServiceIsolateRegistrar() noexcept(true); + + void registerIsolate(kj::StringPtr name, Worker::Isolate* isolate); + + KJ_DISALLOW_COPY_AND_MOVE(InspectorServiceIsolateRegistrar); +private: + void attach(const Server::InspectorService* anInspectorService) { + *inspectorService.lockExclusive() = anInspectorService; + } + + void detach() { + *inspectorService.lockExclusive() = nullptr; + } + + kj::MutexGuarded inspectorService; + friend class Server::InspectorService; +}; + class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler { // Implements the interface for the devtools inspector protocol. // @@ -977,12 +1009,26 @@ class Server::InspectorService final: public kj::HttpService, public kj::HttpSer public: InspectorService( kj::Timer& timer, - kj::HttpHeaderTable::Builder& headerTableBuilder) + kj::HttpHeaderTable::Builder& headerTableBuilder, + InspectorServiceIsolateRegistrar& registrar) : timer(timer), headerTable(headerTableBuilder.getFutureTable()), server(timer, headerTable, *this, kj::HttpServerSettings { .errorHandler = *this - }) {} + }), + registrar(registrar) { + registrar.attach(this); + } + + ~InspectorService() { + KJ_IF_MAYBE(r, registrar) { + r->detach(); + } + } + + void invalidateRegistrar() { + registrar = nullptr; + } kj::Promise handleApplicationError( kj::Exception exception, kj::Maybe response) override { @@ -1045,6 +1091,7 @@ public: } } + KJ_LOG(INFO, kj::str("Unknown worker session [", id, "]")); return response.sendError(404, "Unknown worker session", responseHeaders); } @@ -1142,10 +1189,26 @@ private: kj::HttpHeaderTable& headerTable; kj::HashMap> isolates; kj::HttpServer server; - - friend class Registration; + kj::Maybe registrar; }; +Server::InspectorServiceIsolateRegistrar::~InspectorServiceIsolateRegistrar() noexcept(true) { + auto lockedInspectorService = this->inspectorService.lockExclusive(); + if (lockedInspectorService != nullptr) { + auto is = const_cast(*lockedInspectorService); + is->invalidateRegistrar(); + } +} + +void Server::InspectorServiceIsolateRegistrar::registerIsolate(kj::StringPtr name, + Worker::Isolate* isolate) { + auto lockedInspectorService = this->inspectorService.lockExclusive(); + if (lockedInspectorService != nullptr) { + auto is = const_cast(*lockedInspectorService); + is->registerIsolate(name, isolate); + } +} + // ======================================================================================= class Server::WorkerService final: public Service, private kj::TaskSet::ErrorHandler, @@ -1964,7 +2027,7 @@ static kj::Maybe createBinding( "the schema?")); } -void startInspector(kj::StringPtr inspectorAddress, kj::StringPtr name, Worker::Isolate* isolate); +void startInspector(kj::StringPtr inspectorAddress, Server::InspectorServiceIsolateRegistrar& registrar); kj::Own Server::makeWorker(kj::StringPtr name, config::Worker::Reader conf, capnp::List::Reader extensions) { @@ -2068,8 +2131,8 @@ kj::Own Server::makeWorker(kj::StringPtr name, config::Worker:: // If we are using the inspector, we need to register the Worker::Isolate // with the inspector service. - KJ_IF_MAYBE(inspector, inspectorOverride) { - startInspector(*inspector, name, isolate.get()); + KJ_IF_MAYBE(isolateRegistrar, inspectorIsolateRegistrar) { + (*isolateRegistrar)->registerIsolate(name, isolate.get()); } auto script = isolate->newScript( @@ -2481,17 +2544,16 @@ void Server::startAlarmScheduler(config::Config::Reader config) { void startInspector(kj::StringPtr inspectorAddress, Server::InspectorServiceIsolateRegistrar& registrar) { // Configure and start the inspector socket. - kj::Thread thread([inspectorAddress, name, isolate](){ + kj::Thread thread([inspectorAddress, ®istrar](){ kj::AsyncIoContext io = kj::setupAsyncIo(); kj::HttpHeaderTable::Builder headerTableBuilder; // Create the special inspector service. - kj::Own inspectorService(kj::heap(io.provider->getTimer(), headerTableBuilder)); + auto inspectorService( + kj::heap(io.provider->getTimer(), headerTableBuilder, registrar)); auto ownHeaderTable = headerTableBuilder.build(); - inspectorService->registerIsolate(name, isolate); - // Configure and start the inspector socket. static constexpr uint DEFAULT_PORT = 9229; @@ -2577,6 +2639,14 @@ void Server::startServices(jsg::V8System& v8System, config::Config::Reader confi }); } + // If we are using the inspector, we need to register the Worker::Isolate + // with the inspector service. + KJ_IF_MAYBE(inspectorAddress, inspectorOverride) { + auto registrar = kj::heap(); + startInspector(*inspectorAddress, *registrar); + inspectorIsolateRegistrar = kj::mv(registrar); + } + // Second pass: Build services. for (auto serviceConf: config.getServices()) { kj::StringPtr name = serviceConf.getName(); diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 134fa7dc5f2..68b7a7ae766 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -75,6 +75,7 @@ class Server: private kj::TaskSet::ErrorHandler { using ActorConfig = kj::OneOf; class InspectorService; + class InspectorServiceIsolateRegistrar; private: kj::Filesystem& fs; @@ -94,6 +95,7 @@ class Server: private kj::TaskSet::ErrorHandler { // code that parses strings from the config file. kj::Maybe inspectorOverride; + kj::Maybe> inspectorIsolateRegistrar; kj::Maybe> controlOverride; struct GlobalContext;