From 5e78eaa8d6324e243456f451d04eb689ed2bff5d Mon Sep 17 00:00:00 2001 From: James M Snell Date: Wed, 31 Jan 2024 12:19:23 -0800 Subject: [PATCH] Use WeakRef for tracking locked streams readers and writers --- src/workerd/api/streams/common.h | 80 +++++++++++++------ src/workerd/api/streams/internal.c++ | 89 +++++++++------------ src/workerd/api/streams/internal.h | 9 +-- src/workerd/api/streams/readable.c++ | 21 +++-- src/workerd/api/streams/readable.h | 10 +++ src/workerd/api/streams/standard.c++ | 112 ++++++++++++--------------- src/workerd/api/streams/writable.c++ | 19 ++--- src/workerd/api/streams/writable.h | 8 +- src/workerd/util/weak-refs.h | 4 + 9 files changed, 182 insertions(+), 170 deletions(-) diff --git a/src/workerd/api/streams/common.h b/src/workerd/api/streams/common.h index 6cab4acbcee..f68f1bf0b59 100644 --- a/src/workerd/api/streams/common.h +++ b/src/workerd/api/streams/common.h @@ -318,9 +318,9 @@ class ReadableStreamController { // passing along the closed promise that will be used to communicate state to the // user code. // - // The Reader will hold a reference to the controller that will be cleared when the reader - // is released or destroyed. The controller is guaranteed to either outlive or detach the - // reader so the ReadableStreamController& reference should remain valid. + // The Reader holds a strong reference to the controller. The Controller will hold a weak + // reference to the reader. It is ok for the reader itself to be freed/garbage collected + // while still being attached to the controller, but not the other way around. virtual void attach( ReadableStreamController& controller, jsg::Promise closedPromise) = 0; @@ -328,6 +328,12 @@ class ReadableStreamController { // When a Reader lock is released, the controller will signal to the reader that it has been // detached. virtual void detach() = 0; + + virtual kj::Own> addWeakRef() = 0; + + private: + static kj::Badge getBadge() { return kj::Badge(); } + friend class ReaderImpl; }; struct ByobOptions { @@ -479,12 +485,12 @@ class ReadableStreamController { // Locks this controller to the given reader, returning true if the lock was successful, or false // if the controller was already locked. - virtual bool lockReader(jsg::Lock& js, Reader& reader) = 0; + virtual bool lockReader(jsg::Lock& js, kj::Own> reader) = 0; // Removes the lock and releases the reader from this controller. // maybeJs will be nullptr when the isolate lock is not available. // If maybeJs is set, the reader's closed promise will be resolved. - virtual void releaseReader(Reader& reader, kj::Maybe maybeJs) = 0; + virtual void releaseReader(jsg::Lock& js, Reader& reader) = 0; virtual kj::Maybe tryPipeLock(jsg::Ref destination) = 0; @@ -574,8 +580,9 @@ class WritableStreamController { // passing along the closed and ready promises that will be used to communicate state to the // user code. // - // The controller is guaranteed to either outlive the Writer or will detach the Writer so the - // WritableStreamController& reference should always remain valid. + // The Writer holds a strong reference to the controller. The Controller will hold a weak + // reference to the writer. It is ok for the writer itself to be freed/garbage collected + // while still being attached to the controller, but not the other way around. virtual void attach( WritableStreamController& controller, jsg::Promise closedPromise, @@ -588,6 +595,12 @@ class WritableStreamController { // The ready promise can be replaced whenever backpressure is signaled by the underlying // controller. virtual void replaceReadyPromise(jsg::Promise readyPromise) = 0; + + virtual kj::Own> addWeakRef() = 0; + + private: + static kj::Badge getBadge() { return kj::Badge(); } + friend class WritableStreamDefaultWriter; }; struct PendingAbort { @@ -672,12 +685,10 @@ class WritableStreamController { // Locks this controller to the given writer, returning true if the lock was successful, or false // if the controller was already locked. - virtual bool lockWriter(jsg::Lock& js, Writer& writer) = 0; + virtual bool lockWriter(jsg::Lock& js, kj::Own>) = 0; // Removes the lock and releases the writer from this controller. - // maybeJs will be nullptr when the isolate lock is not available. - // If maybeJs is set, the writer's closed and ready promises will be resolved. - virtual void releaseWriter(Writer& writer, kj::Maybe maybeJs) = 0; + virtual void releaseWriter(jsg::Lock& js, Writer& writer) = 0; virtual kj::Maybe> isErroring(jsg::Lock& js) = 0; @@ -710,19 +721,27 @@ struct Locked {}; // When a reader is locked to a ReadableStream, a ReaderLock instance // is used internally to represent the locked state in the ReadableStreamController. +// ReaderLocked maintains a weak referene to the actual Reader instance. It's ok +// for the Reader to be garbage collected while the ReadableStream is still alive but +// not vis versa. The Reader holds a strong reference to the ReadableStream only while +// it is attached. class ReaderLocked { public: ReaderLocked( - ReadableStreamController::Reader& reader, + kj::Own> reader, jsg::Promise::Resolver closedFulfiller, kj::Maybe> canceler = kj::none) - : reader(reader), + : reader(kj::mv(reader)), closedFulfiller(kj::mv(closedFulfiller)), canceler(kj::mv(canceler)) {} ReaderLocked(ReaderLocked&&) = default; ~ReaderLocked() noexcept(false) { - KJ_IF_SOME(r, reader) { r.detach(); } + if (reader.get() != nullptr) { + reader->runIfAlive([](auto& r) { + r.detach(); + }); + } } KJ_DISALLOW_COPY(ReaderLocked); @@ -730,8 +749,9 @@ class ReaderLocked { visitor.visit(closedFulfiller); } - ReadableStreamController::Reader& getReader() { - return KJ_ASSERT_NONNULL(reader); + kj::Maybe getReader() { + if (reader.get() == nullptr) return kj::none; + return reader->tryGet(); } kj::Maybe::Resolver>& getClosedFulfiller() { @@ -743,34 +763,43 @@ class ReaderLocked { } private: - kj::Maybe reader; + kj::Own> reader; kj::Maybe::Resolver> closedFulfiller; kj::Maybe> canceler; }; // When a writer is locked to a WritableStream, a WriterLock instance // is used internally to represent the locked state in the WritableStreamController. +// WriterLocked maintains a weak reference to the actual Writer instance. It's ok +// for the Writer to be garbage collected while the WritableStream is still alive but +// not vis versa. The Writer holds a strong reference to the WritableStream only while +// it is attached. class WriterLocked { public: WriterLocked( - WritableStreamController::Writer& writer, + kj::Own> writer, jsg::Promise::Resolver closedFulfiller, kj::Maybe::Resolver> readyFulfiller = kj::none) - : writer(writer), + : writer(kj::mv(writer)), closedFulfiller(kj::mv(closedFulfiller)), readyFulfiller(kj::mv(readyFulfiller)) {} WriterLocked(WriterLocked&&) = default; ~WriterLocked() noexcept(false) { - KJ_IF_SOME(w, writer) { w.detach(); } + if (writer.get() != nullptr) { + writer->runIfAlive([&](auto& w) { + w.detach(); + }); + } } void visitForGc(jsg::GcVisitor& visitor) { visitor.visit(closedFulfiller, readyFulfiller); } - WritableStreamController::Writer& getWriter() { - return KJ_ASSERT_NONNULL(writer); + kj::Maybe getWriter() { + if (writer.get() == nullptr) return kj::none; + return writer->tryGet(); } kj::Maybe::Resolver>& getClosedFulfiller() { @@ -782,14 +811,15 @@ class WriterLocked { } void setReadyFulfiller(jsg::PromiseResolverPair& pair) { - KJ_IF_SOME(w, writer) { + if (writer.get() == nullptr) return; + writer->runIfAlive([&](auto& w) { readyFulfiller = kj::mv(pair.resolver); w.replaceReadyPromise(kj::mv(pair.promise)); - } + }); } private: - kj::Maybe writer; + kj::Own> writer; kj::Maybe::Resolver> closedFulfiller; kj::Maybe::Resolver> readyFulfiller; }; diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index dd58957b284..40c51f70002 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -471,12 +471,7 @@ kj::Maybe>> WritableStreamSink::tryPumpFrom( // ======================================================================================= -ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) { - KJ_IF_SOME(locked, readState.tryGet()) { - auto lock = kj::mv(locked); - readState.init(); - } -} +ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) {} jsg::Ref ReadableStreamInternalController::addRef() { return KJ_ASSERT_NONNULL(owner).addRef(); @@ -761,7 +756,7 @@ kj::Maybe> ReadableStreamInternalController::remov KJ_UNREACHABLE; } -bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader) { +bool ReadableStreamInternalController::lockReader(jsg::Lock& js, kj::Own> reader) { if (isLockedToReader()) { return false; } @@ -769,8 +764,9 @@ bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader) auto prp = js.newPromiseAndResolver(); prp.promise.markAsHandled(js); - auto lock = ReaderLocked(reader, kj::mv(prp.resolver), + auto lock = ReaderLocked(kj::mv(reader), kj::mv(prp.resolver), IoContext::current().addObject(kj::heap())); + // Take care not to access reader directly after this point. Use the lock. KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { @@ -785,42 +781,30 @@ bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader) } readState = kj::mv(lock); - reader.attach(*this, kj::mv(prp.promise)); + + auto& inner = KJ_ASSERT_NONNULL(readState.get().getReader()); + inner.attach(*this, kj::mv(prp.promise)); return true; } -void ReadableStreamInternalController::releaseReader( - Reader& reader, - kj::Maybe maybeJs) { +void ReadableStreamInternalController::releaseReader(jsg::Lock& js, Reader& reader) { KJ_IF_SOME(locked, readState.tryGet()) { - KJ_ASSERT(&locked.getReader() == &reader); - KJ_IF_SOME(js, maybeJs) { - JSG_REQUIRE(KJ_ASSERT_NONNULL(locked.getCanceler())->isEmpty(), TypeError, - "Cannot call releaseLock() on a reader with outstanding read promises."); - maybeRejectPromise(js, - locked.getClosedFulfiller(), - js.v8TypeError("This ReadableStream reader has been released."_kj)); + KJ_IF_SOME(r, locked.getReader()) { + KJ_ASSERT(&r == &reader); } - auto lock = kj::mv(locked); - - // When maybeJs is nullptr, that means releaseReader was called when the reader is - // being deconstructed and not as the result of explicitly calling releaseLock. In - // that case, we don't want to change the lock state itself because we do not have - // an isolate lock. Moving the lock above will free the lock state while keeping the - // ReadableStream marked as locked. - if (maybeJs != kj::none) { - readState.template init(); - } - } -} + JSG_REQUIRE(KJ_ASSERT_NONNULL(locked.getCanceler())->isEmpty(), TypeError, + "Cannot call releaseLock() on a reader with outstanding read promises."); + maybeRejectPromise(js, + locked.getClosedFulfiller(), + js.v8TypeError("This ReadableStream reader has been released."_kj)); -WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) { - KJ_IF_SOME(locked, writeState.tryGet()) { auto lock = kj::mv(locked); - writeState.init(); + readState.template init(); } } +WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {} + jsg::Ref WritableStreamInternalController::addRef() { return KJ_ASSERT_NONNULL(owner).addRef(); } @@ -1246,7 +1230,7 @@ kj::Maybe WritableStreamInternalController::getDesiredSize() { KJ_UNREACHABLE; } -bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) { +bool WritableStreamInternalController::lockWriter(jsg::Lock& js, kj::Own> writer) { if (isLockedToWriter()) { return false; } @@ -1257,7 +1241,8 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) auto readyPrp = js.newPromiseAndResolver(); readyPrp.promise.markAsHandled(js); - auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver)); + auto lock = WriterLocked(kj::mv(writer), kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver)); + // Careful not to access writer directly after this point. Access is through the lock. KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { @@ -1274,30 +1259,26 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) } writeState = kj::mv(lock); - writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise)); + + auto& inner = KJ_ASSERT_NONNULL(writeState.get().getWriter()); + inner.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise)); + return true; } -void WritableStreamInternalController::releaseWriter( - Writer& writer, - kj::Maybe maybeJs) { +void WritableStreamInternalController::releaseWriter(jsg::Lock& js, Writer& writer) { KJ_IF_SOME(locked, writeState.tryGet()) { - KJ_ASSERT(&locked.getWriter() == &writer); - KJ_IF_SOME(js, maybeJs) { - maybeRejectPromise(js, - locked.getClosedFulfiller(), - js.v8TypeError("This WritableStream writer has been released."_kj)); + KJ_IF_SOME(w, locked.getWriter()) { + // Just an extra verification. + KJ_ASSERT(&w == &writer); } - auto lock = kj::mv(locked); - // When maybeJs is nullptr, that means releaseWriter was called when the writer is - // being deconstructed and not as the result of explicitly calling releaseLock and - // we do not have an isolate lock. In that case, we don't want to change the lock - // state itself. Moving the lock above will free the lock state while keeping the - // WritableStream marked as locked. - if (maybeJs != kj::none) { - writeState.template init(); - } + maybeRejectPromise(js, + locked.getClosedFulfiller(), + js.v8TypeError("This WritableStream writer has been released."_kj)); + + auto lock = kj::mv(locked); + writeState.template init(); } } diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 89bcab17821..aa6170e8cf7 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -81,10 +81,9 @@ class ReadableStreamInternalController: public ReadableStreamController { bool isLockedToReader() const override { return !readState.is(); } - bool lockReader(jsg::Lock& js, Reader& reader) override; + bool lockReader(jsg::Lock& js, kj::Own> reader) override; - void releaseReader(Reader& reader, kj::Maybe maybeJs) override; - // See the comment for releaseReader in common.h for details on the use of maybeJs + void releaseReader(jsg::Lock& js, Reader& reader) override; kj::Maybe tryPipeLock(jsg::Ref destination) override; @@ -201,9 +200,9 @@ class WritableStreamInternalController: public WritableStreamController { bool isLockedToWriter() const override { return !writeState.is(); } - bool lockWriter(jsg::Lock& js, Writer& writer) override; + bool lockWriter(jsg::Lock& js, kj::Own> writer) override; - void releaseWriter(Writer& writer, kj::Maybe maybeJs) override; + void releaseWriter(jsg::Lock& js, Writer& writer) override; // See the comment for releaseWriter in common.h for details on the use of maybeJs kj::Maybe> isErroring(jsg::Lock& js) override { diff --git a/src/workerd/api/streams/readable.c++ b/src/workerd/api/streams/readable.c++ index 5df9ca5708c..7bdca05db28 100644 --- a/src/workerd/api/streams/readable.c++ +++ b/src/workerd/api/streams/readable.c++ @@ -11,15 +11,12 @@ namespace workerd::api { ReaderImpl::ReaderImpl(ReadableStreamController::Reader& reader) : ioContext(tryGetIoContext()), - reader(reader) {} + reader(reader), + self(kj::refcounted>( + ReadableStreamController::Reader::getBadge(), reader)) {} ReaderImpl::~ReaderImpl() noexcept(false) { - KJ_IF_SOME(stream, state.tryGet()) { - // There's a very good likelihood that this is called during GC or other - // cleanup so we have to make sure that releasing the reader does not also - // trigger resolution of the close promise. - stream->getController().releaseReader(reader, kj::none); - } + self->invalidate(ReadableStreamController::Reader::getBadge()); } void ReaderImpl::attach(ReadableStreamController& controller, jsg::Promise closedPromise) { @@ -79,7 +76,7 @@ jsg::MemoizedIdentity>& ReaderImpl::getClosed() { void ReaderImpl::lockToStream(jsg::Lock& js, ReadableStream& stream) { KJ_ASSERT(!stream.isLocked()); - KJ_ASSERT(stream.getController().lockReader(js, reader)); + KJ_ASSERT(stream.getController().lockReader(js, reader.addWeakRef())); } jsg::Promise ReaderImpl::read( @@ -138,7 +135,7 @@ void ReaderImpl::releaseLock(jsg::Lock& js) { KJ_FAIL_ASSERT("this reader was never attached"); } KJ_CASE_ONEOF(stream, Attached) { - stream->getController().releaseReader(reader, js); + stream->getController().releaseReader(js, reader); state.init(); return; } @@ -163,7 +160,8 @@ void ReaderImpl::visitForGc(jsg::GcVisitor& visitor) { // ====================================================================================== -ReadableStreamDefaultReader::ReadableStreamDefaultReader() : impl(*this) {} +ReadableStreamDefaultReader::ReadableStreamDefaultReader() + : Reader(), impl(*this) {} jsg::Ref ReadableStreamDefaultReader::constructor( jsg::Lock& js, @@ -213,7 +211,8 @@ void ReadableStreamDefaultReader::visitForGc(jsg::GcVisitor& visitor) { // ====================================================================================== -ReadableStreamBYOBReader::ReadableStreamBYOBReader() : impl(*this) {} +ReadableStreamBYOBReader::ReadableStreamBYOBReader() + : Reader(), impl(*this) {} jsg::Ref ReadableStreamBYOBReader::constructor( jsg::Lock& js, diff --git a/src/workerd/api/streams/readable.h b/src/workerd/api/streams/readable.h index 6f1c8a1d71b..8f39a038087 100644 --- a/src/workerd/api/streams/readable.h +++ b/src/workerd/api/streams/readable.h @@ -35,6 +35,10 @@ class ReaderImpl { void visitForGc(jsg::GcVisitor& visitor); + inline kj::Own> addWeakRef() { + return self->addRef(); + } + private: struct Initial {}; // While a Reader is attached to a ReadableStream, it holds a strong reference to the @@ -53,6 +57,8 @@ class ReaderImpl { kj::OneOf state = Initial(); kj::Maybe>> closedPromise; + kj::Own> self; + friend class ReadableStreamDefaultReader; friend class ReadableStreamBYOBReader; }; @@ -97,6 +103,8 @@ class ReadableStreamDefaultReader : public jsg::Object, inline bool isByteOriented() const override { return false; } + kj::Own> addWeakRef() override { return impl.addWeakRef(); } + private: ReaderImpl impl; @@ -162,6 +170,8 @@ class ReadableStreamBYOBReader: public jsg::Object, inline bool isByteOriented() const override { return true; } + kj::Own> addWeakRef() override { return impl.addWeakRef(); } + private: ReaderImpl impl; diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 00fe659f384..bb43c5f0412 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -45,10 +45,10 @@ public: bool isLockedToReader() const { return !state.template is(); } - bool lockReader(jsg::Lock& js, Controller& self, Reader& reader); + bool lockReader(jsg::Lock& js, Controller& self, kj::Own> reader); // See the comment for releaseReader in common.h for details on the use of maybeJs - void releaseReader(Controller& self, Reader& reader, kj::Maybe maybeJs); + void releaseReader(jsg::Lock& js, Controller& self, Reader& reader); bool lock(); @@ -125,10 +125,10 @@ public: bool isLockedToWriter() const; - bool lockWriter(jsg::Lock& js, Controller& self, Writer& writer); + bool lockWriter(jsg::Lock& js, Controller& self, kj::Own> writer); // See the comment for releaseWriter in common.h for details on the use of maybeJs - void releaseWriter(Controller& self, Writer& writer, kj::Maybe maybeJs); + void releaseWriter(jsg::Lock& js, Controller& self, Writer& writer); void visitForGc(jsg::GcVisitor& visitor); @@ -179,7 +179,7 @@ template bool ReadableLockImpl::lockReader( jsg::Lock& js, Controller& self, - Reader& reader) { + kj::Own> reader) { if (isLockedToReader()) { return false; } @@ -187,7 +187,8 @@ bool ReadableLockImpl::lockReader( auto prp = js.newPromiseAndResolver(); prp.promise.markAsHandled(js); - auto lock = ReaderLocked(reader, kj::mv(prp.resolver)); + auto lock = ReaderLocked(kj::mv(reader), kj::mv(prp.resolver)); + // Take care not to access reader after this point. Use the lock. if (self.state.template is()) { maybeResolvePromise(js, lock.getClosedFulfiller()); @@ -196,41 +197,37 @@ bool ReadableLockImpl::lockReader( } state = kj::mv(lock); - reader.attach(self, kj::mv(prp.promise)); + + auto& inner = KJ_ASSERT_NONNULL(state.template get().getReader()); + inner.attach(self, kj::mv(prp.promise)); + return true; } template void ReadableLockImpl::releaseReader( + jsg::Lock& js, Controller& self, - Reader& reader, - kj::Maybe maybeJs) { + Reader& reader) { KJ_IF_SOME(locked, state.template tryGet()) { - KJ_ASSERT(&locked.getReader() == &reader); + KJ_IF_SOME(r, locked.getReader()) { + KJ_ASSERT(&r == &reader); + } - KJ_IF_SOME(js, maybeJs) { - JSG_REQUIRE(!self.hasPendingReadRequests(), - TypeError, - "Cannot call releaseLock() on a reader with outstanding read promises."); + JSG_REQUIRE(!self.hasPendingReadRequests(), + TypeError, + "Cannot call releaseLock() on a reader with outstanding read promises."); - maybeRejectPromise(js, - locked.getClosedFulfiller(), - js.v8TypeError("This ReadableStream reader has been released."_kj)); - } + maybeRejectPromise(js, + locked.getClosedFulfiller(), + js.v8TypeError("This ReadableStream reader has been released."_kj)); // Keep the kj::mv(locked) after the isolate and hasPendingReadRequests check above. // Moving will release the references and we don't want to do that if the hasPendingReadRequests // check fails. auto lock = kj::mv(locked); - // When maybeJs is nullptr, that means releaseReader was called when the reader is - // being deconstructed and not as the result of explicitly calling releaseLock and - // we do not have an isolate lock. In that case, we don't want to change the lock - // state itself. Moving the lock above will free the lock state while keeping the - // ReadableStream marked as locked. - if (maybeJs != kj::none) { - state.template init(); - } + state.template init(); } } @@ -328,7 +325,9 @@ bool WritableLockImpl::isLockedToWriter() const { } template -bool WritableLockImpl::lockWriter(jsg::Lock& js, Controller& self, Writer& writer) { +bool WritableLockImpl::lockWriter(jsg::Lock& js, + Controller& self, + kj::Own> writer) { if (isLockedToWriter()) { return false; } @@ -338,7 +337,8 @@ bool WritableLockImpl::lockWriter(jsg::Lock& js, Controller& self, W auto readyPrp = js.newPromiseAndResolver(); readyPrp.promise.markAsHandled(js); - auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver)); + auto lock = WriterLocked(kj::mv(writer), kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver)); + // Take care not to access writer directly after this point. Use the lock if (self.state.template is()) { maybeResolvePromise(js, lock.getClosedFulfiller()); @@ -353,32 +353,27 @@ bool WritableLockImpl::lockWriter(jsg::Lock& js, Controller& self, W } state = kj::mv(lock); - writer.attach(self, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise)); + + auto& inner = KJ_ASSERT_NONNULL(state.template get().getWriter()); + inner.attach(self, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise)); return true; } template void WritableLockImpl::releaseWriter( + jsg::Lock& js, Controller& self, - Writer& writer, - kj::Maybe maybeJs) { + Writer& writer) { auto& locked = state.template get(); - KJ_ASSERT(&locked.getWriter() == &writer); - KJ_IF_SOME(js, maybeJs) { - maybeRejectPromise(js, - locked.getClosedFulfiller(), - js.v8TypeError("This WritableStream writer has been released."_kj)); + KJ_IF_SOME(w, locked.getWriter()) { + KJ_ASSERT(&w == &writer); } + maybeRejectPromise(js, + locked.getClosedFulfiller(), + js.v8TypeError("This WritableStream writer has been released."_kj)); auto lock = kj::mv(locked); - // When maybeJs is nullptr, that means releaseWriter was called when the writer is - // being deconstructed and not as the result of explicitly calling releaseLock and - // we do not have an isolate lock. In that case, we don't want to change the lock - // state itself. Moving the lock above will free the lock state while keeping the - // WritableStream marked as locked. - if (maybeJs != kj::none) { - state.template init(); - } + state.template init(); } template @@ -647,7 +642,7 @@ public: bool isLockedToReader() const override; - bool lockReader(jsg::Lock& js, Reader& reader) override; + bool lockReader(jsg::Lock& js, kj::Own> reader) override; kj::Maybe> isErrored(jsg::Lock& js); @@ -665,8 +660,7 @@ public: jsg::Lock& js, kj::Maybe byobOptions) override; - // See the comment for releaseReader in common.h for details on the use of maybeJs - void releaseReader(Reader& reader, kj::Maybe maybeJs) override; + void releaseReader(jsg::Lock& js, Reader& reader) override; void setOwnerRef(ReadableStream& stream) override; @@ -784,14 +778,14 @@ public: inline bool isWritable() const { return state.is(); } - bool lockWriter(jsg::Lock& js, Writer& writer) override; + bool lockWriter(jsg::Lock& js, kj::Own> writer) override; void maybeRejectReadyPromise(jsg::Lock& js, v8::Local reason); void maybeResolveReadyPromise(jsg::Lock& js); // See the comment for releaseWriter in common.h for details on the use of maybeJs - void releaseWriter(Writer& writer, kj::Maybe maybeJs) override; + void releaseWriter(jsg::Lock& js, Writer& writer) override; kj::Maybe> removeSink(jsg::Lock& js) override; @@ -2407,8 +2401,8 @@ bool ReadableStreamJsController::isLockedToReader() const { return lock.isLockedToReader(); } -bool ReadableStreamJsController::lockReader(jsg::Lock& js, Reader& reader) { - return lock.lockReader(js, *this, reader); +bool ReadableStreamJsController::lockReader(jsg::Lock& js, kj::Own> reader) { + return lock.lockReader(js, *this, kj::mv(reader)); } jsg::Promise ReadableStreamJsController::pipeTo( @@ -2508,10 +2502,8 @@ kj::Maybe> ReadableStreamJsController::read( KJ_UNREACHABLE; } -void ReadableStreamJsController::releaseReader( - Reader& reader, - kj::Maybe maybeJs) { - lock.releaseReader(*this, reader, maybeJs); +void ReadableStreamJsController::releaseReader(jsg::Lock& js, Reader& reader) { + lock.releaseReader(js, *this, reader); } ReadableStreamController::Tee ReadableStreamJsController::tee(jsg::Lock& js) { @@ -3562,8 +3554,8 @@ bool WritableStreamJsController::isLockedToWriter() const { return !lock.state.is(); } -bool WritableStreamJsController::lockWriter(jsg::Lock& js, Writer& writer) { - return lock.lockWriter(js, *this, writer); +bool WritableStreamJsController::lockWriter(jsg::Lock& js, kj::Own> writer) { + return lock.lockWriter(js, *this, kj::mv(writer)); } void WritableStreamJsController::maybeRejectReadyPromise( @@ -3587,10 +3579,8 @@ void WritableStreamJsController::maybeResolveReadyPromise(jsg::Lock& js) { } } -void WritableStreamJsController::releaseWriter( - Writer& writer, - kj::Maybe maybeJs) { - lock.releaseWriter(*this, writer, maybeJs); +void WritableStreamJsController::releaseWriter(jsg::Lock& js, Writer& writer) { + lock.releaseWriter(js, *this, writer); } kj::Maybe> WritableStreamJsController::removeSink(jsg::Lock& js) { diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index 1eeab83013d..c3c505977d3 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -8,15 +8,12 @@ namespace workerd::api { WritableStreamDefaultWriter::WritableStreamDefaultWriter() - : ioContext(tryGetIoContext()) {} - -WritableStreamDefaultWriter::~WritableStreamDefaultWriter() noexcept(false) { - KJ_IF_SOME(stream, state.tryGet()) { - // Because this can be called during gc or other cleanup, it is important - // that releasing the writer does not cause the closed promise be resolved - // since that requires v8 heap allocations. - stream->getController().releaseWriter(*this, kj::none); - } + : Writer(), ioContext(tryGetIoContext()), + self(kj::refcounted>( + WritableStreamController::Writer::getBadge(), + *this)) {} +WritableStreamDefaultWriter::~WritableStreamDefaultWriter() { + self->invalidate(WritableStreamController::Writer::getBadge()); } jsg::Ref WritableStreamDefaultWriter::constructor( @@ -130,7 +127,7 @@ jsg::MemoizedIdentity>& WritableStreamDefaultWriter::getReady void WritableStreamDefaultWriter::lockToStream(jsg::Lock& js, WritableStream& stream) { KJ_ASSERT(!stream.isLocked()); - KJ_ASSERT(stream.getController().lockWriter(js, *this)); + KJ_ASSERT(stream.getController().lockWriter(js, addWeakRef())); } void WritableStreamDefaultWriter::releaseLock(jsg::Lock& js) { @@ -140,7 +137,7 @@ void WritableStreamDefaultWriter::releaseLock(jsg::Lock& js) { KJ_FAIL_ASSERT("this writer was never attached"); } KJ_CASE_ONEOF(stream, Attached) { - stream->getController().releaseWriter(*this, js); + stream->getController().releaseWriter(js, *this); state.init(); return; } diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index a5063eef17d..fafe6f4437d 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -12,11 +12,9 @@ class WritableStreamDefaultWriter: public jsg::Object, public WritableStreamController::Writer { public: explicit WritableStreamDefaultWriter(); - - ~WritableStreamDefaultWriter() noexcept(false) override; + ~WritableStreamDefaultWriter() override; // JavaScript API - static jsg::Ref constructor( jsg::Lock& js, jsg::Ref stream); @@ -75,6 +73,8 @@ class WritableStreamDefaultWriter: public jsg::Object, void replaceReadyPromise(jsg::Promise readyPromise) override; + inline kj::Own> addWeakRef() override { return self->addRef(); } + private: struct Initial {}; // While a Writer is attached to a WritableStream, it holds a strong reference to the @@ -93,6 +93,8 @@ class WritableStreamDefaultWriter: public jsg::Object, kj::Maybe>> closedPromise; kj::Maybe>> readyPromise; + kj::Own> self; + void visitForGc(jsg::GcVisitor& visitor); }; diff --git a/src/workerd/util/weak-refs.h b/src/workerd/util/weak-refs.h index 978559825a1..be4b17d15f2 100644 --- a/src/workerd/util/weak-refs.h +++ b/src/workerd/util/weak-refs.h @@ -94,6 +94,10 @@ class WeakRef final: public kj::Refcounted { inline kj::Own addRef() { return kj::addRef(*this); } inline bool isValid() const { return maybeThing != kj::none; } + inline void invalidate(kj::Badge) { + invalidate(); + } + private: friend T;