diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 2e3653b9eec1..75909dee7778 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -448,37 +448,105 @@ public: // We're on a request-serving thread. auto& ioContext = IoContext::current(); timePoint = ioContext.now(); - } else KJ_IF_MAYBE(info, inspectorTimerInfo) { - if (info->threadId == getCurrentThreadId()) { - // We're on an inspector-serving thread. + } else { + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(info, lockedState->inspectorTimerInfo) { timePoint = info->timer.now() + info->timerOffset - kj::origin() + kj::UNIX_EPOCH; } + // We're at script startup time -- just return the Epoch. } + return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + } - // If we're on neither a request- nor inspector-serving thread, then we're at script startup - // time -- just return the Epoch. + void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { + auto lockedState = state.lockExclusive(); + lockedState->inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + } - return (timePoint - kj::UNIX_EPOCH) / kj::MILLISECONDS; + void setChannel(Worker::Isolate::InspectorChannelImpl& channel) { + auto lockedState = state.lockExclusive(); + lockedState->channel = channel; } - // Nothing else. We ignore everything the inspector tells us, because we only care about the - // devtools inspector protocol, which is handled separately. + void resetChannel() { + auto lockedState = state.lockExclusive(); + lockedState->channel = {}; + } - void setInspectorTimerInfo(kj::Timer& timer, kj::Duration timerOffset) { - // Helper for attachInspector(). - inspectorTimerInfo = InspectorTimerInfo { timer, timerOffset, getCurrentThreadId() }; + void runMessageLoopOnPause(int contextGroupId) override { + // This method is called by v8 when a breakpoint or debugger statement is hit. This method + // processes debugger messages until `Debugger.resume()` is called, when v8 then calls + // `quitMessageLoopOnPause()`. + // + // This method is ultimately called from the `InspectorChannelImpl` and the isolate lock is + // held when this method is called. + if (isMultiTenantProcess()) { + // TODO: breakpoints are not supported for the multi-tenant case. + return; + } + + { + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(channel, lockedState->channel) { + pauseIncomingMessages(*channel); + } + } + + runMessageLoop = true; + do { + auto lockedState = state.lockExclusive(); + // Processing each message with a lock/release cycle is expensive, but + // we are debugging and processing a low volume of messages. Dropping the + // lock allows us to adapt behaviour if the lock is lost. + KJ_IF_MAYBE(channel, lockedState->channel) { + if (!dispatchOneMessageDuringPause(*channel)) { + break; + } + } else { + // Our channel is missing or has gone down. Stop waiting for messages + // and resume execution. + break; + } + } while (runMessageLoop); + + { + auto lockedState = state.lockExclusive(); + KJ_IF_MAYBE(channel, lockedState->channel) { + resumeIncomingMessages(*channel); + } + } + } + + void quitMessageLoopOnPause() override { + // This method is called by v8 to resume execution after a breakpoint is hit. + runMessageLoop = false; } private: + static void pauseIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel); + static void resumeIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel); + static bool dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel); + struct InspectorTimerInfo { kj::Timer& timer; kj::Duration timerOffset; uint64_t threadId; }; - kj::Maybe inspectorTimerInfo; - // The timer and offset for the inspector-serving thread. + volatile bool runMessageLoop; + + struct State { + // State that may be set on a thread other than the isolate thread. + // These are typically set in attachInspector when an inspector connection is + // made. + kj::Maybe channel; + // Inspector channel to use to pump messages. + + kj::Maybe inspectorTimerInfo; + // The timer and offset for the inspector-serving thread. + }; + kj::MutexGuarded state; }; void setWebAssemblyModuleHasInstance(jsg::Lock& lock, v8::Local context); @@ -1645,6 +1713,7 @@ Worker::Lock::Lock(const Worker& constWorker, LockType lockType) impl(kj::heap(worker, lockType, stackScope)) { kj::requireOnStack(this, "Worker::Lock MUST be allocated on the stack."); } + Worker::Lock::~Lock() noexcept(false) { // const_cast OK because we hold -- nay, we *are* -- a lock on the script. auto& isolate = const_cast(worker.getIsolate()); @@ -2051,10 +2120,10 @@ private: class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel { public: - InspectorChannelImpl(kj::Own isolateParam, - kj::WebSocket& webSocket) - : webSocket(webSocket), - state(kj::heap(this, kj::mv(isolateParam))) {} + InspectorChannelImpl(kj::Own isolateParam, kj::WebSocket& webSocket) + : ioWorker(webSocket), state(kj::heap(this, kj::mv(isolateParam))) { + ioWorker.connect(*this); + } using InspectorLock = Worker::Lock::TakeSynchronously; // In preview sessions, synchronous locks are not an issue. We declare an alternate spelling of @@ -2062,7 +2131,8 @@ public: // locks. ~InspectorChannelImpl() noexcept try { - KJ_DEFER(outgoingQueueNotifier->clear()); + // Stop message pump. + ioWorker.disconnect(); // Delete session under lock. auto state = this->state.lockExclusive(); @@ -2070,9 +2140,7 @@ public: jsg::V8StackScope stackScope; Isolate::Impl::Lock recordedLock(*state->get()->isolate, InspectorLock(nullptr), stackScope); KJ_IF_MAYBE(p, state->get()->isolate->currentInspectorSession) { - if (p == this) { - const_cast(*state->get()->isolate).currentInspectorSession = nullptr;; - } + const_cast(*state->get()->isolate).disconnectInspector(); } state->get()->teardownUnderLock(); } catch (...) { @@ -2096,193 +2164,18 @@ public: void disconnect() { // Fake like the client requested close. This will cause outgoingLoop() to exit and everything // will be cleaned up. - receivedClose = true; - outgoingQueueNotifier->notify(); + ioWorker.disconnect(); } - kj::Promise outgoingLoop() { - return outgoingQueueNotifier->awaitNotification().then([this]() { - auto messages = kj::mv(*outgoingQueue.lockExclusive()); - auto promise = sendToWebsocket(messages).attach(kj::mv(messages)); - - if (receivedClose) { - return promise.then([this]() { - return webSocket.close(1000, "client closed connection"); - }); - } else if (*state.lockShared() == nullptr) { - // Another connection superseded us, or the isolate died. - return promise.then([this]() { - // TODO(soon): What happens if the other side never hangs up? - return webSocket.disconnect(); - }); - } - - return promise.then([this]() { return outgoingLoop(); }); - }); + kj::Promise messagePump() { + return ioWorker.messagePump(); } - kj::Promise incomingLoop() { - return webSocket.receive().then([this](kj::WebSocket::Message&& message) -> kj::Promise { - KJ_SWITCH_ONEOF(message) { - KJ_CASE_ONEOF(text, kj::String) { - { - capnp::MallocMessageBuilder message; - auto cmd = message.initRoot(); - - getCdpJsonCodec().decode(text, cmd); - - switch (cmd.which()) { - case cdp::Command::UNKNOWN: { - break; - } - case cdp::Command::NETWORK_ENABLE: { - setNetworkEnabled(true); - cmd.getNetworkEnable().initResult(); - break; - } - case cdp::Command::NETWORK_DISABLE: { - setNetworkEnabled(false); - cmd.getNetworkDisable().initResult(); - break; - } - case cdp::Command::NETWORK_GET_RESPONSE_BODY: { - auto err = cmd.getNetworkGetResponseBody().initError(); - err.setCode(-32600); - err.setMessage("Network.getResponseBody is not supported in this fork"); - break; - } - case cdp::Command::PROFILER_STOP: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - stopProfiling(**p, lock.v8Isolate, cmd); - } - break; - } - - case cdp::Command::PROFILER_START: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - startProfiling(**p, lock.v8Isolate); - } - break; - } - - case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - KJ_IF_MAYBE(p, isolate.impl->profiler) { - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); - setSamplingInterval(**p, interval); - } - break; - } - case cdp::Command::PROFILER_ENABLE: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - isolate.impl->profiler = kj::Own( - v8::CpuProfiler::New(lock.v8Isolate, v8::kDebugNaming, v8::kLazyLogging), - CpuProfilerDisposer::instance); - break; - } - case cdp::Command::TAKE_HEAP_SNAPSHOT: { - auto state = this->state.lockExclusive(); - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - auto params = cmd.getTakeHeapSnapshot().getParams(); - takeHeapSnapshot(lock, - params.getExposeInternals(), - params.getCaptureNumericValue()); - break; - } - } - - if (!cmd.isUnknown()) { - sendNotification(cmd); - return incomingLoop(); - } - } - - auto state = this->state.lockExclusive(); - - // const_cast OK because we're going to lock it - Isolate& isolate = const_cast(*state->get()->isolate); - jsg::V8StackScope stackScope; - Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); - auto& lock = *recordedLock.lock; - - // We have at times observed V8 bugs where the inspector queues a background task and - // then synchronously waits for it to complete, which would deadlock if background - // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not - // a big deal to just permit those background threads. - AllowV8BackgroundThreadsScope allowBackgroundThreads; - - kj::Maybe maybeLimitError; - { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - state->get()->session->dispatchProtocolMessage(toStringView(text)); - } - - // Run microtasks in case the user made an async call. - if (maybeLimitError == nullptr) { - auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(lock, maybeLimitError); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } else { - // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. - lock.v8Isolate->TerminateExecution(); - lock.v8Isolate->PerformMicrotaskCheckpoint(); - } - - KJ_IF_MAYBE(limitError, maybeLimitError) { - v8::HandleScope scope(lock.v8Isolate); - - // HACK: We want to print the error, but we need a context to do that. - // We don't know which contexts exist in this isolate, so I guess we have to - // create one. Ugh. - auto dummyContext = v8::Context::New(lock.v8Isolate); - auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); - inspector.contextCreated( - v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( - reinterpret_cast("Worker"), 6))); - sendExceptionToInspector(inspector, dummyContext, - jsg::extractTunneledExceptionDescription(limitError->getDescription())); - inspector.contextDestroyed(dummyContext); - } - - if (recordedLock.checkInWithLimitEnforcer(isolate)) { - disconnect(); - } - } - KJ_CASE_ONEOF(bytes, kj::Array) { - // ignore - } - KJ_CASE_ONEOF(close, kj::WebSocket::Close) { - // all done - receivedClose = true; - outgoingQueueNotifier->notify(); - - // The outgoing loop will wake up and will exit out. It is exclusively joined with the - // incoming loop, so we'll be canceled there. We use NEVER_DONE here to make sure we - // don't inadvertently cancel the outgoing loop. - return kj::NEVER_DONE; - } - } - return incomingLoop(); - }); + void dispatchProtocolMessages(kj::ArrayPtr messages) { + auto lockedState = this->state.lockExclusive(); + for (auto& message : messages) { + dispatchProtocolMessage(lockedState, kj::mv(message)); + } } // --------------------------------------------------------------------------- @@ -2304,12 +2197,7 @@ public: } void sendNotification(kj::String message) { - outgoingQueue.lockExclusive()->add(kj::mv(message)); - outgoingQueueNotifier->notify(); - - // TODO(someday): Should we implement some sort of backpressure if the queue gets large? Will - // need to be careful about deadlock if so, since presumably the isolate is locked during - // these callbacks. + ioWorker.send(kj::mv(message)); } template @@ -2326,8 +2214,236 @@ public: // delay signaling the outgoing loop until this call? } + void pauseIncomingMessages(); + // Stops the delivery of CDP messages on the I/O worker thread. Called when the isolate hits a + // breakpoint or debugger statement. + + void resumeIncomingMessages(); + // Resumes delivery of CDP messages on the I/O worker thread. Called when execution resumes after + // a breakpoint or debugger statement. + + bool dispatchOneMessageDuringPause(); + // Dispatches one message whilst automatic CDP messages on the I/O worker thread is paused, called + // on the thread executing the isolate whilst execution is suspended due to a breakpoint or + // debugger statement. + private: - kj::WebSocket& webSocket; + class WebSocketIoWorker final { + // Class that manages the I/O for devtools connections. I/O is performed on the + // thread associated with the InspectorService (the thread that calls attachInspector). + // Most of the public API is intended for code running on the isolate thread, such as + // the InspectorChannelImpl and the InspectorClient. + public: + WebSocketIoWorker(kj::WebSocket& webSocket) + : webSocket(webSocket), receivedClose(false) { + // Assume we are being instantiated on the InspectorService thread, the thread that will do + // I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread. + outgoingQueueNotifier = kj::atomicRefcounted(kj::getCurrentThreadExecutor()); + } + + ~WebSocketIoWorker() { + outgoingQueueNotifier->clear(); + } + + void connect(InspectorChannelImpl& inspectorChannel) { + // Sets the channel that messages are delivered to. + channel = inspectorChannel; + } + + void disconnect() { + channel = {}; + shutdown(); + } + + bool isClosed() const { + return receivedClose.load(std::memory_order_acquire); + } + + void pauseIncomingMessages() { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->paused = true; + } + + void resumeIncomingMessages() { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->paused = false; + deliverProtocolMessages(channel, lockedIncomingQueue); + } + + kj::Maybe waitForMessage() { + // Blocked the current thread until a message arrives. This is intended + // for use in the InspectorClient when breakpoints are hit. The InspectorClient + // has to remain in runMessageLoopOnPause() but still receive CDP messages + // (e.g. resume). + return incomingQueue.when( + [this](const MessageQueue& incomingQueue) { + return (incomingQueue.head < incomingQueue.messages.size() || isClosed()); + }, + [this](MessageQueue& incomingQueue) -> kj::Maybe { + if (isClosed()) return {}; + return pollMessage(incomingQueue); + } + ); + } + + kj::Promise messagePump() { + // Message pumping promise that should be evaluated on the InspectorService + // thread. + return incomingLoop().exclusiveJoin(outgoingLoop()); + } + + void send(kj::String message) { + if (isClosed()) return; + auto lockedOutgoingQueue = outgoingQueue.lockExclusive(); + lockedOutgoingQueue->messages.add(kj::mv(message)); + outgoingQueueNotifier->notify(); + } + + private: + struct MessageQueue { + kj::Vector messages; + size_t head; + bool paused; + }; + + static kj::Maybe pollMessage(MessageQueue& messageQueue) { + if (messageQueue.head < messageQueue.messages.size()) { + kj::String message = kj::mv(messageQueue.messages[messageQueue.head++]); + if (messageQueue.head == messageQueue.messages.size()) { + messageQueue.head = 0; + messageQueue.messages.clear(); + } + return kj::mv(message); + } + return {}; + } + + static void deliverProtocolMessages(kj::Maybe& channel, + kj::Locked& incomingQueue) { + KJ_REQUIRE(!incomingQueue->paused); + KJ_IF_MAYBE(c, channel) { + kj::ArrayPtr messages = incomingQueue->messages.slice( + incomingQueue->head, incomingQueue->messages.size()); + c->dispatchProtocolMessages(messages); + incomingQueue->messages.clear(); + incomingQueue->head = 0; + } + } + + void shutdown() { + receivedClose.store(true, std::memory_order_release); + + // Wake any waiters. + outgoingQueueNotifier->notify(); + + // Drain incoming queue, the isolate thread may be waiting on it + // on will notice it is closed if woken without any messages to + // deliver in WebSocketIoWorker::waitForMessage(). + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->head = 0; + lockedIncomingQueue->messages.clear(); + } + + kj::Promise incomingLoop() { + return webSocket.receive().then([this](kj::WebSocket::Message&& message) -> kj::Promise { + KJ_SWITCH_ONEOF(message) { + KJ_CASE_ONEOF(text, kj::String) { + auto lockedIncomingQueue = incomingQueue.lockExclusive(); + lockedIncomingQueue->messages.add(kj::mv(text)); + if (!lockedIncomingQueue->paused) { + deliverProtocolMessages(channel, lockedIncomingQueue); + } + } + KJ_CASE_ONEOF(blob, kj::Array){ + // Ignore. + } + KJ_CASE_ONEOF(close, kj::WebSocket::Close) { + shutdown(); + } + } + return incomingLoop(); + }); + } + + kj::Promise outgoingLoop() { + return outgoingQueueNotifier->awaitNotification().then([this]() { + auto lockedQueue = outgoingQueue.lockExclusive(); + auto messages = kj::mv(lockedQueue->messages); + try { + kj::Promise sendMessages = sendToWebSocket(messages).attach(kj::mv(messages)); + if (isClosed()) { + return sendMessages.then([this]() { + return webSocket.close(1000, "client closed connection"); + }); + } + return sendMessages.then([this]() { return outgoingLoop(); }); + } catch (kj::Exception& e) { + shutdown(); + throw; + } + }); + } + + kj::Promise sendToWebSocket(kj::ArrayPtr messages) { + if (messages.size() == 0) { + return kj::READY_NOW; + } else { + auto first = kj::mv(messages[0]); + auto rest = messages.slice(1, messages.size()); + return webSocket.send(first).attach(kj::mv(first)).then([this, rest]() mutable { + return sendToWebSocket(rest); + }); + } + } + + class XThreadNotifier final: public kj::AtomicRefcounted { + // Class encapsulating the ability to notify the inspector thread from other threads when + // messages are pushed to the outgoing queue. + // + // TODO(cleanup): This could be a lot simpler if only it were possible to cancel + // an executor.executeAsync() promise from an arbitrary thread. Then, if the inspector + // session was destroyed in its thread while a cross-thread notification was in-flight, it + // could cancel that notification directly. + public: + XThreadNotifier(const kj::Executor& executor) : executor(executor) { } + + void clear() { + // Must call in main thread before it drops its reference. + paf = nullptr; + } + + kj::Promise awaitNotification() { + return kj::mv(KJ_ASSERT_NONNULL(paf).promise).then([this]() { + paf = kj::newPromiseAndFulfiller(); + }); + } + + void notify() const { + executor.executeAsync([ref = kj::atomicAddRef(*this)]() { + KJ_IF_MAYBE(p, ref->paf) { + p->fulfiller->fulfill(); + } + }).detach([](kj::Exception&& exception) { + KJ_LOG(ERROR, exception); + }); + } + + private: + const kj::Executor& executor; + mutable kj::Maybe> paf = kj::newPromiseAndFulfiller(); + // Accessed only in notifier's owning thread. + }; + + kj::MutexGuarded incomingQueue; + kj::MutexGuarded outgoingQueue; + kj::Own outgoingQueueNotifier; + + kj::WebSocket& webSocket; // only accessed on the InspectorService thread. + std::atomic_bool receivedClose; // accessed on any thread (only transitions false -> true). + kj::Maybe channel; // only accessed on the isolate thread. + }; + + WebSocketIoWorker ioWorker; void takeHeapSnapshot(jsg::Lock& js, bool exposeInternals, bool captureNumericValue) { struct Activity: public v8::ActivityControl { @@ -2420,75 +2536,183 @@ private: kj::MutexGuarded> state; // Mutex ordering: You must lock this *before* locking the isolate. - class XThreadNotifier final: public kj::AtomicRefcounted { - // Class encapsulating the ability to notify the inspector thread from other threads when - // messages are pushed to the outgoing queue. - // - // TODO(cleanup): This could be a lot simpler if only it were possible to cancel - // an executor.executeAsync() promise from an arbitrary thread. Then, if the inspector - // session was destroyed in its thread while a cross-thread notification was in-flight, it - // could cancel that notification directly. - public: - void clear() { - // Must call in main thread before it drops its reference. - paf = nullptr; + volatile bool networkEnabled = false; + // Not under `state` lock due to lock ordering complications. + + void dispatchProtocolMessage( + kj::Locked>& lockedState, + kj::String message) { + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because we're going to lock it + Isolate& isolate = const_cast(*lockedState->get()->isolate); + jsg::V8StackScope stackScope; + Isolate::Impl::Lock recordedLock(isolate, InspectorLock(nullptr), stackScope); + dispatchProtocolMessage(kj::mv(message), session, isolate, stackScope, recordedLock); + } + + void dispatchProtocolMessage(kj::String message, + v8_inspector::V8InspectorSession& session, + Isolate& isolate, + jsg::V8StackScope& stackScope, + Isolate::Impl::Lock& recordedLock) { + capnp::MallocMessageBuilder messageBuilder; + auto cmd = messageBuilder.initRoot(); + getCdpJsonCodec().decode(message, cmd); + + switch (cmd.which()) { + case cdp::Command::UNKNOWN: { + break; + } + case cdp::Command::NETWORK_ENABLE: { + setNetworkEnabled(true); + cmd.getNetworkEnable().initResult(); + break; + } + case cdp::Command::NETWORK_DISABLE: { + setNetworkEnabled(false); + cmd.getNetworkDisable().initResult(); + break; + } + case cdp::Command::NETWORK_GET_RESPONSE_BODY: { + auto err = cmd.getNetworkGetResponseBody().initError(); + err.setCode(-32600); + err.setMessage("Network.getResponseBody is not supported in this fork"); + break; + } + case cdp::Command::PROFILER_STOP: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + stopProfiling(**p, lock->v8Isolate, cmd); + } + break; + } + case cdp::Command::PROFILER_START: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto& lock = recordedLock.lock; + startProfiling(**p, lock->v8Isolate); + } + break; + } + case cdp::Command::PROFILER_SET_SAMPLING_INTERVAL: { + KJ_IF_MAYBE(p, isolate.impl->profiler) { + auto interval = cmd.getProfilerSetSamplingInterval().getParams().getInterval(); + setSamplingInterval(**p, interval); + } + break; + } + case cdp::Command::PROFILER_ENABLE: { + auto& lock = recordedLock.lock; + isolate.impl->profiler = kj::Own( + v8::CpuProfiler::New(lock->v8Isolate, v8::kDebugNaming, v8::kLazyLogging), + CpuProfilerDisposer::instance); + break; + } + case cdp::Command::TAKE_HEAP_SNAPSHOT: { + auto& lock = recordedLock.lock; + auto params = cmd.getTakeHeapSnapshot().getParams(); + takeHeapSnapshot(*lock, + params.getExposeInternals(), + params.getCaptureNumericValue()); + break; + } } - kj::Promise awaitNotification() { - return kj::mv(KJ_ASSERT_NONNULL(paf).promise).then([this]() { - paf = kj::newPromiseAndFulfiller(); - __atomic_store_n(&inFlight, false, __ATOMIC_RELAXED); - }); + if (!cmd.isUnknown()) { + sendNotification(cmd); + return; } - void notify() const { - // TODO(perf): Figure out why this commented-out optimization sometimes randomly misses - // messages, particularly under load. - // if (__atomic_exchange_n(&inFlight, true, __ATOMIC_RELAXED)) { - // // A notifciation is already in-flight, no need to send another one. - // } else { - executor.executeAsync([ref = kj::atomicAddRef(*this)]() { - KJ_IF_MAYBE(p, ref->paf) { - p->fulfiller->fulfill(); - } - }).detach([](kj::Exception&& exception) { - KJ_LOG(ERROR, exception); - }); - // } + auto& lock = recordedLock.lock; + + // We have at times observed V8 bugs where the inspector queues a background task and + // then synchronously waits for it to complete, which would deadlock if background + // threads are disallowed. Since the inspector is in a process sandbox anyway, it's not + // a big deal to just permit those background threads. + AllowV8BackgroundThreadsScope allowBackgroundThreads; + + kj::Maybe maybeLimitError; + { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + session.dispatchProtocolMessage(toStringView(message)); } - private: - const kj::Executor& executor = kj::getCurrentThreadExecutor(); + // Run microtasks in case the user made an async call. + if (maybeLimitError == nullptr) { + auto limitScope = isolate.getLimitEnforcer().enterInspectorJs(*lock, maybeLimitError); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } else { + // Oops, we already exceeded the limit, so force the microtask queue to be thrown away. + lock->v8Isolate->TerminateExecution(); + lock->v8Isolate->PerformMicrotaskCheckpoint(); + } - mutable kj::Maybe> paf = kj::newPromiseAndFulfiller(); - // Accessed only in notifier's owning thread. + KJ_IF_MAYBE(limitError, maybeLimitError) { + v8::HandleScope scope(lock->v8Isolate); - mutable bool inFlight = false; - // Is a notification already in-flight? - }; + // HACK: We want to print the error, but we need a context to do that. + // We don't know which contexts exist in this isolate, so I guess we have to + // create one. Ugh. + auto dummyContext = v8::Context::New(lock->v8Isolate); + auto& inspector = *KJ_ASSERT_NONNULL(isolate.impl->inspector); + inspector.contextCreated( + v8_inspector::V8ContextInfo(dummyContext, 1, v8_inspector::StringView( + reinterpret_cast("Worker"), 6))); + sendExceptionToInspector(inspector, dummyContext, + jsg::extractTunneledExceptionDescription(limitError->getDescription())); + inspector.contextDestroyed(dummyContext); + } - kj::Own outgoingQueueNotifier = kj::atomicRefcounted(); - // Whenever another thread adds messages to the outgoing queue, it notifies the inspector - // connection thread using this. + if (recordedLock.checkInWithLimitEnforcer(isolate)) { + disconnect(); + } + } +}; - kj::MutexGuarded> outgoingQueue; - bool receivedClose = false; +void Worker::Isolate::InspectorChannelImpl::pauseIncomingMessages() { + ioWorker.pauseIncomingMessages(); +} - volatile bool networkEnabled = false; - // Not under `state` lock due to lock ordering complications. +void Worker::Isolate::InspectorChannelImpl::resumeIncomingMessages() { + ioWorker.resumeIncomingMessages(); +} - kj::Promise sendToWebsocket(kj::ArrayPtr messages) { - if (messages.size() == 0) { - return kj::READY_NOW; +bool Worker::Isolate::InspectorChannelImpl::dispatchOneMessageDuringPause() { + auto maybeMessage = ioWorker.waitForMessage(); + + // We can be paused by either hitting a debugger statement in a script or from hitting + // a breakpoint or someone hit break. As a result we may or may not be recursive dispatching messages. + + KJ_IF_MAYBE(message, maybeMessage) { + auto lockedState = this->state.lockExclusive(); + if (IoContext::hasCurrent()) { + // Received a message whilst script is running, probably in a breakpoint. + v8_inspector::V8InspectorSession& session = *lockedState->get()->session; + // const_cast OK because we're going to lock it + Isolate& isolate = const_cast(*lockedState->get()->isolate); + Worker::Lock& workerLock = IoContext::current().getCurrentLock(); + Isolate::Impl::Lock& recordedLock = workerLock.impl->recordedLock; + jsg::V8StackScope stackScope; + dispatchProtocolMessage(kj::mv(*message), session, isolate, stackScope, recordedLock); } else { - auto first = kj::mv(messages[0]); - auto rest = messages.slice(1, messages.size()); - return webSocket.send(first).attach(kj::mv(first)).then([this, rest]() mutable { - return sendToWebsocket(rest); - }); + // Received a message whilst idle. + dispatchProtocolMessage(lockedState, kj::mv(*message)); } + return true; } -}; + return false; +} + +void Worker::InspectorClient::pauseIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel) { + channel.pauseIncomingMessages(); +} + +void Worker::InspectorClient::resumeIncomingMessages(Worker::Isolate::InspectorChannelImpl& channel) { + channel.resumeIncomingMessages(); +} + +bool Worker::InspectorClient::dispatchOneMessageDuringPause(Worker::Isolate::InspectorChannelImpl& channel) { + return channel.dispatchOneMessageDuringPause(); +} kj::Promise Worker::Isolate::attachInspector( kj::Timer& timer, @@ -2524,12 +2748,12 @@ kj::Promise Worker::Isolate::attachInspector( // just not. lockedSelf.disconnectInspector(); - auto channel = kj::heap( - kj::atomicAddRef(*this), webSocket); - lockedSelf.currentInspectorSession = *channel; - lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset); + auto channel = kj::heap(kj::atomicAddRef(*this), webSocket); + lockedSelf.currentInspectorSession = *channel; + lockedSelf.impl->inspectorClient.setChannel(*channel); + // Send any queued notifications. { v8::HandleScope handleScope(lock.v8Isolate); @@ -2538,19 +2762,17 @@ kj::Promise Worker::Isolate::attachInspector( } lockedSelf.impl->queuedNotifications.clear(); } - - return channel->incomingLoop() - .exclusiveJoin(channel->outgoingLoop()) - .attach(kj::mv(channel)); + return channel->messagePump().attach(kj::mv(channel)); } void Worker::Isolate::disconnectInspector() { // If an inspector session is connected, proactively drop it, so as to force it to drop its // reference on the script, so that the script can be deleted. - KJ_IF_MAYBE(current, currentInspectorSession) { current->disconnect(); + currentInspectorSession = {}; } + impl->inspectorClient.resetChannel(); } void Worker::Isolate::logWarning(kj::StringPtr description, Lock& lock) { diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 312bcda37bc7..4039a2b83b95 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -451,7 +451,6 @@ class Worker::Isolate: public kj::AtomicRefcounted { kj::Own traceAsyncContextKey; friend class Worker; - friend class IsolateChannelImpl; }; class Worker::ApiIsolate { diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index ff9663eb913d..9e93e43de9cc 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1141,11 +1141,6 @@ private: friend class Registration; }; -kj::Own Server::makeInspectorService( - kj::HttpHeaderTable::Builder& headerTableBuilder) { - return kj::heap(timer, headerTableBuilder); -} - // ======================================================================================= class Server::WorkerService final: public Service, private kj::TaskSet::ErrorHandler, @@ -1913,6 +1908,7 @@ static kj::Maybe createBinding( "the schema?")); } +void startInspector(kj::StringPtr inspectorAddress, kj::StringPtr name, Worker::Isolate* isolate); kj::Own Server::makeWorker(kj::StringPtr name, config::Worker::Reader conf, capnp::List::Reader extensions) { @@ -2001,20 +1997,22 @@ kj::Own Server::makeWorker(kj::StringPtr name, config::Worker:: auto limitEnforcer = kj::heap(); auto api = kj::heap(globalContext->v8System, featureFlags.asReader(), *limitEnforcer); + auto inspectorPolicy = Worker::Isolate::InspectorPolicy::DISALLOW; + KJ_IF_MAYBE(inspector, inspectorOverride) { + // For workerd, if the inspector is enabled, it is always fully trusted. + inspectorPolicy = Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED; + } auto isolate = kj::atomicRefcounted( kj::mv(api), kj::atomicRefcounted(), name, kj::mv(limitEnforcer), - // For workerd, if the inspector is enabled, it is always fully trusted. - maybeInspectorService != nullptr ? - Worker::Isolate::InspectorPolicy::ALLOW_FULLY_TRUSTED : - Worker::Isolate::InspectorPolicy::DISALLOW); + inspectorPolicy); // If we are using the inspector, we need to register the Worker::Isolate // with the inspector service. - KJ_IF_MAYBE(inspector, maybeInspectorService) { - (*inspector)->registerIsolate(name, isolate.get()); + KJ_IF_MAYBE(inspector, inspectorOverride) { + startInspector(*inspector, name, isolate.get()); } auto script = isolate->newScript( @@ -2422,32 +2420,44 @@ void Server::startAlarmScheduler(config::Config::Reader config) { .attach(kj::mv(vfs)); } -void Server::startServices(jsg::V8System& v8System, config::Config::Reader config, - kj::HttpHeaderTable::Builder& headerTableBuilder, - kj::ForkedPromise& forkedDrainWhen) { +void startInspector(kj::StringPtr inspectorAddress, kj::StringPtr name, Worker::Isolate* isolate) { // --------------------------------------------------------------------------- // Configure inspector. - KJ_IF_MAYBE(inspector, inspectorOverride) { + // Configure and start the inspector socket. + kj::Thread thread([inspectorAddress, name, isolate](){ + kj::AsyncIoContext io = kj::setupAsyncIo(); + + kj::HttpHeaderTable::Builder headerTableBuilder; + // Create the special inspector service. - auto& inspectorService = *maybeInspectorService.emplace( - makeInspectorService(headerTableBuilder)); + kj::Own inspectorService(kj::heap(io.provider->getTimer(), headerTableBuilder)); + auto ownHeaderTable = headerTableBuilder.build(); + + inspectorService->registerIsolate(name, isolate); // Configure and start the inspector socket. static constexpr uint DEFAULT_PORT = 9229; - auto inspectorListener = network.parseAddress(*inspector, DEFAULT_PORT) - .then([](kj::Own parsed) { + auto& network = io.provider->getNetwork(); + auto inspectorListener = network.parseAddress(inspectorAddress, DEFAULT_PORT) + .then([](kj::Own parsed) { return parsed->listen(); }); - tasks.add(inspectorListener.then( - [&inspectorService](kj::Own listener) mutable { + auto listen = inspectorListener.then([&inspectorService](kj::Own listener) mutable { KJ_LOG(INFO, "Inspector is listening"); - return inspectorService.listen(kj::mv(listener)); - }).exclusiveJoin(forkedDrainWhen.addBranch())); - } + return inspectorService->listen(kj::mv(listener)); + }); + kj::NEVER_DONE.wait(io.waitScope); + }); + thread.detach(); +} + +void Server::startServices(jsg::V8System& v8System, config::Config::Reader config, + kj::HttpHeaderTable::Builder& headerTableBuilder, + kj::ForkedPromise& forkedDrainWhen) { // --------------------------------------------------------------------------- // Configure services diff --git a/src/workerd/server/server.h b/src/workerd/server/server.h index 6234f4caab2e..134fa7dc5f23 100644 --- a/src/workerd/server/server.h +++ b/src/workerd/server/server.h @@ -74,6 +74,8 @@ class Server: private kj::TaskSet::ErrorHandler { struct Ephemeral {}; using ActorConfig = kj::OneOf; + class InspectorService; + private: kj::Filesystem& fs; kj::Timer& timer; @@ -175,11 +177,6 @@ class Server: private kj::TaskSet::ErrorHandler { class WorkerEntrypointService; class HttpListener; - class InspectorService; - - kj::Maybe> maybeInspectorService; - kj::Own makeInspectorService(kj::HttpHeaderTable::Builder& headerTableBuilder); - void startServices(jsg::V8System& v8System, config::Config::Reader config, kj::HttpHeaderTable::Builder& headerTableBuilder, kj::ForkedPromise& forkedDrainWhen);