Skip to content

Commit

Permalink
Use iovec instead of IOBuf in QuicAsyncUDPSocket::write and QuicAsync…
Browse files Browse the repository at this point in the history
…UDPSocket::writeGSO

Summary: See title

Reviewed By: mjoras

Differential Revision: D61048705

fbshipit-source-id: 60dc63cc67f63be6f0ac6cbe0e766172a8c79d7c
  • Loading branch information
Aman Sharma authored and facebook-github-bot committed Oct 2, 2024
1 parent 924183d commit 2369ecb
Show file tree
Hide file tree
Showing 27 changed files with 626 additions and 454 deletions.
6 changes: 6 additions & 0 deletions quic/QuicConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ constexpr uint16_t kDefaultMsgSizeBackOffSize = 50;
// larger than this, unless configured otherwise.
constexpr uint16_t kDefaultUDPReadBufferSize = 1500;

// UDP's typical MTU size is 1500, so a large number of buffers
// does not make sense. We can optimize for buffer chains with
// fewer than 16 buffers, which is the highest I can think of
// for a real use case.
constexpr size_t kNumIovecBufferChains = 16;

// Number of GRO buffers to use
// 1 means GRO is not enabled
// 64 is the max possible value
Expand Down
17 changes: 13 additions & 4 deletions quic/api/QuicBatchWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ bool SinglePacketBatchWriter::append(
ssize_t SinglePacketBatchWriter::write(
QuicAsyncUDPSocket& sock,
const folly::SocketAddress& address) {
return sock.write(address, buf_);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(buf_, vec);
return sock.write(address, vec, iovec_len);
}

// SinglePacketInplaceBatchWriter
Expand All @@ -54,7 +56,10 @@ ssize_t SinglePacketInplaceBatchWriter::write(
const folly::SocketAddress& address) {
auto& buf = conn_.bufAccessor->buf();
CHECK(!conn_.bufAccessor->isChained());
auto ret = sock.write(address, buf);

iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(buf, vec);
auto ret = sock.write(address, vec, iovec_len);
conn_.bufAccessor->clear();
return ret;
}
Expand Down Expand Up @@ -102,7 +107,9 @@ bool SinglePacketBackpressureBatchWriter::append(
ssize_t SinglePacketBackpressureBatchWriter::write(
QuicAsyncUDPSocket& sock,
const folly::SocketAddress& address) {
auto written = sock.write(address, buf_);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(buf_, vec);
auto written = sock.write(address, vec, iovec_len);
lastWriteSuccessful_ = written > 0;
return written;
}
Expand Down Expand Up @@ -149,7 +156,9 @@ ssize_t SendmmsgPacketBatchWriter::write(
const folly::SocketAddress& address) {
CHECK_GT(bufs_.size(), 0);
if (bufs_.size() == 1) {
return sock.write(address, bufs_[0]);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(bufs_.at(0), vec);
return sock.write(address, vec, iovec_len);
}

int ret = sock.writem(
Expand Down
15 changes: 11 additions & 4 deletions quic/api/QuicGsoBatchWriters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ ssize_t GSOPacketBatchWriter::write(
auto options =
QuicAsyncUDPSocket::WriteOptions(gsoVal, false /*zerocopyVal*/);
options.txTime = txTime_;
return sock.writeGSO(address, buf_, options);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(buf_, vec);
return sock.writeGSO(address, vec, iovec_len, options);
}

GSOInplacePacketBatchWriter::GSOInplacePacketBatchWriter(
Expand Down Expand Up @@ -159,7 +161,9 @@ ssize_t GSOInplacePacketBatchWriter::write(
auto options =
QuicAsyncUDPSocket::WriteOptions(gsoVal, false /*zerocopyVal*/);
options.txTime = txTime_;
auto bytesWritten = sock.writeGSO(address, buf, options);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(buf, vec);
auto bytesWritten = sock.writeGSO(address, vec, iovec_len, options);
/**
* If there is one more bytes after lastPacketEnd_, that means there is a
* packet we choose not to write in this batch (e.g., it has a size larger
Expand Down Expand Up @@ -281,8 +285,11 @@ ssize_t SendmmsgGSOPacketBatchWriter::write(
const folly::SocketAddress& /*unused*/) {
CHECK_GT(bufs_.size(), 0);
if (bufs_.size() == 1) {
return (currBufs_ > 1) ? sock.writeGSO(addrs_[0], bufs_[0], options_[0])
: sock.write(addrs_[0], bufs_[0]);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(bufs_[0], vec);
return (currBufs_ > 1)
? sock.writeGSO(addrs_[0], vec, iovec_len, options_[0])
: sock.write(addrs_[0], vec, iovec_len);
}

int ret = sock.writemGSO(
Expand Down
4 changes: 3 additions & 1 deletion quic/api/QuicTransportFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,9 @@ void writeCloseCommon(
// best effort writing to the socket, ignore any errors.

Buf packetBufPtr = packetBuf.clone();
auto ret = sock.write(connection.peerAddress, packetBufPtr);
iovec vec[kNumIovecBufferChains];
size_t iovec_len = fillIovec(packetBufPtr, vec);
auto ret = sock.write(connection.peerAddress, vec, iovec_len);
connection.lossState.totalBytesSent += packetSize;
if (ret < 0) {
VLOG(4) << "Error writing connection close " << folly::errnoStr(errno)
Expand Down
1 change: 1 addition & 0 deletions quic/api/test/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ cpp_unittest(
deps = [
"//quic/api:quic_batch_writer",
"//quic/common/events:folly_eventbase",
"//quic/common/test:test_utils",
"//quic/common/testutil:mock_async_udp_socket",
"//quic/common/udpsocket:folly_async_udp_socket",
"//quic/fizz/server/handshake:fizz_server_handshake",
Expand Down
1 change: 1 addition & 0 deletions quic/api/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ quic_add_test(TARGET QuicBatchWriterTest
mvfst_buf_accessor
mvfst_server
mvfst_transport
mvfst_test_utils
)

quic_add_test(TARGET QuicStreamAsyncTransportTest
Expand Down
65 changes: 34 additions & 31 deletions quic/api/test/QuicBatchWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <quic/api/QuicBatchWriter.h>
#include <quic/api/QuicBatchWriterFactory.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <quic/common/test/TestUtils.h>
#include <quic/common/udpsocket/FollyQuicAsyncUDPSocket.h>

#include <gtest/gtest.h>
Expand Down Expand Up @@ -464,12 +465,13 @@ TEST_F(QuicBatchWriterTest, InplaceWriterWriteAll) {
ASSERT_TRUE(
batchWriter->append(nullptr, 700, folly::SocketAddress(), nullptr));

EXPECT_CALL(sock, writeGSO(_, _, _))
EXPECT_CALL(sock, writeGSO(_, _, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf,
const struct iovec* vec,
size_t,
QuicAsyncUDPSocket::WriteOptions options) {
EXPECT_EQ(1000 * 5 + 700, buf->length());
EXPECT_EQ(1000 * 5 + 700, vec[0].iov_len);
EXPECT_EQ(1000, options.gso);
return 1000 * 5 + 700;
}));
Expand Down Expand Up @@ -506,14 +508,13 @@ TEST_F(QuicBatchWriterTest, InplaceWriterWriteOne) {
ASSERT_FALSE(
batchWriter->append(nullptr, 1000, folly::SocketAddress(), nullptr));

EXPECT_CALL(sock, writeGSO(_, _, _))
EXPECT_CALL(sock, writeGSO(_, _, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf,
auto) {
EXPECT_EQ(1000, buf->length());
return 1000;
}));
.WillOnce(Invoke(
[&](const auto& /* addr */, const struct iovec* vec, size_t, auto) {
EXPECT_EQ(1000, vec[0].iov_len);
return 1000;
}));
EXPECT_EQ(1000, batchWriter->write(sock, folly::SocketAddress()));

EXPECT_TRUE(bufAccessor->ownsBuffer());
Expand Down Expand Up @@ -550,12 +551,13 @@ TEST_F(QuicBatchWriterTest, InplaceWriterLastOneTooBig) {
bufAccessor->release(std::move(buf));
EXPECT_TRUE(batchWriter->needsFlush(1000));

EXPECT_CALL(sock, writeGSO(_, _, _))
EXPECT_CALL(sock, writeGSO(_, _, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf,
const struct iovec* vec,
size_t,
QuicAsyncUDPSocket::WriteOptions options) {
EXPECT_EQ(5 * 700, buf->length());
EXPECT_EQ(5 * 700, vec[0].iov_len);
EXPECT_EQ(700, options.gso);
return 700 * 5;
}));
Expand Down Expand Up @@ -598,14 +600,13 @@ TEST_F(QuicBatchWriterTest, InplaceWriterBufResidueCheck) {
rawBuf->append(packetSizeBig);
EXPECT_TRUE(batchWriter->needsFlush(packetSizeBig));

EXPECT_CALL(sock, writeGSO(_, _, _))
EXPECT_CALL(sock, writeGSO(_, _, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf,
auto) {
EXPECT_EQ(700, buf->length());
return 700;
}));
.WillOnce(Invoke(
[&](const auto& /* addr */, const struct iovec* vec, size_t, auto) {
EXPECT_EQ(700, vec[0].iov_len);
return 700;
}));
// No crash:
EXPECT_EQ(700, batchWriter->write(sock, folly::SocketAddress()));
EXPECT_EQ(1009, rawBuf->length());
Expand Down Expand Up @@ -751,13 +752,13 @@ TEST_F(SinglePacketInplaceBatchWriterTest, TestWrite) {
std::shared_ptr<FollyQuicEventBase> qEvb =
std::make_shared<FollyQuicEventBase>(&evb);
quic::test::MockAsyncUDPSocket sock(qEvb);
EXPECT_CALL(sock, write(_, _))
EXPECT_CALL(sock, write(_, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf) {
EXPECT_EQ(appendSize, buf->length());
return appendSize;
}));
.WillOnce(
Invoke([&](const auto& /* addr */, const struct iovec* vec, size_t) {
EXPECT_EQ(appendSize, vec[0].iov_len);
return appendSize;
}));
EXPECT_EQ(appendSize, batchWriter->write(sock, folly::SocketAddress()));
EXPECT_TRUE(batchWriter->empty());
}
Expand Down Expand Up @@ -822,10 +823,11 @@ TEST_F(SinglePacketBackpressureBatchWriterTest, TestFailedWriteCachedOnEAGAIN) {
folly::SocketAddress(),
&sock_));

EXPECT_CALL(sock_, write(_, _))
EXPECT_CALL(sock_, write(_, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& /*buf*/) {
const struct iovec* /* vec */,
size_t /* iovec_len */) {
errno = EAGAIN;
return 0;
}));
Expand All @@ -846,11 +848,12 @@ TEST_F(SinglePacketBackpressureBatchWriterTest, TestFailedWriteCachedOnEAGAIN) {
EXPECT_FALSE(conn_.pendingWriteBatch_.buf);

// The write succeeds
EXPECT_CALL(sock_, write(_, _))
EXPECT_CALL(sock_, write(_, _, _))
.Times(1)
.WillOnce(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf) {
return buf->computeChainDataLength();
const struct iovec* vec,
size_t iovec_len) {
return ::quic::test::getTotalIovecLen(vec, iovec_len);
}));
EXPECT_EQ(
batchWriter->write(sock_, folly::SocketAddress()), testString.size());
Expand Down
52 changes: 29 additions & 23 deletions quic/api/test/QuicTransportBaseTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1718,7 +1718,7 @@ TEST_P(QuicTransportImplTestBase, ReadDataAlsoChecksLossAlarm) {
TEST_P(QuicTransportImplTestBase, ConnectionErrorOnWrite) {
transport->transportConn->oneRttWriteCipher = test::createNoOpAead();
auto stream = transport->createBidirectionalStream().value();
EXPECT_CALL(*socketPtr, write(_, _))
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(SetErrnoAndReturn(ENETUNREACH, -1));
transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr);
transport->addDataToStream(
Expand All @@ -1743,10 +1743,12 @@ TEST_P(QuicTransportImplTestBase, ReadErrorUnsanitizedErrorMsg) {
EXPECT_EQ("You need to calm down.", error.message);
}));

EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(Invoke([](auto&, auto&) {
throw std::runtime_error("You need to calm down.");
return 0;
}));
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(
Invoke([](const folly::SocketAddress&, const struct iovec*, size_t) {
throw std::runtime_error("You need to calm down.");
return 0;
}));
transport->writeChain(
stream,
folly::IOBuf::copyBuffer("You are being too loud."),
Expand All @@ -1765,10 +1767,12 @@ TEST_P(QuicTransportImplTestBase, ConnectionErrorUnhandledException) {
onConnectionSetupError(QuicError(
QuicErrorCode(TransportErrorCode::INTERNAL_ERROR),
std::string("Well there's your problem"))));
EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(Invoke([](auto&, auto&) {
throw std::runtime_error("Well there's your problem");
return 0;
}));
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(
Invoke([](const folly::SocketAddress&, const struct iovec*, size_t) {
throw std::runtime_error("Well there's your problem");
return 0;
}));
transport->writeChain(stream, folly::IOBuf::copyBuffer("Hey"), true, nullptr);
transport->addDataToStream(
stream, StreamBuffer(folly::IOBuf::copyBuffer("Data"), 0));
Expand Down Expand Up @@ -2939,7 +2943,7 @@ TEST_P(QuicTransportImplTestBase, TestGracefulCloseWithActiveStream) {
transport->notifyPendingWriteOnConnection(&wcbConn);
transport->notifyPendingWriteOnStream(stream, &wcb);
transport->setReadCallback(stream, &rcb);
EXPECT_CALL(*socketPtr, write(_, _))
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
Expand Down Expand Up @@ -2993,7 +2997,7 @@ TEST_P(QuicTransportImplTestBase, TestGracefulCloseWithNoActiveStream) {
EXPECT_CALL(connCallback, onConnectionError(_)).Times(0);

transport->setReadCallback(stream, &rcb);
EXPECT_CALL(*socketPtr, write(_, _))
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
Expand Down Expand Up @@ -3055,7 +3059,7 @@ TEST_P(QuicTransportImplTestBase, TestImmediateClose) {
transport->notifyPendingWriteOnStream(stream, &wcb);
transport->setReadCallback(stream, &rcb);
transport->setPeekCallback(stream, &pcb);
EXPECT_CALL(*socketPtr, write(_, _))
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillRepeatedly(SetErrnoAndReturn(EAGAIN, -1));
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, &deliveryCb);
EXPECT_CALL(txCb, onByteEventRegistered(getTxMatcher(stream, 0)));
Expand Down Expand Up @@ -3195,7 +3199,8 @@ TEST_P(QuicTransportImplTestBase, ExceptionInWriteLooperDoesNotCrash) {
transport->writeChain(stream, IOBuf::copyBuffer("hello"), true, nullptr);
transport->addDataToStream(
stream, StreamBuffer(IOBuf::copyBuffer("hello"), 0, false));
EXPECT_CALL(*socketPtr, write(_, _)).WillOnce(SetErrnoAndReturn(EBADF, -1));
EXPECT_CALL(*socketPtr, write(_, _, _))
.WillOnce(SetErrnoAndReturn(EBADF, -1));
EXPECT_CALL(connSetupCallback, onConnectionSetupError(_))
.WillOnce(Invoke([&](auto) { transport.reset(); }));
transport->writeLooper()->runLoopCallback();
Expand Down Expand Up @@ -4913,14 +4918,14 @@ TEST_P(
}));

// Fail the first write loop.
EXPECT_CALL(*socketPtr, write(_, _))
EXPECT_CALL(*socketPtr, write(_, _, _))
.Times(2) // We attempt to flush the batch twice inside the write loop.
// Fail both.
.WillRepeatedly(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& /*buf*/) {
errno = EAGAIN;
return 0;
}));
.WillRepeatedly(
Invoke([&](const folly::SocketAddress&, const struct iovec*, size_t) {
errno = EAGAIN;
return 0;
}));

transport->writeLooper()->run(true /* thisIteration */);
EXPECT_TRUE(transport->writeLooper()->isRunning());
Expand All @@ -4937,12 +4942,13 @@ TEST_P(
EXPECT_TRUE(writeCallbackArmed);

// Reset will make one write attempt. We don't care what happens to it
EXPECT_CALL(*socketPtr, write(_, _))
EXPECT_CALL(*socketPtr, write(_, _, _))
.Times(1)
.WillRepeatedly(Invoke([&](const auto& /* addr */,
const std::unique_ptr<folly::IOBuf>& buf) {
.WillRepeatedly(Invoke([&](const folly::SocketAddress&,
const struct iovec* vec,
size_t iovec_len) {
errno = 0;
return buf->computeChainDataLength();
return getTotalIovecLen(vec, iovec_len);
}));
transport.reset();
}
Expand Down
Loading

0 comments on commit 2369ecb

Please sign in to comment.