Skip to content

Commit

Permalink
udp: properly handle truncated/dropped datagrams (#14122)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Klein <mklein@lyft.com>
Signed-off-by: Christoph Pakulski <christoph@tetrate.io>
Co-authored-by: Matt Klein <mklein@lyft.com>
Co-authored-by: Christoph Pakulski <christoph@tetrate.io>
  • Loading branch information
cpakulski and mattklein123 authored Nov 20, 2020
1 parent ca6e246 commit 1ed6ddf
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 40 deletions.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Bug Fixes
* proxy_proto: fixed a bug where the wrong downstream address got sent to upstream connections.
* tls: fix detection of the upstream connection close event.
* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers.
* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash.
* watchdog: touch the watchdog before most event loop operations to avoid misses when handling bursts of callbacks.

Removed Config or Runtime
Expand Down
3 changes: 3 additions & 0 deletions include/envoy/network/io_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class IoHandle {
unsigned int msg_len_{0};
// The gso_size, if specified in the transport header
unsigned int gso_size_{0};
// If true indicates a successful syscall, but the packet was dropped due to truncation. We do
// not support receiving truncated packets.
bool truncated_and_dropped_{false};
};

/**
Expand Down
12 changes: 10 additions & 2 deletions source/common/api/win32/os_sys_calls_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,22 @@ SysCallSizeResult OsSysCallsImpl::recv(os_fd_t socket, void* buffer, size_t leng
}

SysCallSizeResult OsSysCallsImpl::recvmsg(os_fd_t sockfd, msghdr* msg, int flags) {
DWORD bytes_received;
DWORD bytes_received = 0;
LPFN_WSARECVMSG recvmsg_fn_ptr = getFnPtrWSARecvMsg();
wsamsgResult wsamsg = msghdrToWSAMSG(msg);
// Windows supports only a single flag on input to WSARecvMsg
wsamsg.wsamsg_->dwFlags = flags & MSG_PEEK;
const int rc = recvmsg_fn_ptr(sockfd, wsamsg.wsamsg_.get(), &bytes_received, nullptr, nullptr);
if (rc == SOCKET_ERROR) {
return {-1, ::WSAGetLastError()};
// We try to match the UNIX behavior for truncated packets. In that case the return code is
// the length of the allocated buffer and we get the value from `dwFlags`.
auto last_error = ::WSAGetLastError();
if (last_error == WSAEMSGSIZE) {
msg->msg_flags = wsamsg.wsamsg_->dwFlags;
return {bytes_received, 0};
}

return {rc, last_error};
}
msg->msg_namelen = wsamsg.wsamsg_->namelen;
msg->msg_flags = wsamsg.wsamsg_->dwFlags;
Expand Down
41 changes: 30 additions & 11 deletions source/common/network/io_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ in_addr addressFromMessage(const cmsghdr& cmsg) {
#endif
}

// Returns the flags value this file passes to recvmsg()/recvmmsg(): `MSG_TRUNC` where it is
// supported and 0 on Apple platforms.
constexpr int messageTruncatedOption() {
#if !defined(__APPLE__)
  return MSG_TRUNC;
#else
  // OSX does not support passing `MSG_TRUNC` to recvmsg and recvmmsg. Omitting it does not
  // affect functionality; the flag is primarily used for logging.
  return 0;
#endif
}

} // namespace

namespace Network {
Expand Down Expand Up @@ -350,7 +360,8 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
hdr.msg_flags = 0;
hdr.msg_control = cbuf.begin();
hdr.msg_controllen = cmsg_space_;
const Api::SysCallSizeResult result = Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, 0);
Api::SysCallSizeResult result =
Api::OsSysCallsSingleton::get().recvmsg(fd_, &hdr, messageTruncatedOption());
if (result.rc_ < 0) {
auto io_result = sysCallResultToIoCallResult(result);
// Emulated edge events need to be registered if the socket operation did not complete
Expand All @@ -362,6 +373,13 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
}
return io_result;
}
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", result.rc_);
result.rc_ = 0;
(*output.dropped_packets_)++;
output.msg_[0].truncated_and_dropped_ = true;
return sysCallResultToIoCallResult(result);
}

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
Expand All @@ -386,7 +404,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmsg(Buffer::RawSlice* slices,
if (output.dropped_packets_ != nullptr) {
absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
if (maybe_dropped) {
*output.dropped_packets_ = *maybe_dropped;
*output.dropped_packets_ += *maybe_dropped;
continue;
}
}
Expand Down Expand Up @@ -439,8 +457,9 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
// Set MSG_WAITFORONE so that recvmmsg will not wait for
// |num_packets_per_mmsg_call| packets to arrive before returning when the
// socket is a blocking socket.
const Api::SysCallIntResult result = Api::OsSysCallsSingleton::get().recvmmsg(
fd_, mmsg_hdr.data(), num_packets_per_mmsg_call, MSG_TRUNC | MSG_WAITFORONE, nullptr);
const Api::SysCallIntResult result =
Api::OsSysCallsSingleton::get().recvmmsg(fd_, mmsg_hdr.data(), num_packets_per_mmsg_call,
messageTruncatedOption() | MSG_WAITFORONE, nullptr);

if (result.rc_ <= 0) {
auto io_result = sysCallResultToIoCallResult(result);
Expand All @@ -457,18 +476,18 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
int num_packets_read = result.rc_;

for (int i = 0; i < num_packets_read; ++i) {
if (mmsg_hdr[i].msg_len == 0) {
msghdr& hdr = mmsg_hdr[i].msg_hdr;
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(debug, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
(*output.dropped_packets_)++;
output.msg_[i].truncated_and_dropped_ = true;
continue;
}
msghdr& hdr = mmsg_hdr[i].msg_hdr;

RELEASE_ASSERT((hdr.msg_flags & MSG_CTRUNC) == 0,
fmt::format("Incorrectly set control message length: {}", hdr.msg_controllen));
RELEASE_ASSERT(hdr.msg_namelen > 0,
fmt::format("Unable to get remote address from recvmmsg() for fd: {}", fd_));
if ((hdr.msg_flags & MSG_TRUNC) != 0) {
ENVOY_LOG_MISC(warn, "Dropping truncated UDP packet with size: {}.", mmsg_hdr[i].msg_len);
continue;
}

output.msg_[i].msg_len_ = mmsg_hdr[i].msg_len;
// Get local and peer addresses for each packet.
Expand All @@ -494,7 +513,7 @@ Api::IoCallUint64Result IoSocketHandleImpl::recvmmsg(RawSliceArrays& slices, uin
for (cmsg = CMSG_FIRSTHDR(&hdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&hdr, cmsg)) {
absl::optional<uint32_t> maybe_dropped = maybeGetPacketsDroppedFromHeader(*cmsg);
if (maybe_dropped) {
*output.dropped_packets_ = *maybe_dropped;
*output.dropped_packets_ += *maybe_dropped;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion source/common/network/udp_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class UdpListenerImpl : public BaseListenerImpl,
public:
UdpListenerImpl(Event::DispatcherImpl& dispatcher, SocketSharedPtr socket,
UdpListenerCallbacks& cb, TimeSource& time_source);

~UdpListenerImpl() override;
// Returns the current value of packets_dropped_. The UDP listener tests read this to check how
// many datagrams were counted as dropped.
uint32_t packetsDropped() { return packets_dropped_; }

// Network::Listener Interface
void disable() override;
Expand Down
12 changes: 5 additions & 7 deletions source/common/network/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
uint64_t packets_read = result.rc_;
ENVOY_LOG_MISC(trace, "recvmmsg read {} packets", packets_read);
for (uint64_t i = 0; i < packets_read; ++i) {
if (output.msg_[i].truncated_and_dropped_) {
continue;
}

Buffer::RawSlice* slice = slices[i].data();
const uint64_t msg_len = output.msg_[i].msg_len_;
ASSERT(msg_len <= slice->len_);
Expand All @@ -651,7 +655,7 @@ Api::IoCallUint64Result Utility::readFromSocket(IoHandle& handle,
Api::IoCallUint64Result result =
receiveMessage(udp_packet_processor.maxPacketSize(), buffer, output, handle, local_address);

if (!result.ok()) {
if (!result.ok() || output.msg_[0].truncated_and_dropped_) {
return result;
}

Expand All @@ -678,12 +682,6 @@ Api::IoErrorPtr Utility::readPacketsFromSocket(IoHandle& handle,
return std::move(result.err_);
}

if (result.rc_ == 0) {
// TODO(conqerAtapple): Is zero length packet interesting? If so add stats
// for it. Otherwise remove the warning log below.
ENVOY_LOG_MISC(trace, "received 0-length packet");
}

if (packets_dropped != old_packets_dropped) {
// The kernel tracks SO_RXQ_OVFL as a uint32 which can overflow to a smaller
// value. So as long as this count differs from previously recorded value,
Expand Down
64 changes: 58 additions & 6 deletions test/common/network/udp_listener_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ namespace {
// packets are sent from a network namespace different to that of
// the client. Currently, the testing framework does not support
// this behavior.
// This helper allows to intercept the supportsUdpGro syscall and
// toggle the gro behavior as per individual test requirements.
class MockSupportsUdpGro : public Api::OsSysCallsImpl {
// This helper allows to intercept syscalls and
// toggle the behavior as per individual test requirements.
class OverrideOsSysCallsImpl : public Api::OsSysCallsImpl {
public:
MOCK_METHOD(bool, supportsUdpGro, (), (const));
MOCK_METHOD(bool, supportsMmsg, (), (const));
};

class UdpListenerImplTest : public UdpListenerImplTestBase {
public:
void SetUp() override {
ON_CALL(udp_gro_syscall_, supportsUdpGro()).WillByDefault(Return(false));
ON_CALL(override_syscall_, supportsUdpGro()).WillByDefault(Return(false));
// Return the real version by default.
ON_CALL(override_syscall_, supportsMmsg())
.WillByDefault(Return(os_calls.latched().supportsMmsg()));

// Set listening socket options.
server_socket_->addOptions(SocketOptionFactory::buildIpPacketInfoOptions());
Expand All @@ -64,8 +68,8 @@ class UdpListenerImplTest : public UdpListenerImplTestBase {
ON_CALL(listener_callbacks_, udpPacketWriter()).WillByDefault(ReturnRef(*udp_packet_writer_));
}

NiceMock<MockSupportsUdpGro> udp_gro_syscall_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls{&udp_gro_syscall_};
NiceMock<OverrideOsSysCallsImpl> override_syscall_;
TestThreadsafeSingletonInjector<Api::OsSysCallsImpl> os_calls{&override_syscall_};
};

INSTANTIATE_TEST_SUITE_P(IpVersions, UdpListenerImplTest,
Expand Down Expand Up @@ -126,6 +130,54 @@ TEST_P(UdpListenerImplTest, UseActualDstUdp) {
dispatcher_->run(Event::Dispatcher::RunType::Block);
}

// Test a large datagram that gets dropped using recvmmsg if supported.
// Three datagrams are written in order: 4096 bytes, a short one ("second"), 4096 bytes. Only the
// short one is expected to be delivered to onData(); the other two are expected to be counted as
// dropped by the listener.
TEST_P(UdpListenerImplTest, LargeDatagramRecvmmsg) {
// This will get dropped.
const std::string first(4096, 'a');
client_.write(first, *send_to_addr_);
const std::string second("second");
client_.write(second, *send_to_addr_);
// This will get dropped.
const std::string third(4096, 'b');
client_.write(third, *send_to_addr_);

EXPECT_CALL(listener_callbacks_, onReadReady());
// WillOnce: exactly one onData() callback is expected, carrying the short datagram.
EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void {
// NOTE(review): the second argument (16 with recvmmsg support, 1 without) is presumably the
// expected number of packets per receive call -- confirm against validateRecvCallbackParams.
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);
EXPECT_EQ(data.buffer_->toString(), second);

// Stop the blocking dispatcher run below once the expected datagram has been seen.
dispatcher_->exit();
}));

dispatcher_->run(Event::Dispatcher::RunType::Block);
// Both 4096-byte datagrams must have been counted as dropped.
EXPECT_EQ(2, listener_->packetsDropped());
}

// Test a large datagram that gets dropped using recvmsg.
// Same scenario as the recvmmsg variant (4096 bytes, "second", 4096 bytes), but with
// supportsMmsg() overridden to report false.
TEST_P(UdpListenerImplTest, LargeDatagramRecvmsg) {
// Override the default set in SetUp() so that supportsMmsg() returns false for this test.
ON_CALL(override_syscall_, supportsMmsg()).WillByDefault(Return(false));

// This will get dropped.
const std::string first(4096, 'a');
client_.write(first, *send_to_addr_);
const std::string second("second");
client_.write(second, *send_to_addr_);
// This will get dropped.
const std::string third(4096, 'b');
client_.write(third, *send_to_addr_);

EXPECT_CALL(listener_callbacks_, onReadReady());
// WillOnce: exactly one onData() callback is expected, carrying the short datagram.
EXPECT_CALL(listener_callbacks_, onData(_)).WillOnce(Invoke([&](const UdpRecvData& data) -> void {
// With supportsMmsg() mocked to false above, the ternary evaluates to 1u here.
validateRecvCallbackParams(data, Api::OsSysCallsSingleton::get().supportsMmsg() ? 16u : 1u);
EXPECT_EQ(data.buffer_->toString(), second);

// Stop the blocking dispatcher run below once the expected datagram has been seen.
dispatcher_->exit();
}));

dispatcher_->run(Event::Dispatcher::RunType::Block);
// Both 4096-byte datagrams must have been counted as dropped.
EXPECT_EQ(2, listener_->packetsDropped());
}

/**
* Tests UDP listener for read and write callbacks with actual data.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,19 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) {
addr = wrapper_->peerAddress();

Network::IoHandle::RecvMsgOutput output(1, nullptr);
EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, 0)).WillOnce(Invoke([](os_fd_t, msghdr* msg, int) {
sockaddr_storage ss;
auto ipv6_addr = reinterpret_cast<sockaddr_in6*>(&ss);
memset(ipv6_addr, 0, sizeof(sockaddr_in6));
ipv6_addr->sin6_family = AF_INET6;
ipv6_addr->sin6_addr = in6addr_loopback;
ipv6_addr->sin6_port = htons(54321);
*reinterpret_cast<sockaddr_in6*>(msg->msg_name) = *ipv6_addr;
msg->msg_namelen = sizeof(sockaddr_in6);
msg->msg_controllen = 0;
return Api::SysCallSizeResult{5u, 0};
}));
EXPECT_CALL(os_sys_calls_, recvmsg(fd, _, MSG_TRUNC))
.WillOnce(Invoke([](os_fd_t, msghdr* msg, int) {
sockaddr_storage ss;
auto ipv6_addr = reinterpret_cast<sockaddr_in6*>(&ss);
memset(ipv6_addr, 0, sizeof(sockaddr_in6));
ipv6_addr->sin6_family = AF_INET6;
ipv6_addr->sin6_addr = in6addr_loopback;
ipv6_addr->sin6_port = htons(54321);
*reinterpret_cast<sockaddr_in6*>(msg->msg_name) = *ipv6_addr;
msg->msg_namelen = sizeof(sockaddr_in6);
msg->msg_controllen = 0;
return Api::SysCallSizeResult{5u, 0};
}));
wrapper_->recvmsg(&slice, 1, /*self_port=*/12345, output);

size_t num_packet_per_call = 1u;
Expand All @@ -97,7 +98,7 @@ TEST_F(QuicIoHandleWrapperTest, DelegateIoHandleCalls) {
absl::FixedArray<Buffer::RawSlice>({Buffer::RawSlice{data, 5}}));
EXPECT_CALL(os_sys_calls_, recvmmsg(fd, _, num_packet_per_call, _, nullptr))
.WillOnce(Invoke([](os_fd_t, struct mmsghdr*, unsigned int, int, struct timespec*) {
return Api::SysCallIntResult{1u, 0};
return Api::SysCallIntResult{-1, SOCKET_ERROR_AGAIN};
}));
wrapper_->recvmmsg(slices, /*self_port=*/12345, output2);

Expand Down
1 change: 1 addition & 0 deletions test/test_common/threadsafe_singleton_injector.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ template <class T> class TestThreadsafeSingletonInjector {
ThreadSafeSingleton<T>::instance_ = instance;
}
~TestThreadsafeSingletonInjector() { ThreadSafeSingleton<T>::instance_ = latched_instance_; }
// Returns a reference to the latched instance, i.e. the one the destructor puts back into
// ThreadSafeSingleton<T>::instance_.
// NOTE(review): dereferences latched_instance_ without a null check -- confirm it is never null
// when this is called.
T& latched() { return *latched_instance_; }

private:
T* latched_instance_;
Expand Down

0 comments on commit 1ed6ddf

Please sign in to comment.