Skip to content

Commit

Permalink
Return Future when publishing data
Browse files Browse the repository at this point in the history
Summary:
The MoQSession needs to wait for the promise to be completed before it can start publishing more data.

This diff returns the SemiFuture which allows the client to ensure that data has been flushed before writing more data.

```
F0819 15:11:20.110482 2032140288 HTTPTransaction.cpp:2159] Check failed: !writePromise_ Wait for previous write to complete
```

Reviewed By: afrind

Differential Revision: D61499501

fbshipit-source-id: 1fcbb75169955d1a655fbfbe9c3b418072717b89
  • Loading branch information
lucasgadani authored and facebook-github-bot committed Aug 20, 2024
1 parent ffa6928 commit 96d188c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 15 deletions.
40 changes: 28 additions & 12 deletions moxygen/MoQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,22 @@ uint64_t MoQSession::order(const ObjectHeader& objHeader) {
objOrder(objHeader.id));
}

void MoQSession::publish(
folly::SemiFuture<folly::Unit> MoQSession::publish(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom) {
XCHECK_EQ(objHeader.status, ObjectStatus::NORMAL);
publishImpl(objHeader, payloadOffset, std::move(payload), eom);
return publishImpl(objHeader, payloadOffset, std::move(payload), eom);
}

void MoQSession::publishStatus(const ObjectHeader& objHeader) {
folly::SemiFuture<folly::Unit> MoQSession::publishStatus(
const ObjectHeader& objHeader) {
XCHECK_NE(objHeader.status, ObjectStatus::NORMAL);
publishImpl(objHeader, 0, nullptr, true);
return publishImpl(objHeader, 0, nullptr, true);
}

void MoQSession::publishImpl(
folly::SemiFuture<folly::Unit> MoQSession::publishImpl(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
Expand Down Expand Up @@ -597,7 +598,8 @@ void MoQSession::publishImpl(
<< __func__
<< " Can't start publishing in the middle. Disgregard data for this new obj with payloadOffset = "
<< payloadOffset << " sess=" << this;
return;
return folly::makeSemiFuture<folly::Unit>(folly::exception_wrapper(
std::runtime_error("Can't start publishing in the middle.")));
}

// Create a new stream (except for datagram)
Expand All @@ -607,7 +609,8 @@ void MoQSession::publishImpl(
if (!res) {
// failed to create a stream
XLOG(ERR) << "Failed to create uni stream" << " sess=" << this;
return;
return folly::makeSemiFuture<folly::Unit>(folly::exception_wrapper(
std::runtime_error("Failed to create uni stream.")));
}
stream = *res;
XLOG(DBG4) << "New stream created, id: " << stream->getID()
Expand Down Expand Up @@ -644,14 +647,16 @@ void MoQSession::publishImpl(
if (objHeader.forwardPreference == ForwardPreference::Track) {
if (objHeader.group < pubDataIt->second.group) {
XLOG(ERR) << "Decreasing group in Track" << " sess=" << this;
return;
return folly::makeSemiFuture<folly::Unit>(folly::exception_wrapper(
std::runtime_error("Decreasing group in Track.")));
}
if (objHeader.group == pubDataIt->second.group) {
if (objHeader.id < pubDataIt->second.objectID ||
(objHeader.id == pubDataIt->second.objectID &&
pubDataIt->second.offset != 0)) {
XLOG(ERR) << "obj id must increase within group" << " sess=" << this;
return;
return folly::makeSemiFuture<folly::Unit>(folly::exception_wrapper(
std::runtime_error("obj id must increase within group.")));
}
}
multiObject = true;
Expand All @@ -660,7 +665,8 @@ void MoQSession::publishImpl(
(objHeader.id == pubDataIt->second.objectID &&
pubDataIt->second.offset != 0)) {
XLOG(ERR) << "obj id must increase within group" << " sess=" << this;
return;
return folly::makeSemiFuture<folly::Unit>(folly::exception_wrapper(
std::runtime_error("obj id must increase within group.")));
}
multiObject = true;
}
Expand All @@ -679,21 +685,30 @@ void MoQSession::publishImpl(
if (pubDataIt->second.objectLength &&
*pubDataIt->second.objectLength < payloadLength) {
XLOG(ERR) << "Object length exceeds header length" << " sess=" << this;
return;
return folly::makeSemiFuture<folly::Unit>(folly::exception_wrapper(
std::runtime_error("Object length exceeds header length.")));
}
writeBuf.append(std::move(payload));
if (sendAsDatagram) {
wt_->sendDatagram(writeBuf.move());
publishDataMap_.erase(pubDataIt);
return folly::makeSemiFuture();
} else {
bool streamEOM =
(eom && objHeader.forwardPreference == ForwardPreference::Object) ||
(objHeader.status == ObjectStatus::END_OF_GROUP ||
objHeader.status == ObjectStatus::END_OF_TRACK_AND_GROUP);
XLOG_IF(DBG1, streamEOM) << "End of stream" << " sess=" << this;
// TODO: verify that pubDataIt->second.objectLength is empty or 0
wt_->writeStreamData(
auto writeRes = wt_->writeStreamData(
pubDataIt->second.streamID, writeBuf.move(), streamEOM);
if (!writeRes) {
XLOG(ERR) << "Failed to write stream data." << " sess=" << this
<< " error=" << static_cast<int>(writeRes.error());
return folly::makeSemiFuture<folly::Unit>(
folly::exception_wrapper(WebTransportException(
writeRes.error(), "Failed to write stream data.")));
}
if (streamEOM) {
publishDataMap_.erase(pubDataIt);
} else {
Expand All @@ -707,6 +722,7 @@ void MoQSession::publishImpl(
}
}
}
return std::move(writeRes.value());
}
}

Expand Down
16 changes: 13 additions & 3 deletions moxygen/MoQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,13 +254,23 @@ class MoQSession : public MoQCodec::Callback,
void unsubscribe(Unsubscribe unsubscribe);
void subscribeDone(SubscribeDone subDone);

class WebTransportException : public std::runtime_error {
public:
explicit WebTransportException(
proxygen::WebTransport::ErrorCode error,
const std::string& msg)
: std::runtime_error(msg), errorCode(error) {}

proxygen::WebTransport::ErrorCode errorCode;
};

// Publish this object.
void publish(
folly::SemiFuture<folly::Unit> publish(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
bool eom);
void publishStatus(const ObjectHeader& objHeader);
folly::SemiFuture<folly::Unit> publishStatus(const ObjectHeader& objHeader);

void onNewUniStream(proxygen::WebTransport::StreamReadHandle* rh) override;
void onNewBidiStream(proxygen::WebTransport::BidiStreamHandle bh) override;
Expand Down Expand Up @@ -306,7 +316,7 @@ class MoQSession : public MoQCodec::Callback,
void onGoaway(Goaway goaway) override;
void onConnectionError(ErrorCode error) override;

void publishImpl(
folly::SemiFuture<folly::Unit> publishImpl(
const ObjectHeader& objHeader,
uint64_t payloadOffset,
std::unique_ptr<folly::IOBuf> payload,
Expand Down

0 comments on commit 96d188c

Please sign in to comment.