diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index d0acfe3c7af..3a8c016c5f6 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -449,37 +449,84 @@ public: // We're on a request-serving thread. auto& ioContext = IoContext::current(); timePoint = ioContext.now(); - } else KJ_IF_MAYBE(info, inspectorTimerInfo) { - if (info->threadId == getCurrentThreadId()) { - // We're on an inspector-serving thread. + } else { + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(info, lockedState->inspectorTimerInfo) { timePoint = info->timer.now() + info->timerOffset - kj::origin() + kj::UNIX_EPOCH; } + // We're at script startup time -- just return the Epoch. } + return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + } - // If we're on neither a request- nor inspector-serving thread, then we're at script startup - // time -- just return the Epoch. + void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { + auto lockedState = state.lockExclusive(); + lockedState->inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + } - return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + void setChannel(Worker::Isolate::InspectorChannelImpl& channel) { + auto lockedState = state.lockExclusive(); + // There is only one active inspector channel at a time in workerd. The teardown of any + // previous channel should have invalidated `lockedState->channel`. + KJ_REQUIRE(lockedState->channel == nullptr); + lockedState->channel = channel; } - // Nothing else. We ignore everything the inspector tells us, because we only care about the - // devtools inspector protocol, which is handled separately. + void resetChannel() { + auto lockedState = state.lockExclusive(); + lockedState->channel = {}; + } - void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { - // Helper for attachInspector(). 
- inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + void runMessageLoopOnPause(int contextGroupId) override { + // This method is called by v8 when a breakpoint or debugger statement is hit. This method + // processes debugger messages until `Debugger.resume()` is called, when v8 then calls + // `quitMessageLoopOnPause()`. + // + // This method is ultimately called from the `InspectorChannelImpl` and the isolate lock is + // held when this method is called. + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(channel, lockedState->channel) { + pauseIncomingMessages(*channel); + runMessageLoop = true; + do { + if (!dispatchOneMessageDuringPause(*channel)) { + break; + } + } while (runMessageLoop); + resumeIncomingMessages(*channel); + } + } + + void quitMessageLoopOnPause() override { + // This method is called by v8 to resume execution after a breakpoint is hit. + runMessageLoop = false; } private: + static void pauseIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel); + static void resumeIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel); + static bool dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel); + struct InspectorTimerInfo { kj::Timer& timer; kj::Duration timerOffset; uint64_t threadId; }; - kj::Maybe inspectorTimerInfo; - // The timer and offset for the inspector-serving thread. + bool runMessageLoop; + + struct State { + // State that may be set on a thread other than the isolate thread. + // These are typically set in attachInspector when an inspector connection is + // made. + kj::Maybe channel; + // Inspector channel to use to pump messages. + + kj::Maybe inspectorTimerInfo; + // The timer and offset for the inspector-serving thread. 
+ }; + kj::MutexGuarded state; }; void setWebAssemblyModuleHasInstance(jsg::Lock& lock, v8::Local context); @@ -1635,6 +1682,7 @@ Worker::Lock::Lock(const Worker& constWorker, LockType lockType) impl(kj::heap(worker, lockType, stackScope)) { kj::requireOnStack(this, "Worker::Lock MUST be allocated on the stack."); } + Worker::Lock::~Lock() noexcept(false) { // const_cast OK because we hold -- nay, we *are* -- a lock on the script. auto& isolate = const_cast(worker.getIsolate()); @@ -2041,10 +2089,10 @@ private: class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel { public: - InspectorChannelImpl(kj::Own isolateParam, - kj::WebSocket& webSocket) - : webSocket(webSocket), - state(kj::heap(this, kj::mv(isolateParam))) {} + InspectorChannelImpl(kj::Own isolateParam, kj::WebSocket& webSocket) + : ioHandler(webSocket), state(kj::heap(this, kj::mv(isolateParam))) { + ioHandler.connect(*this); + } using InspectorLock = Worker::Lock::TakeSynchronously; // In preview sessions, synchronous locks are not an issue. We declare an alternate spelling of @@ -2052,7 +2100,8 @@ public: // locks. ~InspectorChannelImpl() noexcept try { - KJ_DEFER(outgoingQueueNotifier->clear()); + // Stop message pump. + ioHandler.disconnect(); // Delete session under lock. auto state = this->state.lockExclusive(); @@ -2060,9 +2109,7 @@ public: jsg::V8StackScope stackScope; Isolate::Impl::Lock recordedLock(*state->get()->isolate, InspectorLock(nullptr), stackScope); KJ_IF_MAYBE(p, state->get()->isolate->currentInspectorSession) { - if (p == this) { - const_cast(*state->get()->isolate).currentInspectorSession = nullptr;; - } + const_cast(*state->get()->isolate).disconnectInspector(); } state->get()->teardownUnderLock(); } catch (...) { @@ -2086,193 +2133,135 @@ public: void disconnect() { // Fake like the client requested close. This will cause outgoingLoop() to exit and everything // will be cleaned up. 
- receivedClose = true; - outgoingQueueNotifier->notify(); + ioHandler.disconnect(); } - kj::Promise outgoingLoop() { - return outgoingQueueNotifier->awaitNotification().then([this]() { - auto messages = kj::mv(*outgoingQueue.lockExclusive()); - auto promise = sendToWebsocket(messages).attach(kj::mv(messages)); + void dispatchProtocolMessage(kj::String message, + v8_inspector::V8InspectorSession& session, + Isolate& isolate, + jsg::V8StackScope& stackScope, + Isolate::Impl::Lock& recordedLock) { + capnp::MallocMessageBuilder messageBuilder; + auto cmd = messageBuilder.initRoot(); + getCdpJsonCodec().decode(message, cmd); - if (receivedClose) { - return promise.then([this]() { - return webSocket.close(1000, "client closed connection"); - }); - } else if (*state.lockShared() == nullptr) { - // Another connection superseded us, or the isolate died. - return promise.then([this]() { - // TODO(soon): What happens if the other side never hangs up? - return webSocket.disconnect(); - }); + switch (cmd.which()) { + case cdp::Command::UNKNOWN: { + break; } + case cdp::Command::NETWORK_ENABLE: { + setNetworkEnabled(true); + cmd.getNetworkEnable().initResult(); + break; + } + case cdp::Command::NETWORK_DISABLE: { + setNetworkEnabled(false); + cmd.getNetworkDisable().initResult(); + break; + } + case cdp::Command::NETWORK_GET_RESPONSE_BODY: { + auto err = cmd.getNetworkGetResponseBody().initError(); + err.setCode(-32600); + err.setMessage("Network.getResponseBody is not supported in this fork"); + break; + } + case cdp::Command::PROFILER_STOP: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + stopProfiling(**p, lock->v8Isolate, cmd); + } + break; + } + case cdp::Command::PROFILER_START: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + startProfiling(**p, lock->v8Isolate); + } + break; + } + case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto interval = 
cmd.getProfilerSetSamplingInterval().getParams().getInterval(); + setSamplingInterval(**p, interval); + } + break; + } + case cdp::Command::PROFILER_ENABLE: { + auto& lock = recordedLock.lock; + isolate.impl->profiler = kj::Own( + v8::CpuProfiler::New(lock->v8Isolate, v8::kDebugNaming, v8::kLazyLogging), + CpuProfilerDisposer::instance); + break; + } + case cdp::Command::TAKE_HEAP_SNAPSHOT: { + auto& lock = recordedLock.lock; + auto params = cmd.getTakeHeapSnapshot().getParams(); + takeHeapSnapshot(*lock, + params.getExposeInternals(), + params.getCaptureNumericValue()); + break; + } + } - return promise.then([this]() { return outgoingLoop(); }); - }); - } - - kj::Promise incomingLoop() { - return webSocket.receive().then([this](kj::WebSocket::Message&& message) -> kj::Promise { - KJ_SWITCH_ONEOF(message) { - KJ_CASE_ONEOF(text, kj::String) { - { - capnp::MallocMessageBuilder message; - auto cmd = message.initRoot(); - - getCdpJsonCodec().decode(text, cmd); - - switch (cmd.which()) { - case cdp::Command::UNKNOWN: { - break; - } - case cdp::Command::NETWORK_ENABLE: { - setNetworkEnabled(true); - cmd.getNetworkEnable().initResult(); - break; - } - case cdp::Command::NETWORK_DISABLE: { - setNetworkEnabled(false); - cmd.getNetworkDisable().initResult(); - break; - } - case cdp::Command::NETWORK_GET_RESPONSE_BODY: { - auto err = cmd.getNetworkGetResponseBody().initError(); - err.setCode(-32600); - err.setMessage("Network.getResponseBody is not supported in this fork"); - break; - } - case cdp::Command::PROFILER_STOP: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - stopProfiling(**p, lock.v8Isolate, cmd); - } - break; - } - - case cdp::Command::PROFILER_START: { - auto state = this->state.lockExclusive(); - Isolate& isolate = 
const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - startProfiling(**p, lock.v8Isolate); - } - break; - } + if (!cmd.isUnknown()) { + sendNotification(cmd); + return; + } - case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); - setSamplingInterval(**p, interval); - } - break; - } - case cdp::Command::PROFILER_ENABLE: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - isolate.impl->profiler = kj::Own( - v8::CpuProfiler::New(lock.v8Isolate, v8::kDebugNaming, v8::kLazyLogging), - CpuProfilerDisposer::instance); - break; - } - case cdp::Command::TAKE_HEAP_SNAPSHOT: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - auto params = cmd.getTakeHeapSnapshot().getParams(); - takeHeapSnapshot(lock, - params.getExposeInternals(), - params.getCaptureNumericValue()); - break; - } - } + auto& lock = recordedLock.lock; - if (!cmd.isUnknown()) { - sendNotification(cmd); - return incomingLoop(); - } - } + // We have at times observed V8 bugs where the inspector queues a background task and + // then synchronously waits for it to complete, which would deadlock if background + // 
threads are disallowed. Since the inspector is in a process sandbox anyway, it's not + // a big deal to just permit those background threads. + AllowV8BackgroundThreadsScope allowBackgroundThreads; - auto state = this->state.lockExclusive(); + kj::Maybe maybeLimitError; + { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + session.dispatchProtocolMessage(toStringView(message)); + } - // const_cast OK because we're going to lock it - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; + // Run microtasks in case the user made an async call. + if (maybeLimitError == nullptr) { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } else { + // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. + lock->v8Isolate->TerminateExecution(); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } - // We have at times observed V8 bugs where the inspector queues a background task and - // then synchronously waits for it to complete, which would deadlock if background - // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not - // a big deal to just permit those background threads. - AllowV8BackgroundThreadsScope allowBackgroundThreads; + KJ_IF_MAYBE(limitError, maybeLimitError) { + v8::HandleScope scope(lock->v8Isolate); - kj::Maybe maybeLimitError; - { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - state->get()->session->dispatchProtocolMessage(toStringView(text)); - } + // HACK: We want to print the error, but we need a context to do that. + // We don't know which contexts exist in this isolate, so I guess we have to + // create one. Ugh. 
+ auto dummyContext = v8::Context::New(lock->v8Isolate); + auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); + inspector.contextCreated( + v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( + reinterpret_cast("Worker"), 6))); + sendExceptionToInspector(inspector, dummyContext, + jsg::extractTunneledExceptionDescription(limitError->getDescription())); + inspector.contextDestroyed(dummyContext); + } - // Run microtasks in case the user made an async call. - if (maybeLimitError == nullptr) { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } else { - // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. - lock.v8Isolate->TerminateExecution(); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } + if (recordedLock.checkInWithLimitEnforcer(isolate)) { + disconnect(); + } + } - KJ_IF_MAYBE(limitError, maybeLimitError) { - v8::HandleScope scope(lock.v8Isolate); - - // HACK: We want to print the error, but we need a context to do that. - // We don't know which contexts exist in this isolate, so I guess we have to - // create one. Ugh. 
- auto dummyContext = v8::Context::New(lock.v8Isolate); - auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); - inspector.contextCreated( - v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( - reinterpret_cast("Worker"), 6))); - sendExceptionToInspector(inspector, dummyContext, - jsg::extractTunneledExceptionDescription(limitError->getDescription())); - inspector.contextDestroyed(dummyContext); - } + kj::Promise messagePump() { + return ioHandler.messagePump(); + } - if (recordedLock.checkInWithLimitEnforcer(isolate)) { - disconnect(); - } - } - KJ_CASE_ONEOF(bytes, kj::Array) { - // ignore - } - KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - // all done - receivedClose = true; - outgoingQueueNotifier->notify(); - - // The outgoing loop will wake up and will exit out. It is exclusively joined with the - // incoming loop, so we'll be canceled there. We use NEVER_DONE here to make sure we - // don't inadvertently cancel the outgoing loop. - return kj::NEVER_DONE; - } - } - return incomingLoop(); - }); + void dispatchProtocolMessages(kj::ArrayPtr messages) { + auto lockedState = this->state.lockExclusive(); + for (auto& message : messages) { + dispatchProtocolMessage(lockedState, kj::mv(message)); + } } // --------------------------------------------------------------------------- @@ -2294,12 +2283,7 @@ public: } void sendNotification(kj::String message) { - outgoingQueue.lockExclusive()->add(kj::mv(message)); - outgoingQueueNotifier->notify(); - - // TODO(someday): Should we implement some sort of backpressure if the queue gets large? Will - // need to be careful about deadlock if so, since presumably the isolate is locked during - // these callbacks. + ioHandler.send(kj::mv(message)); } template @@ -2316,8 +2300,237 @@ public: // delay signaling the outgoing loop until this call? } + void pauseIncomingMessages(); + // Stops the delivery of CDP messages on the I/O worker thread. 
Called when the isolate hits a + // breakpoint or debugger statement. + + void resumeIncomingMessages(); + // Resumes delivery of CDP messages on the I/O worker thread. Called when execution resumes after + // a breakpoint or debugger statement. + + bool dispatchOneMessageDuringPause(); + // Dispatches one message whilst automatic CDP messages on the I/O worker thread is paused, called + // on the thread executing the isolate whilst execution is suspended due to a breakpoint or + // debugger statement. + private: - kj::WebSocket& webSocket; + class WebSocketIoHandler final { + // Class that manages the I/O for devtools connections. I/O is performed on the + // thread associated with the InspectorService (the thread that calls attachInspector). + // Most of the public API is intended for code running on the isolate thread, such as + // the InspectorChannelImpl and the InspectorClient. + public: + WebSocketIoHandler(kj::WebSocket& webSocket) + : webSocket(webSocket) { + // Assume we are being instantiated on the InspectorService thread, the thread that will do + // I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread. + outgoingQueueNotifier = kj::atomicRefcounted(kj::getCurrentThreadExecutor()); + } + + ~WebSocketIoHandler() noexcept(false) { + outgoingQueueNotifier->clear(); + } + + void connect(InspectorChannelImpl& inspectorChannel) { + // Sets the channel that messages are delivered to. 
+ channel = inspectorChannel; + } + + void disconnect() { + channel = {}; + shutdown(); + } + + void pauseIncomingMessages() { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + if (lockedIncomingQueue->status == MessageQueue::Status::CLOSED) { + return; + } + lockedIncomingQueue->status = MessageQueue::Status::PAUSED; + } + + void resumeIncomingMessages() { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + if (lockedIncomingQueue->status == MessageQueue::Status::CLOSED) { + return; + } + lockedIncomingQueue->status = MessageQueue::Status::ACTIVE; + deliverProtocolMessages(channel, lockedIncomingQueue); + } + + kj::Maybe waitForMessage() { + // Blocked the current thread until a message arrives. This is intended + // for use in the InspectorClient when breakpoints are hit. The InspectorClient + // has to remain in runMessageLoopOnPause() but still receive CDP messages + // (e.g. resume). + return incomingQueue.when( + [](const MessageQueue& incomingQueue) { + return (incomingQueue.head < incomingQueue.messages.size() || + incomingQueue.status == MessageQueue::Status::CLOSED); + }, + [](MessageQueue& incomingQueue) -> kj::Maybe { + if (incomingQueue.status == MessageQueue::Status::CLOSED) return {}; + return pollMessage(incomingQueue); + }); + } + + kj::Promise messagePump() { + // Message pumping promise that should be evaluated on the InspectorService + // thread. 
+ return incomingLoop().exclusiveJoin(outgoingLoop()); + } + + void send(kj::String message) { + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + if (lockedOutgoingQueue->status == MessageQueue::Status::CLOSED) return; + lockedOutgoingQueue->messages.add(kj::mv(message)); + outgoingQueueNotifier->notify(); + } + + private: + struct MessageQueue { + kj::Vector messages; + size_t head; + enum class Status { ACTIVE, PAUSED, CLOSED } status; + }; + + static kj::Maybe pollMessage(MessageQueue& messageQueue) { + if (messageQueue.head < messageQueue.messages.size()) { + kj::String message = kj::mv(messageQueue.messages[messageQueue.head++]); + if (messageQueue.head == messageQueue.messages.size()) { + messageQueue.head = 0; + messageQueue.messages.clear(); + } + return kj::mv(message); + } + return {}; + } + + static void deliverProtocolMessages(kj::Maybe& channel, + kj::Locked& incomingQueue) { + KJ_REQUIRE(incomingQueue->status == MessageQueue::Status::ACTIVE); + KJ_IF_MAYBE(c, channel) { + kj::ArrayPtr messages = incomingQueue->messages.slice( + incomingQueue->head, incomingQueue->messages.size()); + c->dispatchProtocolMessages(messages); + incomingQueue->messages.clear(); + incomingQueue->head = 0; + } + } + + void shutdown() { + // Drain incoming queue, the isolate thread may be waiting on it + // on will notice it is closed if woken without any messages to + // deliver in WebSocketIoWorker::waitForMessage(). + { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->head = 0; + lockedIncomingQueue->messages.clear(); + lockedIncomingQueue->status = MessageQueue::Status::CLOSED; + } + { + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + lockedOutgoingQueue->status = MessageQueue::Status::CLOSED; + } + // Wake any waiters since queue status fields have been updated. 
+ outgoingQueueNotifier->notify(); + } + + kj::Promise incomingLoop() { + for (;;) { + auto message = co_await webSocket.receive(); + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(text, kj::String) { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->messages.add(kj::mv(text)); + if (lockedIncomingQueue->status == MessageQueue::Status::ACTIVE) { + deliverProtocolMessages(channel, lockedIncomingQueue); + } + } + KJ_CASE_ONEOF(blob, kj::Array){ + // Ignore. + } + KJ_CASE_ONEOF(close, kj::WebSocket::Close) { + shutdown(); + } + } + } + } + + kj::Promise outgoingLoop() { + for (;;) { + co_await outgoingQueueNotifier->awaitNotification(); + try { + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + auto messages = kj::mv(lockedOutgoingQueue->messages); + bool receivedClose = lockedOutgoingQueue->status == MessageQueue::Status::CLOSED; + lockedOutgoingQueue.release(); + co_await sendToWebSocket(kj::mv(messages)); + if (receivedClose) { + co_await webSocket.close(1000, "client closed connection"); + co_return; + } + } catch (kj::Exception& e) { + shutdown(); + throw; + } + } + } + + kj::Promise sendToWebSocket(kj::Vector messages) { + for (auto& message : messages) { + co_await webSocket.send(message); + } + } + + class XThreadNotifier final: public kj::AtomicRefcounted { + // Class encapsulating the ability to notify the inspector thread from other threads when + // messages are pushed to the outgoing queue. + // + // TODO(cleanup): This could be a lot simpler if only it were possible to cancel + // an executor.executeAsync() promise from an arbitrary thread. Then, if the inspector + // session was destroyed in its thread while a cross-thread notification was in-flight, it + // could cancel that notification directly. + public: + XThreadNotifier(const kj::Executor& executor) : executor(executor) { } + + void clear() { + // Must call in main thread before it drops its reference. 
+ paf = nullptr; + } + + kj::Promise awaitNotification() { + return kj::mv(KJ_ASSERT_NONNULL(paf).promise).then([this]() { + paf = kj::newPromiseAndFulfiller(); + }); + } + + void notify() const { + executor.executeAsync([ref = kj::atomicAddRef(*this)]() { + KJ_IF_MAYBE(p, ref->paf) { + p->fulfiller->fulfill(); + } + }).detach([](kj::Exception&& exception) { + KJ_LOG(ERROR, exception); + }); + } + + private: + const kj::Executor& executor; + mutable kj::Maybe> paf = kj::newPromiseAndFulfiller(); + // Accessed only in notifier's owning thread. + }; + + kj::MutexGuarded incomingQueue; + kj::MutexGuarded outgoingQueue; + kj::Own outgoingQueueNotifier; + + kj::WebSocket& webSocket; // only accessed on the InspectorService thread. + std::atomic_bool receivedClose; // accessed on any thread (only transitions false -> true). + kj::Maybe channel; // only accessed on the isolate thread. + }; + + WebSocketIoHandler ioHandler; void takeHeapSnapshot(jsg::Lock& js, bool exposeInternals, bool captureNumericValue) { struct Activity: public v8::ActivityControl { @@ -2410,75 +2623,63 @@ private: kj::MutexGuarded> state; // Mutex ordering: You must lock this *before* locking the isolate. - class XThreadNotifier final: public kj::AtomicRefcounted { - // Class encapsulating the ability to notify the inspector thread from other threads when - // messages are pushed to the outgoing queue. - // - // TODO(cleanup): This could be a lot simpler if only it were possible to cancel - // an executor.executeAsync() promise from an arbitrary thread. Then, if the inspector - // session was destroyed in its thread while a cross-thread notification was in-flight, it - // could cancel that notification directly. - public: - void clear() { - // Must call in main thread before it drops its reference. - paf = nullptr; - } + volatile bool networkEnabled = false; + // Not under `state` lock due to lock ordering complications. 
- kj::Promise awaitNotification() { - return kj::mv(KJ_ASSERT_NONNULL(paf).promise).then([this]() { - paf = kj::newPromiseAndFulfiller(); - __atomic_store_n(&inFlight, false, __ATOMIC_RELAXED); - }); - } + void dispatchProtocolMessage( + kj::Locked>& lockedState, + kj::String message) { + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because we're going to lock it + Isolate& isolate = const_cast(*lockedState->get()->isolate); + jsg::V8StackScope stackScope; + Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); + dispatchProtocolMessage(kj::mv(message), session, isolate, stackScope, recordedLock); + } +}; - void notify() const { - // TODO(perf): Figure out why this commented-out optimization sometimes randomly misses - // messages, particularly under load. - // if (__atomic_exchange_n(&inFlight, true, __ATOMIC_RELAXED)) { - // // A notifciation is already in-flight, no need to send another one. - // } else { - executor.executeAsync([ref = kj::atomicAddRef(*this)]() { - KJ_IF_MAYBE(p, ref->paf) { - p->fulfiller->fulfill(); - } - }).detach([](kj::Exception&& exception) { - KJ_LOG(ERROR, exception); - }); - // } - } +void Worker::Isolate::InspectorChannelImpl::pauseIncomingMessages() { + ioHandler.pauseIncomingMessages(); +} - private: - const kj::Executor& executor = kj::getCurrentThreadExecutor(); +void Worker::Isolate::InspectorChannelImpl::resumeIncomingMessages() { + ioHandler.resumeIncomingMessages(); +} - mutable kj::Maybe> paf = kj::newPromiseAndFulfiller(); - // Accessed only in notifier's owning thread. +bool Worker::Isolate::InspectorChannelImpl::dispatchOneMessageDuringPause() { + auto maybeMessage = ioHandler.waitForMessage(); - mutable bool inFlight = false; - // Is a notification already in-flight? - }; + // We can be paused by either hitting a debugger statement in a script or from hitting + // a breakpoint or someone hit break. 
- kj::Own outgoingQueueNotifier = kj::atomicRefcounted(); - // Whenever another thread adds messages to the outgoing queue, it notifies the inspector - // connection thread using this. + KJ_IF_MAYBE(message, maybeMessage) { + auto lockedState = this->state.lockExclusive(); + // Received a message whilst script is running, probably in a breakpoint. + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because the IoContext has the lock. + Isolate& isolate = const_cast(*lockedState->get()->isolate); + Worker::Lock& workerLock = IoContext::current().getCurrentLock(); + Isolate::Impl::Lock& recordedLock = workerLock.impl->recordedLock; + jsg::V8StackScope stackScope; + dispatchProtocolMessage(kj::mv(*message), session, isolate, stackScope, recordedLock); + return true; + } else { + // No message from waitForMessage() implies the connection is broken. + return false; + } +} - kj::MutexGuarded> outgoingQueue; - bool receivedClose = false; +void Worker::InspectorClient::pauseIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel) { + channel.pauseIncomingMessages(); +} - volatile bool networkEnabled = false; - // Not under `state` lock due to lock ordering complications. 
+void Worker::InspectorClient::resumeIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel) { + channel.resumeIncomingMessages(); +} - kj::Promise sendToWebsocket(kj::ArrayPtr messages) { - if (messages.size() == 0) { - return kj::READY_NOW; - } else { - auto first = kj::mv(messages[0]); - auto rest = messages.slice(1, messages.size()); - return webSocket.send(first).attach(kj::mv(first)).then([this, rest]() mutable { - return sendToWebsocket(rest); - }); - } - } -}; +bool Worker::InspectorClient::dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel) { + return channel.dispatchOneMessageDuringPause(); +} kj::Promise Worker::Isolate::attachInspector( kj::Timer& timer, @@ -2514,12 +2715,12 @@ kj::Promise Worker::Isolate::attachInspector( // just not. lockedSelf.disconnectInspector(); - auto channel = kj::heap( - kj::atomicAddRef(*this), webSocket); - lockedSelf.currentInspectorSession = *channel; - lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset); + auto channel = kj::heap(kj::atomicAddRef(*this), webSocket); + lockedSelf.currentInspectorSession = *channel; + lockedSelf.impl->inspectorClient.setChannel(*channel); + // Send any queued notifications. { v8::HandleScope handleScope(lock.v8Isolate); @@ -2528,19 +2729,17 @@ kj::Promise Worker::Isolate::attachInspector( } lockedSelf.impl->queuedNotifications.clear(); } - - return channel->incomingLoop() - .exclusiveJoin(channel->outgoingLoop()) - .attach(kj::mv(channel)); + return channel->messagePump().attach(kj::mv(channel)); } void Worker::Isolate::disconnectInspector() { // If an inspector session is connected, proactively drop it, so as to force it to drop its // reference on the script, so that the script can be deleted. 
- KJ_IF_MAYBE(current, currentInspectorSession) { current->disconnect(); + currentInspectorSession = {}; } + impl->inspectorClient.resetChannel(); } void Worker::Isolate::logWarning(kj::StringPtr description, Lock& lock) { diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index ef72120c3f9..5028d6f9f1e 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -451,7 +451,6 @@ class Worker::Isolate: public kj::AtomicRefcounted { kj::Own traceAsyncContextKey; friend class Worker; - friend class IsolateChannelImpl; }; class Worker::ApiIsolate { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index fba6b3b38d8..f3df0555273 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -969,6 +969,38 @@ kj::Own Server::makeDiskDirectoryService( // ======================================================================================= +class Server::InspectorServiceIsolateRegistrar final { + // This class exists to update the InspectorService's table of isolates when a config + // has multiple services. The InspectorService exists on the stack of it's own thread and + // initializes state that is bound to the thread, e.g. a http server and an event loop. + // This class provides a small thread-safe interface to the InspectorService so : + // mappings can be added after the InspectorService has started. + // + // The CloudFlare devtools only show the first service in workerd configuration. This service + // is always contains a users code. However, in packaging user code wrangler may add + // additional services that also have code. If using Chrome devtools to inspect a workerd, + // instance all services are visible and can be debugged. 
+ +public: + InspectorServiceIsolateRegistrar() {} + ~InspectorServiceIsolateRegistrar() noexcept(true); + + void registerIsolate(kj::StringPtr name, Worker::Isolate* isolate); + + KJ_DISALLOW_COPY_AND_MOVE(InspectorServiceIsolateRegistrar); +private: + void attach(const Server::InspectorService* anInspectorService) { + *inspectorService.lockExclusive() = anInspectorService; + } + + void detach() { + *inspectorService.lockExclusive() = nullptr; + } + + kj::MutexGuarded inspectorService; + friend class Server::InspectorService; +}; + class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler { // Implements the interface for the devtools inspector protocol. // @@ -977,12 +1009,26 @@ class Server::InspectorService final: public kj::HttpService, public kj::HttpSer public: InspectorService( kj::Timer& timer, - kj::HttpHeaderTable::Builder& headerTableBuilder) + kj::HttpHeaderTable::Builder& headerTableBuilder, + InspectorServiceIsolateRegistrar& registrar) : timer(timer), headerTable(headerTableBuilder.getFutureTable()), server(timer, headerTable, *this, kj::HttpServerSettings { .errorHandler = *this - }) {} + }), + registrar(registrar) { + registrar.attach(this); + } + + ~InspectorService() { + KJ_IF_MAYBE(r, registrar) { + r->detach(); + } + } + + void invalidateRegistrar() { + registrar = nullptr; + } kj::Promise handleApplicationError( kj::Exception exception, kj::Maybe response) override { @@ -1045,6 +1091,7 @@ public: } } + KJ_LOG(INFO, kj::str("Unknown worker session [", id, "]")); return response.sendError(404, "Unknown worker session", responseHeaders); } @@ -1142,13 +1189,24 @@ private: kj::HttpHeaderTable& headerTable; kj::HashMap> isolates; kj::HttpServer server; - - friend class Registration; + kj::Maybe registrar; }; -kj::Own Server::makeInspectorService( - kj::HttpHeaderTable::Builder& headerTableBuilder) { - return kj::heap(timer, headerTableBuilder); 
+Server::InspectorServiceIsolateRegistrar::~InspectorServiceIsolateRegistrar() noexcept(true) { + auto lockedInspectorService = this->inspectorService.lockExclusive(); + if (lockedInspectorService != nullptr) { + auto is = const_cast(*lockedInspectorService); + is->invalidateRegistrar(); + } +} + +void Server::InspectorServiceIsolateRegistrar::registerIsolate(kj::StringPtr name, + Worker::Isolate* isolate) { + auto lockedInspectorService = this->inspectorService.lockExclusive(); + if (lockedInspectorService != nullptr) { + auto is = const_cast(*lockedInspectorService); + is->registerIsolate(name, isolate); + } } // ======================================================================================= @@ -1969,6 +2027,7 @@ static kj::Maybe createBinding( "the schema?")); } +void startInspector(kj::StringPtr inspectorAddress, Server::InspectorServiceIsolateRegistrar& registrar); kj::Own Server::makeWorker(kj::StringPtr name, config::Worker::Reader conf, capnp::List::Reader extensions) { @@ -2058,20 +2117,22 @@ kj::Own Server::makeWorker(kj::StringPtr name, config::Worker:: auto limitEnforcer = kj::heap(); auto api = kj::heap(globalContext->v8System, featureFlags.asReader(), *limitEnforcer, kj::atomicAddRef(*observer)); + auto inspectorPolicy = Worker::Isolate::InspectorPolicy::DISALLOW; + KJ_IF_MAYBE(inspector, inspectorOverride) { + // For workerd, if the inspector is enabled, it is always fully trusted. + inspectorPolicy = Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED; + } auto isolate = kj::atomicRefcounted( kj::mv(api), kj::mv(observer), name, kj::mv(limitEnforcer), - // For workerd, if the inspector is enabled, it is always fully trusted. - maybeInspectorService != nullptr ? - Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED : - Worker::Isolate::InspectorPolicy::DISALLOW); + inspectorPolicy); // If we are using the inspector, we need to register the Worker::Isolate // with the inspector service. 
- KJ_IF_MAYBE(inspector, maybeInspectorService) { - (*inspector)->registerIsolate(name, isolate.get()); + KJ_IF_MAYBE(isolateRegistrar, inspectorIsolateRegistrar) { + (*isolateRegistrar)->registerIsolate(name, isolate.get()); } auto script = isolate->newScript( @@ -2480,33 +2541,41 @@ void Server::startAlarmScheduler(config::Config::Reader config) { .attach(kj::mv(vfs)); } -void Server::startServices(jsg::V8System& v8System, config::Config::Reader config, - kj::HttpHeaderTable::Builder& headerTableBuilder, - kj::ForkedPromise& forkedDrainWhen) { - // --------------------------------------------------------------------------- - // Configure inspector. - static constexpr uint DEFAULT_PORT = 9229; - - static auto constexpr listen = [](kj::Network& network, - kj::StringPtr address, - InspectorService& inspectorService) - -> kj::Promise { - auto parsed = co_await network.parseAddress(address, DEFAULT_PORT); - auto listener = parsed->listen(); - KJ_LOG(INFO, "Inspector is listening"); - co_await inspectorService.listen(kj::mv(listener)); - }; +void startInspector(kj::StringPtr inspectorAddress, + Server::InspectorServiceIsolateRegistrar& registrar) { + // Configure and start the inspector socket. + kj::Thread thread([inspectorAddress, ®istrar](){ + kj::AsyncIoContext io = kj::setupAsyncIo(); + + kj::HttpHeaderTable::Builder headerTableBuilder; - KJ_IF_MAYBE(inspector, inspectorOverride) { // Create the special inspector service. - auto& inspectorService = *maybeInspectorService.emplace( - makeInspectorService(headerTableBuilder)); + auto inspectorService( + kj::heap(io.provider->getTimer(), headerTableBuilder, registrar)); + auto ownHeaderTable = headerTableBuilder.build(); // Configure and start the inspector socket. 
- tasks.add(listen(network, *inspector, inspectorService) - .exclusiveJoin(forkedDrainWhen.addBranch())); - } + static constexpr uint DEFAULT_PORT = 9229; + + auto& network = io.provider->getNetwork(); + auto inspectorListener = network.parseAddress(inspectorAddress, DEFAULT_PORT) + .then([](kj::Own parsed) { + return parsed->listen(); + }); + auto listen = inspectorListener.then([&inspectorService](kj::Own listener) mutable { + KJ_LOG(INFO, "Inspector is listening"); + return inspectorService->listen(kj::mv(listener)); + }); + + kj::NEVER_DONE.wait(io.waitScope); + }); + thread.detach(); +} + +void Server::startServices(jsg::V8System& v8System, config::Config::Reader config, + kj::HttpHeaderTable::Builder& headerTableBuilder, + kj::ForkedPromise& forkedDrainWhen) { // --------------------------------------------------------------------------- // Configure services @@ -2570,6 +2639,14 @@ void Server::startServices(jsg::V8System& v8System, config::Config::Reader confi }); } + // If the inspector is enabled, create the isolate registrar and start the inspector + // listener on its own thread; makeWorker() later registers each Worker::Isolate with it. + KJ_IF_MAYBE(inspectorAddress, inspectorOverride) { + auto registrar = kj::heap(); + startInspector(*inspectorAddress, *registrar); + inspectorIsolateRegistrar = kj::mv(registrar); + } + // Second pass: Build services. for (auto serviceConf: config.getServices()) { kj::StringPtr name = serviceConf.getName(); diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6234f4caab2..68b7a7ae766 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -74,6 +74,9 @@ class Server: private kj::TaskSet::ErrorHandler { struct Ephemeral {}; using ActorConfig = kj::OneOf; + class InspectorService; + class InspectorServiceIsolateRegistrar; + private: kj::Filesystem& fs; kj::Timer& timer; @@ -92,6 +95,7 @@ class Server: private kj::TaskSet::ErrorHandler { // code that parses strings from the config file.
kj::Maybe inspectorOverride; + kj::Maybe> inspectorIsolateRegistrar; kj::Maybe> controlOverride; struct GlobalContext; @@ -175,11 +179,6 @@ class Server: private kj::TaskSet::ErrorHandler { class WorkerEntrypointService; class HttpListener; - class InspectorService; - - kj::Maybe> maybeInspectorService; - kj::Own makeInspectorService(kj::HttpHeaderTable::Builder& headerTableBuilder); - void startServices(jsg::V8System& v8System, config::Config::Reader config, kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen);