From 96d188c7733b39fecb47b24be4d8af6087ae30b7 Mon Sep 17 00:00:00 2001 From: Lucas Furukawa Gadani Date: Tue, 20 Aug 2024 13:48:58 -0700 Subject: [PATCH] Return Future when publishing data Summary: The MoQSession needs to wait for the promise to be completed before it can start publishing more data. This diff returns the SemiFuture which allows the client to ensure that data has been flushed before writing more data. ``` F0819 15:11:20.110482 2032140288 HTTPTransaction.cpp:2159] Check failed: !writePromise_ Wait for previous write to complete ``` Reviewed By: afrind Differential Revision: D61499501 fbshipit-source-id: 1fcbb75169955d1a655fbfbe9c3b418072717b89 --- moxygen/MoQSession.cpp | 40 ++++++++++++++++++++++++++++------------ moxygen/MoQSession.h | 16 +++++++++++++--- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/moxygen/MoQSession.cpp b/moxygen/MoQSession.cpp index db71382..d71d644 100644 --- a/moxygen/MoQSession.cpp +++ b/moxygen/MoQSession.cpp @@ -548,21 +548,22 @@ uint64_t MoQSession::order(const ObjectHeader& objHeader) { objOrder(objHeader.id)); } -void MoQSession::publish( +folly::SemiFuture MoQSession::publish( const ObjectHeader& objHeader, uint64_t payloadOffset, std::unique_ptr payload, bool eom) { XCHECK_EQ(objHeader.status, ObjectStatus::NORMAL); - publishImpl(objHeader, payloadOffset, std::move(payload), eom); + return publishImpl(objHeader, payloadOffset, std::move(payload), eom); } -void MoQSession::publishStatus(const ObjectHeader& objHeader) { +folly::SemiFuture MoQSession::publishStatus( + const ObjectHeader& objHeader) { XCHECK_NE(objHeader.status, ObjectStatus::NORMAL); - publishImpl(objHeader, 0, nullptr, true); + return publishImpl(objHeader, 0, nullptr, true); } -void MoQSession::publishImpl( +folly::SemiFuture MoQSession::publishImpl( const ObjectHeader& objHeader, uint64_t payloadOffset, std::unique_ptr payload, @@ -597,7 +598,8 @@ void MoQSession::publishImpl( << __func__ << " Can't start publishing in the middle. Disgregard data for this new obj with payloadOffset = " << payloadOffset << " sess=" << this; - return; + return folly::makeSemiFuture(folly::exception_wrapper( + std::runtime_error("Can't start publishing in the middle."))); } // Create a new stream (except for datagram) @@ -607,7 +609,8 @@ void MoQSession::publishImpl( if (!res) { // failed to create a stream XLOG(ERR) << "Failed to create uni stream" << " sess=" << this; - return; + return folly::makeSemiFuture(folly::exception_wrapper( + std::runtime_error("Failed to create uni stream."))); } stream = *res; XLOG(DBG4) << "New stream created, id: " << stream->getID() @@ -644,14 +647,16 @@ void MoQSession::publishImpl( if (objHeader.forwardPreference == ForwardPreference::Track) { if (objHeader.group < pubDataIt->second.group) { XLOG(ERR) << "Decreasing group in Track" << " sess=" << this; - return; + return folly::makeSemiFuture(folly::exception_wrapper( + std::runtime_error("Decreasing group in Track."))); } if (objHeader.group == pubDataIt->second.group) { if (objHeader.id < pubDataIt->second.objectID || (objHeader.id == pubDataIt->second.objectID && pubDataIt->second.offset != 0)) { XLOG(ERR) << "obj id must increase within group" << " sess=" << this; - return; + return folly::makeSemiFuture(folly::exception_wrapper( + std::runtime_error("obj id must increase within group."))); } } multiObject = true; @@ -660,7 +665,8 @@ void MoQSession::publishImpl( (objHeader.id == pubDataIt->second.objectID && pubDataIt->second.offset != 0)) { XLOG(ERR) << "obj id must increase within group" << " sess=" << this; - return; + return folly::makeSemiFuture(folly::exception_wrapper( + std::runtime_error("obj id must increase within group."))); } multiObject = true; } @@ -679,12 +685,14 @@ void MoQSession::publishImpl( if (pubDataIt->second.objectLength && *pubDataIt->second.objectLength < payloadLength) { XLOG(ERR) << "Object length exceeds header length" << " sess=" << this; - return; + return folly::makeSemiFuture(folly::exception_wrapper( + std::runtime_error("Object length exceeds header length."))); } writeBuf.append(std::move(payload)); if (sendAsDatagram) { wt_->sendDatagram(writeBuf.move()); publishDataMap_.erase(pubDataIt); + return folly::makeSemiFuture(); } else { bool streamEOM = (eom && objHeader.forwardPreference == ForwardPreference::Object) || @@ -692,8 +700,15 @@ void MoQSession::publishImpl( objHeader.status == ObjectStatus::END_OF_TRACK_AND_GROUP); XLOG_IF(DBG1, streamEOM) << "End of stream" << " sess=" << this; // TODO: verify that pubDataIt->second.objectLength is empty or 0 - wt_->writeStreamData( + auto writeRes = wt_->writeStreamData( pubDataIt->second.streamID, writeBuf.move(), streamEOM); + if (!writeRes) { + XLOG(ERR) << "Failed to write stream data." << " sess=" << this + << " error=" << static_cast(writeRes.error()); + return folly::makeSemiFuture( + folly::exception_wrapper(WebTransportException( + writeRes.error(), "Failed to write stream data."))); + } if (streamEOM) { publishDataMap_.erase(pubDataIt); } else { @@ -707,6 +722,7 @@ void MoQSession::publishImpl( } } } + return std::move(writeRes.value()); } } diff --git a/moxygen/MoQSession.h b/moxygen/MoQSession.h index a52ecb0..a5846e2 100644 --- a/moxygen/MoQSession.h +++ b/moxygen/MoQSession.h @@ -254,13 +254,23 @@ class MoQSession : public MoQCodec::Callback, void unsubscribe(Unsubscribe unsubscribe); void subscribeDone(SubscribeDone subDone); + class WebTransportException : public std::runtime_error { + public: + explicit WebTransportException( + proxygen::WebTransport::ErrorCode error, + const std::string& msg) + : std::runtime_error(msg), errorCode(error) {} + + proxygen::WebTransport::ErrorCode errorCode; + }; + // Publish this object. - void publish( + folly::SemiFuture publish( const ObjectHeader& objHeader, uint64_t payloadOffset, std::unique_ptr payload, bool eom); - void publishStatus(const ObjectHeader& objHeader); + folly::SemiFuture publishStatus(const ObjectHeader& objHeader); void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override; void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override; @@ -306,7 +316,7 @@ class MoQSession : public MoQCodec::Callback, void onGoaway(Goaway goaway) override; void onConnectionError(ErrorCode error) override; - void publishImpl( + folly::SemiFuture publishImpl( const ObjectHeader& objHeader, uint64_t payloadOffset, std::unique_ptr payload,