Skip to content

Commit

Permalink
refactor packWithFds
Browse files Browse the repository at this point in the history
Summary: in this diff we break `packWithFds` into several methods to make it more readable.

Reviewed By: praihan

Differential Revision: D61666103

fbshipit-source-id: d2bc80b1241e4edcff0d013510e1126b6be10b3b
  • Loading branch information
avalonalex authored and facebook-github-bot committed Aug 24, 2024
1 parent e6eab22 commit c82427f
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 63 deletions.
103 changes: 97 additions & 6 deletions thrift/lib/cpp2/transport/rocket/PayloadUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,102 @@

#include <thrift/lib/cpp2/transport/rocket/PayloadUtils.h>

namespace apache {
namespace thrift {
namespace rocket {
namespace apache::thrift::rocket {

template <typename Metadata>
void applyCompressionIfNeeded(
std::unique_ptr<folly::IOBuf>& payload, Metadata* metadata) {
if (auto compress = metadata->compression_ref()) {
apache::thrift::rocket::detail::compressPayload(payload, *compress);
}
}

template <typename Metadata>
void handleFds(
folly::SocketFds& fds,
Metadata* metadata,
folly::AsyncTransport* transport) {
auto numFds = fds.size();
if (numFds) {
FdMetadata fdMetadata;

// The kernel maximum is actually much lower (at least on Linux, and
// MacOS doesn't seem to document it at all), but that will only fail in
// in `AsyncFdSocket`.
constexpr auto numFdsTypeMax = std::numeric_limits<
op::get_native_type<FdMetadata, ident::numFds>>::max();
if (UNLIKELY(numFds > numFdsTypeMax)) {
LOG(DFATAL) << numFds << " would overflow FdMetadata::numFds";
fdMetadata.numFds() = numFdsTypeMax;
// This will cause "AsyncFdSocket::writeChainWithFds" to error out.
fdMetadata.fdSeqNum() = folly::SocketFds::kNoSeqNum;
} else {
// When received, the request will know to retrieve this many FDs.
fdMetadata.numFds() = numFds;
// FD sequence numbers count the total number of FDs sent on this
// socket, and are used to detect & fail on the dire class of bugs where
// the wrong FDs are about to be associated with a message.
//
// We currently require message bytes and FDs to be both sent and
// received in a coherent order, so sequence numbers here in `pack*` are
// expected to exactly match the sequencing of socket sends, and also the
// sequencing of `popNextReceivedFds` on the receiving side.
//
// NB: If `transport` is not backed by a `AsyncFdSocket*`, this will
// store `fdSeqNum == -1`, which cannot happen otherwise, thanks to
// AsyncFdSocket's 2^63 -> 0 wrap-around logic. Furthermore, the
// subsequent `writeChainWithFds` will discard `fds`. As a result, the
// recipient will see read errors on the FDs due to both `numFds` not
// matching, and `fdSeqNum` not matching.
fdMetadata.fdSeqNum() =
injectFdSocketSeqNumIntoFdsToSend(transport, &fds);
}

DCHECK(!metadata->fdMetadata().has_value());
metadata->fdMetadata() = fdMetadata;
}
}

template <typename Metadata>
rocket::Payload finalizePayload(
std::unique_ptr<folly::IOBuf>&& payload,
Metadata* metadata,
folly::SocketFds fds) {
auto ret = apache::thrift::rocket::detail::makePayload(
*metadata, std::move(payload));
if (fds.size()) {
ret.fds = std::move(fds.dcheckToSendOrEmpty());
}
return ret;
}

template <typename Metadata>
rocket::Payload packWithFds(
Metadata* metadata,
std::unique_ptr<folly::IOBuf>&& payload,
folly::SocketFds fds,
folly::AsyncTransport* transport) {
applyCompressionIfNeeded(payload, metadata);
handleFds(fds, metadata, transport);
return finalizePayload(std::move(payload), metadata, std::move(fds));
}

template rocket::Payload packWithFds<RequestRpcMetadata>(
RequestRpcMetadata*,
std::unique_ptr<folly::IOBuf>&&,
folly::SocketFds,
folly::AsyncTransport*);
template rocket::Payload packWithFds<ResponseRpcMetadata>(
ResponseRpcMetadata*,
std::unique_ptr<folly::IOBuf>&&,
folly::SocketFds,
folly::AsyncTransport*);
template rocket::Payload packWithFds<StreamPayloadMetadata>(
StreamPayloadMetadata*,
std::unique_ptr<folly::IOBuf>&&,
folly::SocketFds,
folly::AsyncTransport*);

namespace detail {

template <class Metadata>
Expand Down Expand Up @@ -72,6 +165,4 @@ template Payload makePayload<>(
template Payload makePayload<>(
const StreamPayloadMetadata&, std::unique_ptr<folly::IOBuf> data);
} // namespace detail
} // namespace rocket
} // namespace thrift
} // namespace apache
} // namespace apache::thrift::rocket
58 changes: 1 addition & 57 deletions thrift/lib/cpp2/transport/rocket/PayloadUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ namespace detail {
template <class Metadata>
Payload makePayload(
const Metadata& metadata, std::unique_ptr<folly::IOBuf> data);

extern template Payload makePayload<>(
const RequestRpcMetadata&, std::unique_ptr<folly::IOBuf> data);
extern template Payload makePayload<>(
const ResponseRpcMetadata&, std::unique_ptr<folly::IOBuf> data);
extern template Payload makePayload<>(
const StreamPayloadMetadata&, std::unique_ptr<folly::IOBuf> data);
} // namespace detail

template <typename T>
Expand Down Expand Up @@ -124,56 +117,7 @@ rocket::Payload packWithFds(
Metadata* metadata,
std::unique_ptr<folly::IOBuf>&& payload,
folly::SocketFds fds,
folly::AsyncTransport* transport) {
if (auto compress = metadata->compression_ref()) {
apache::thrift::rocket::detail::compressPayload(payload, *compress);
}
auto numFds = fds.size();
if (numFds) {
FdMetadata fdMetadata;

// The kernel maximum is actually much lower (at least on Linux, and
// MacOS doesn't seem to document it at all), but that will only fail in
// in `AsyncFdSocket`.
constexpr auto numFdsTypeMax = std::numeric_limits<
op::get_native_type<FdMetadata, ident::numFds>>::max();
if (UNLIKELY(numFds > numFdsTypeMax)) {
LOG(DFATAL) << numFds << " would overflow FdMetadata::numFds";
fdMetadata.numFds() = numFdsTypeMax;
// This will cause "AsyncFdSocket::writeChainWithFds" to error out.
fdMetadata.fdSeqNum() = folly::SocketFds::kNoSeqNum;
} else {
// When received, the request will know to retrieve this many FDs.
fdMetadata.numFds() = numFds;
// FD sequence numbers count the total number of FDs sent on this
// socket, and are used to detect & fail on the dire class of bugs where
// the wrong FDs are about to be associated with a message.
//
// We currently require message bytes and FDs to be both sent and
// received in a coherent order, so sequence numbers here in `pack*` are
// expected to exactly match the sequencing of socket sends, and also the
// sequencing of `popNextReceivedFds` on the receiving side.
//
// NB: If `transport` is not backed by a `AsyncFdSocket*`, this will
// store `fdSeqNum == -1`, which cannot happen otherwise, thanks to
// AsyncFdSocket's 2^63 -> 0 wrap-around logic. Furthermore, the
// subsequent `writeChainWithFds` will discard `fds`. As a result, the
// recipient will see read errors on the FDs due to both `numFds` not
// matching, and `fdSeqNum` not matching.
fdMetadata.fdSeqNum() =
injectFdSocketSeqNumIntoFdsToSend(transport, &fds);
}

DCHECK(!metadata->fdMetadata().has_value());
metadata->fdMetadata() = fdMetadata;
}
auto ret = apache::thrift::rocket::detail::makePayload(
*metadata, std::move(payload));
if (numFds) {
ret.fds = std::move(fds.dcheckToSendOrEmpty());
}
return ret;
}
folly::AsyncTransport* transport);

template <class PayloadType>
rocket::Payload pack(PayloadType&& payload, folly::AsyncTransport* transport) {
Expand Down

0 comments on commit c82427f

Please sign in to comment.