From fa5a2894be40e2a59ff39c52cdc271f9c4820e41 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Thu, 4 Nov 2021 09:24:54 -0700 Subject: [PATCH 01/35] Windows: Emulate MSG_PEEK by read Commit Message: The way we implement i/o events on windows poses problems with peeking messages. The reason for that is that peeking does not drain the buffer which is an implicit requirement. This can cause slow clients to make envoy connections to hang. We solve this issue by implementing peek by draining the socket and storing the content of the socket to a buffer. Additional Description: See #17395 Risk Level: Medium Testing: Automated Docs Changes: Pending Release Notes: Pending Platform Specific Features: Windows only Co-authored-by: He Jie Xu Signed-off-by: Sotiris Nanopoulos --- source/common/network/BUILD | 1 + .../network/win32_socket_handle_impl.cc | 125 ++++++++++++++++-- .../common/network/win32_socket_handle_impl.h | 20 ++- .../proxy_protocol/proxy_protocol_test.cc | 87 ++++++++---- 4 files changed, 198 insertions(+), 35 deletions(-) diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 98af3afd269a..1d724fd3a332 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -196,6 +196,7 @@ envoy_cc_library( ":io_socket_error_lib", ":socket_interface_lib", ":socket_lib", + "//source/common/buffer:buffer_lib", "//envoy/event:dispatcher_interface", "//envoy/network:io_handle_interface", "//source/common/api:os_sys_calls_lib", diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index c74f5144691b..35343fcf7586 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -18,16 +18,24 @@ namespace Network { Api::IoCallUint64Result Win32SocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) { - auto result = IoSocketHandleImpl::readv(max_length, slices, num_slice); - reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); - return result; + if (peek_buffer_->length() == 0) { + auto result = IoSocketHandleImpl::readv(max_length, slices, num_slice); + reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); + return result; + } + + return readvFromPeekBuffer(max_length, slices, num_slice); } Api::IoCallUint64Result Win32SocketHandleImpl::read(Buffer::Instance& buffer, absl::optional max_length_opt) { - auto result = IoSocketHandleImpl::read(buffer, max_length_opt); - reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); - return result; + if (peek_buffer_->length() == 0) { + auto result = IoSocketHandleImpl::read(buffer, max_length_opt); + reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); + return result; + } + + return readFromPeekBuffer(buffer, max_length_opt.value_or(UINT64_MAX)); } Api::IoCallUint64Result Win32SocketHandleImpl::writev(const Buffer::RawSlice* slices, @@ -71,10 +79,38 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices, } Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) { - - Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags); - reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); - return result; + if (flags & MSG_PEEK) { + // can a remote OOM us now that we are not protected by readDisable? + Api::IoCallUint64Result peek_result = drainToPeekBuffer(); + + // Some fatal error happened + if (!peek_result.wouldBlock()) { + return peek_result; + } + + // No data available, register read again. + if (peek_result.wouldBlock() && peek_buffer_->length() == 0) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + return peek_result; + } + + Api::IoCallUint64Result result = peekFromPeekBuffer(buffer, length); + if (peek_buffer_->length() < length) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } else { + // This means that our peak buffer has more data than what the user + // wanted. Return the slice to the caller. + // How can the caller (v2 proxy protocol inspector) reactivate the events again here? + } + return result; + } + if (peek_buffer_->length() == 0) { + Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags); + reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); + return result; + } else { + return readFromPeekBuffer(buffer, length); + } } void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, @@ -84,5 +120,74 @@ void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64 } } +Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer() { + while (true) { + Buffer::OwnedImpl read_buffer; + Buffer::Reservation reservation = read_buffer.reserveForRead(); + Api::IoCallUint64Result result = IoSocketHandleImpl::readv( + reservation.length(), reservation.slices(), reservation.numSlices()); + uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; + reservation.commit(bytes_to_commit); + peek_buffer_->add(read_buffer); + if (!result.ok() || bytes_to_commit == 0) { + return result; + } + } +} + +Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(void* buffer, size_t length) { + uint64_t copy_size = std::min(peek_buffer_->length(), static_cast(length)); + peek_buffer_->copyOut(0, copy_size, buffer); + peek_buffer_->drain(copy_size); + return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); +} + +Api::IoCallUint64Result Win32SocketHandleImpl::readvFromPeekBuffer(uint64_t max_length, + Buffer::RawSlice* slices, + uint64_t num_slice) { + uint64_t total_length_to_read = std::min(max_length, peek_buffer_->length()); + uint64_t num_slices_to_read = 0; + uint64_t num_bytes_to_read = 0; + for (; num_slices_to_read < num_slice && num_bytes_to_read < total_length_to_read; + num_slices_to_read++) { + auto length_to_copy = std::min(static_cast(slices[num_slices_to_read].len_), + total_length_to_read - num_bytes_to_read); + peek_buffer_->copyOut(num_bytes_to_read, length_to_copy, slices[num_slices_to_read].mem_); + num_bytes_to_read += length_to_copy; + } + peek_buffer_->drain(num_bytes_to_read); + return Api::IoCallUint64Result(num_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); +} + +Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(Buffer::Instance& buffer, + size_t length) { + auto lenght_to_move = std::min(peek_buffer_->length(), static_cast(length)); + buffer.move(*peek_buffer_, lenght_to_move); + peek_buffer_->drain(lenght_to_move); + return Api::IoCallUint64Result(lenght_to_move, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); +} + +Api::IoCallUint64Result Win32SocketHandleImpl::peekFromPeekBuffer(void* buffer, size_t length) { + uint64_t copy_size = std::min(peek_buffer_->length(), static_cast(length)); + peek_buffer_->copyOut(0, copy_size, buffer); + return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); +} + +void Win32SocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, + Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) { + IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events); + if ((events & Event::FileReadyType::Read) && peek_buffer_->length() > 0) { + activateFileEvents(Event::FileReadyType::Read); + } +} + +void Win32SocketHandleImpl::enableFileEvents(uint32_t events) { + IoSocketHandleImpl::enableFileEvents(events); + if ((events & Event::FileReadyType::Read) && peek_buffer_->length() > 0) { + activateFileEvents(Event::FileReadyType::Read); + } +} + } // namespace Network } // namespace Envoy diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index b9465f71db54..c69fadf9b7c4 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -6,6 +6,7 @@ #include "envoy/event/dispatcher.h" #include "envoy/network/io_handle.h" +#include "source/common/buffer/buffer_impl.h" #include "source/common/common/logger.h" #include "source/common/network/io_socket_error_impl.h" #include "source/common/network/io_socket_handle_impl.h" @@ -20,7 +21,8 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { public: explicit Win32SocketHandleImpl(os_fd_t fd = INVALID_SOCKET, bool socket_v6only = false, absl::optional domain = absl::nullopt) - : IoSocketHandleImpl(fd, socket_v6only, domain) {} + : peek_buffer_(std::make_unique()), + IoSocketHandleImpl(fd, socket_v6only, domain) {} Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) override; @@ -42,8 +44,24 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { RecvMsgOutput& output) override; Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override; + void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override; + void enableFileEvents(uint32_t events) override; private: void reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, uint32_t event); + + // For windows mimic MSG_PEEK + std::unique_ptr peek_buffer_; + + Api::IoCallUint64Result drainToPeekBuffer(); + Api::IoCallUint64Result readFromPeekBuffer(void* buffer, size_t length); + Api::IoCallUint64Result readFromPeekBuffer(Buffer::Instance& buffer, size_t length); + Api::IoCallUint64Result readvFromPeekBuffer(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice); + Api::IoCallUint64Result peekFromPeekBuffer(void* buffer, size_t length); + + Api::IoCallUint64Result readvImpl(uint64_t max_length, Buffer::RawSlice* slices, + uint64_t num_slice); }; } // namespace Network } // namespace Envoy diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index 6a7b0634a8bb..272a7077998f 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -312,25 +312,31 @@ TEST_P(ProxyProtocolTest, ErrorRecv_2) { Api::MockOsSysCalls os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); - // TODO(davinci26): Mocking should not be used to provide real system calls. +// TODO(davinci26): Mocking should not be used to provide real system calls. +#ifdef WIN32 + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Return(Api::SysCallSizeResult{-1, 0})); +#else + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { + return os_sys_calls_actual_.readv(fd, iov, iovcnt); + })); + EXPECT_CALL(os_sys_calls, recv(_, _, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Return(Api::SysCallSizeResult{-1, 0})); +#endif EXPECT_CALL(os_sys_calls, connect(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t sockfd, const sockaddr* addr, socklen_t addrlen) { return os_sys_calls_actual_.connect(sockfd, addr, addrlen); })); - EXPECT_CALL(os_sys_calls, recv(_, _, _, _)) - .Times(AnyNumber()) - .WillOnce(Return(Api::SysCallSizeResult{-1, 0})); EXPECT_CALL(os_sys_calls, writev(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { return os_sys_calls_actual_.writev(fd, iov, iovcnt); })); - EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .Times(AnyNumber()) - .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { - return os_sys_calls_actual_.readv(fd, iov, iovcnt); - })); EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke( @@ -374,9 +380,20 @@ TEST_P(ProxyProtocolTest, ErrorRecv_1) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); // TODO(davinci26): Mocking should not be used to provide real system calls. +#ifdef WIN32 + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Return(Api::SysCallSizeResult{-1, 0})); +#else + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { + return os_sys_calls_actual_.readv(fd, iov, iovcnt); + })); EXPECT_CALL(os_sys_calls, recv(_, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Return(Api::SysCallSizeResult{-1, 0})); +#endif EXPECT_CALL(os_sys_calls, connect(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t sockfd, const sockaddr* addr, socklen_t addrlen) { @@ -387,11 +404,6 @@ TEST_P(ProxyProtocolTest, ErrorRecv_1) { .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { return os_sys_calls_actual_.writev(fd, iov, iovcnt); })); - EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .Times(AnyNumber()) - .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { - return os_sys_calls_actual_.readv(fd, iov, iovcnt); - })); EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke( @@ -782,6 +794,15 @@ TEST_P(ProxyProtocolTest, V2Fragmented4Error) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); // TODO(davinci26): Mocking should not be used to provide real system calls. +#ifdef WIN32 + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillOnce(Invoke([&](os_fd_t fd, const iovec* iov, int num_iov) { + const Api::SysCallSizeResult x = os_sys_calls_actual_.readv(fd, iov, num_iov); + return x; + })) + .WillRepeatedly(Return(Api::SysCallSizeResult{-1, 0})); +#else EXPECT_CALL(os_sys_calls, recv(_, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t fd, void* buf, size_t len, int flags) { @@ -790,6 +811,13 @@ TEST_P(ProxyProtocolTest, V2Fragmented4Error) { EXPECT_CALL(os_sys_calls, recv(_, _, 1, _)) .Times(AnyNumber()) .WillOnce(Return(Api::SysCallSizeResult{-1, 0})); + + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { + return os_sys_calls_actual_.readv(fd, iov, iovcnt); + })); +#endif EXPECT_CALL(os_sys_calls, connect(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t sockfd, const sockaddr* addr, socklen_t addrlen) { @@ -800,11 +828,6 @@ TEST_P(ProxyProtocolTest, V2Fragmented4Error) { .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { return os_sys_calls_actual_.writev(fd, iov, iovcnt); })); - EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .Times(AnyNumber()) - .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { - return os_sys_calls_actual_.readv(fd, iov, iovcnt); - })); EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke( @@ -849,6 +872,20 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); // TODO(davinci26): Mocking should not be used to provide real system calls. +#ifdef WIN32 + bool partial_writed = false; + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke([&](os_fd_t fd, const iovec* iov, int num_iov) { + if (partial_writed) { + ENVOY_LOG_MISC(debug, "inject failure"); + return Api::SysCallSizeResult{-1, 0}; + } + ENVOY_LOG_MISC(debug, "wire"); + const Api::SysCallSizeResult x = os_sys_calls_actual_.readv(fd, iov, num_iov); + return x; + })); +#else EXPECT_CALL(os_sys_calls, recv(_, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t fd, void* buf, size_t len, int flags) { @@ -857,6 +894,12 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { EXPECT_CALL(os_sys_calls, recv(_, _, 4, _)) .Times(AnyNumber()) .WillOnce(Return(Api::SysCallSizeResult{-1, 0})); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(AnyNumber()) + .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { + return os_sys_calls_actual_.readv(fd, iov, iovcnt); + })); +#endif EXPECT_CALL(os_sys_calls, connect(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([this](os_fd_t sockfd, const sockaddr* addr, socklen_t addrlen) { @@ -867,11 +910,6 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { return os_sys_calls_actual_.writev(fd, iov, iovcnt); })); - EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .Times(AnyNumber()) - .WillRepeatedly(Invoke([this](os_fd_t fd, const iovec* iov, int iovcnt) { - return os_sys_calls_actual_.readv(fd, iov, iovcnt); - })); EXPECT_CALL(os_sys_calls, getsockopt_(_, _, _, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke( @@ -901,6 +939,7 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { connect(false); write(buffer, 10); dispatcher_->run(Event::Dispatcher::RunType::NonBlock); + partial_writed = true; write(buffer + 10, 10); expectProxyProtoError(); From c2b5b44799c703dffe68adee9299658a114b49a7 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Thu, 4 Nov 2021 10:09:30 -0700 Subject: [PATCH 02/35] fix format Signed-off-by: Sotiris Nanopoulos --- source/common/network/BUILD | 2 +- source/common/network/win32_socket_handle_impl.h | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 1d724fd3a332..910e90e2b1da 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -196,11 +196,11 @@ envoy_cc_library( ":io_socket_error_lib", ":socket_interface_lib", ":socket_lib", - "//source/common/buffer:buffer_lib", "//envoy/event:dispatcher_interface", "//envoy/network:io_handle_interface", "//source/common/api:os_sys_calls_lib", "//source/common/event:dispatcher_includes", + "//source/common/buffer:buffer_lib", "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", ], ) diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index c69fadf9b7c4..74348128d6ee 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -47,6 +47,7 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, Event::FileTriggerType trigger, uint32_t events) override; void enableFileEvents(uint32_t events) override; + private: void reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, uint32_t event); From fd850099b6bf7ebc84d2ec3a9841f64017ab6d54 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Thu, 4 Nov 2021 10:32:08 -0700 Subject: [PATCH 03/35] fix compiler warning Signed-off-by: Sotiris Nanopoulos --- source/common/network/BUILD | 2 +- source/common/network/win32_socket_handle_impl.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/network/BUILD b/source/common/network/BUILD index 910e90e2b1da..22b5e04deae6 100644 --- a/source/common/network/BUILD +++ b/source/common/network/BUILD @@ -199,8 +199,8 @@ envoy_cc_library( "//envoy/event:dispatcher_interface", "//envoy/network:io_handle_interface", "//source/common/api:os_sys_calls_lib", - "//source/common/event:dispatcher_includes", "//source/common/buffer:buffer_lib", + "//source/common/event:dispatcher_includes", "@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto", ], ) diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index 74348128d6ee..8d0d839b5e13 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -21,8 +21,8 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { public: explicit Win32SocketHandleImpl(os_fd_t fd = INVALID_SOCKET, bool socket_v6only = false, absl::optional domain = absl::nullopt) - : peek_buffer_(std::make_unique()), - IoSocketHandleImpl(fd, socket_v6only, domain) {} + : IoSocketHandleImpl(fd, socket_v6only, domain), + peek_buffer_(std::make_unique()) {} Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) override; From 8972610074521f8a2b20610454d70fee33caa5f9 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Thu, 4 Nov 2021 12:41:56 -0700 Subject: [PATCH 04/35] fix compilation issue Signed-off-by: Sotiris Nanopoulos --- .../filters/listener/proxy_protocol/proxy_protocol_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index 272a7077998f..f864b0a7eb2d 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -872,8 +872,8 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); // TODO(davinci26): Mocking should not be used to provide real system calls. -#ifdef WIN32 bool partial_writed = false; +#ifdef WIN32 EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([&](os_fd_t fd, const iovec* iov, int num_iov) { From 4aeaeae3dce22fba375d828098f281b49c845b85 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 5 Nov 2021 09:09:41 -0700 Subject: [PATCH 05/35] more coverage is needed but still testing ci Signed-off-by: Sotiris Nanopoulos --- .../network/win32_socket_handle_impl.cc | 7 +- test/common/network/BUILD | 13 +++ .../network/win32_socket_handle_impl_test.cc | 102 ++++++++++++++++++ .../proxy_protocol/proxy_protocol_test.cc | 4 +- 4 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 test/common/network/win32_socket_handle_impl_test.cc diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index 35343fcf7586..a2a286d0a4c4 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -82,7 +82,6 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, if (flags & MSG_PEEK) { // can a remote OOM us now that we are not protected by readDisable? Api::IoCallUint64Result peek_result = drainToPeekBuffer(); - // Some fatal error happened if (!peek_result.wouldBlock()) { return peek_result; @@ -90,12 +89,14 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, // No data available, register read again. if (peek_result.wouldBlock() && peek_buffer_->length() == 0) { - file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + if (file_event_) { + file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + } return peek_result; } Api::IoCallUint64Result result = peekFromPeekBuffer(buffer, length); - if (peek_buffer_->length() < length) { + if (peek_buffer_->length() < length && file_event_) { file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); } else { // This means that our peak buffer has more data than what the user diff --git a/test/common/network/BUILD b/test/common/network/BUILD index 228068e269aa..a0b1e7e33ed1 100644 --- a/test/common/network/BUILD +++ b/test/common/network/BUILD @@ -406,6 +406,19 @@ envoy_cc_test( ], ) +envoy_cc_test( + name = "win32_socket_handle_impl_test", + srcs = ["win32_socket_handle_impl_test.cc"], + deps = [ + "//source/common/buffer:buffer_lib", + "//source/common/common:utility_lib", + "//source/common/network:address_lib", + "//test/mocks/api:api_mocks", + "//test/mocks/event:event_mocks", + "//test/test_common:threadsafe_singleton_injector_lib", + ], +) + envoy_cc_test( name = "io_socket_handle_impl_integration_test", srcs = ["io_socket_handle_impl_integration_test.cc"], diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc new file mode 100644 index 000000000000..a3b868620c0a --- /dev/null +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -0,0 +1,102 @@ +#include "source/common/common/utility.h" +#include "source/common/network/address_impl.h" +#include "source/common/network/io_socket_error_impl.h" +#include "source/common/network/io_socket_handle_impl.h" +#include "source/common/network/listen_socket_impl.h" + +#include "test/mocks/api/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/test_common/environment.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/threadsafe_singleton_injector.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::_; +using testing::DoAll; +using testing::Eq; +using testing::Invoke; +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Network { + +class Win32SocketHandleImplTest : public testing::Test { +public: + Win32SocketHandleImplTest() : io_handle_(42) { + dispatcher_ = std::make_unique>(); + file_event_ = new NiceMock; + EXPECT_CALL(*dispatcher_, createFileEvent_(42, _, _, _)).WillOnce(Return(file_event_)); + io_handle_.setBlocking(false); + io_handle_.initializeFileEvent( + *dispatcher_, [](uint32_t) -> void {}, Event::PlatformDefaultTriggerType, + Event::FileReadyType::Read | Event::FileReadyType::Closed); + } + +protected: + std::unique_ptr> dispatcher_; + NiceMock* file_event_; + Network::Win32SocketHandleImpl io_handle_; +}; + +TEST_F(Win32SocketHandleImplTest, ReadvWithNoBufferShouldReadFromTheWire) { + + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(1) + .WillRepeatedly(Return(Api::SysCallSizeResult{10, 0})); + + Buffer::OwnedImpl read_buffer; + Buffer::Reservation reservation = read_buffer.reserveForRead(); + auto rc = io_handle_.readv(reservation.length(), reservation.slices(), reservation.numSlices()); + EXPECT_EQ(rc.return_value_, 10); +} + +TEST_F(Win32SocketHandleImplTest, ReadvShouldReenableEventsOnBlock) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(1) + .WillRepeatedly(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); + + EXPECT_CALL(*file_event_, registerEventIfEmulatedEdge(_)); + Buffer::OwnedImpl read_buffer; + Buffer::Reservation reservation = read_buffer.reserveForRead(); + auto rc = io_handle_.readv(reservation.length(), reservation.slices(), reservation.numSlices()); + EXPECT_EQ(rc.return_value_, 0); + EXPECT_EQ(rc.err_->getErrorCode(), IoSocketError::getIoSocketEagainInstance()->getErrorCode()); +} + +TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + constexpr int data_length = 10; + std::string data(data_length, '*'); + absl::FixedArray peek_iov(1); + peek_iov[0].iov_base = static_cast(data.data()); + peek_iov[0].iov_len = data.length(); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(2) + .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { + iov = peek_iov.begin(); + num_iov = 1; + return Api::SysCallSizeResult{data_length, 0}; + })) + .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); + + absl::FixedArray buf(data_length); + auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc.return_value_, data_length); + Buffer::OwnedImpl read_buffer; + Buffer::Reservation reservation = read_buffer.reserveForRead(); + rc = io_handle_.readv(reservation.length(), reservation.slices(), reservation.numSlices()); + EXPECT_EQ(rc.return_value_, 10); +} + +} // namespace Network +} // namespace Envoy diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index f864b0a7eb2d..8fad7b727932 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -872,8 +872,8 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); // TODO(davinci26): Mocking should not be used to provide real system calls. - bool partial_writed = false; #ifdef WIN32 + bool partial_writed = false; EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([&](os_fd_t fd, const iovec* iov, int num_iov) { @@ -939,7 +939,9 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { connect(false); write(buffer, 10); dispatcher_->run(Event::Dispatcher::RunType::NonBlock); +#ifdef WIN32 partial_writed = true; +#endif write(buffer + 10, 10); expectProxyProtoError(); From 15fc82022b04a7985dd6e427d8ffa47489bc519b Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 5 Nov 2021 13:09:59 -0700 Subject: [PATCH 06/35] fix format and increase coverage Signed-off-by: Sotiris Nanopoulos --- .../network/win32_socket_handle_impl_test.cc | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index a3b868620c0a..a7bc632d6cd3 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -14,8 +14,6 @@ #include "gtest/gtest.h" using testing::_; -using testing::DoAll; -using testing::Eq; using testing::Invoke; using testing::NiceMock; using testing::Return; @@ -69,7 +67,7 @@ TEST_F(Win32SocketHandleImplTest, ReadvShouldReenableEventsOnBlock) { Buffer::Reservation reservation = read_buffer.reserveForRead(); auto rc = io_handle_.readv(reservation.length(), reservation.slices(), reservation.numSlices()); EXPECT_EQ(rc.return_value_, 0); - EXPECT_EQ(rc.err_->getErrorCode(), IoSocketError::getIoSocketEagainInstance()->getErrorCode()); + EXPECT_EQ(rc.err_->getErrorCode(), Api::IoError::IoErrorCode::Again); } TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { @@ -98,5 +96,57 @@ TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { EXPECT_EQ(rc.return_value_, 10); } +TEST_F(Win32SocketHandleImplTest, RecvWithoutPeekShouldReadFromWire) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + + EXPECT_CALL(os_sys_calls, recv(_, _, _, _)) + .Times(1) + .WillRepeatedly(Return(Api::SysCallSizeResult{10, 0})); + + absl::FixedArray buf(10); + auto rc = io_handle_.recv(buf.data(), buf.size(), 0); + EXPECT_EQ(rc.return_value_, 10); +} + +TEST_F(Win32SocketHandleImplTest, RecvWithPeekReactivatesReadOnBlock) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(1) + .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN}));; + + EXPECT_CALL(*file_event_, registerEventIfEmulatedEdge(_)); + absl::FixedArray buf(10); + auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc.err_->getErrorCode(), Api::IoError::IoErrorCode::Again); +} + +TEST_F(Win32SocketHandleImplTest, RecvWithPeekFlagReturnsFinalError) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + constexpr int data_length = 10; + std::string data(data_length, '*'); + absl::FixedArray peek_iov(1); + peek_iov[0].iov_base = static_cast(data.data()); + peek_iov[0].iov_len = data.length(); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .Times(2) + .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { + // Gcc treats the variables as unused and this causes + // a compilation failure. + UNREFERENCED_PARAMETER(iov); + UNREFERENCED_PARAMETER(num_iov); + iov = peek_iov.begin(); + num_iov = 1; + return Api::SysCallSizeResult{data_length, 0}; + })) + .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_CONNRESET})); + + absl::FixedArray buf(data_length); + auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc.err_->getErrorCode(), Api::IoError::IoErrorCode::ConnectionReset); +} + } // namespace Network } // namespace Envoy From 4e608159cde9283d52dbbf687dfd4004b3f1a47a Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 5 Nov 2021 13:15:25 -0700 Subject: [PATCH 07/35] fix format Signed-off-by: Sotiris Nanopoulos --- test/common/network/win32_socket_handle_impl_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index a7bc632d6cd3..d35c9426043d 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -114,7 +114,8 @@ TEST_F(Win32SocketHandleImplTest, RecvWithPeekReactivatesReadOnBlock) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(1) - .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN}));; + .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); + ; EXPECT_CALL(*file_event_, registerEventIfEmulatedEdge(_)); absl::FixedArray buf(10); From fe11b35dd62669a8fdf15d61e4f66b72c8a8546f Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 5 Nov 2021 14:49:57 -0700 Subject: [PATCH 08/35] fix gcc Signed-off-by: Sotiris Nanopoulos --- test/common/network/win32_socket_handle_impl_test.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index d35c9426043d..5577993153ee 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -81,6 +81,10 @@ TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(2) .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { + // Gcc treats the variables as unused and this causes + // a compilation failure. + UNREFERENCED_PARAMETER(iov); + UNREFERENCED_PARAMETER(num_iov); iov = peek_iov.begin(); num_iov = 1; return Api::SysCallSizeResult{data_length, 0}; From 1534dcd520695bcb1e871799461681d9d73f0f22 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Mon, 8 Nov 2021 11:29:05 -0800 Subject: [PATCH 09/35] address PR feedack v1 Signed-off-by: Sotiris Nanopoulos --- source/common/network/win32_socket_handle_impl.cc | 5 +---- source/common/network/win32_socket_handle_impl.h | 3 --- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index a2a286d0a4c4..c00f902a29d1 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -123,13 +123,11 @@ void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64 Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer() { while (true) { - Buffer::OwnedImpl read_buffer; - Buffer::Reservation reservation = read_buffer.reserveForRead(); + Buffer::Reservation reservation = peek_buffer_->reserveForRead(); Api::IoCallUint64Result result = IoSocketHandleImpl::readv( reservation.length(), reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; reservation.commit(bytes_to_commit); - peek_buffer_->add(read_buffer); if (!result.ok() || bytes_to_commit == 0) { return result; } @@ -164,7 +162,6 @@ Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(Buffer::Instan size_t length) { auto lenght_to_move = std::min(peek_buffer_->length(), static_cast(length)); buffer.move(*peek_buffer_, lenght_to_move); - peek_buffer_->drain(lenght_to_move); return Api::IoCallUint64Result(lenght_to_move, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index 8d0d839b5e13..cf6783932758 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -60,9 +60,6 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { Api::IoCallUint64Result readvFromPeekBuffer(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice); Api::IoCallUint64Result peekFromPeekBuffer(void* buffer, size_t length); - - Api::IoCallUint64Result readvImpl(uint64_t max_length, Buffer::RawSlice* slices, - uint64_t num_slice); }; } // namespace Network } // namespace Envoy From 3eb8880e4cf2623fd1e5c64d431d6b223ea2c09e Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 07:10:49 -0800 Subject: [PATCH 10/35] address comments Signed-off-by: Sotiris Nanopoulos --- .../network/test/postgres_decoder_test.cc | 2 + envoy/buffer/buffer.h | 10 +++ source/common/buffer/buffer_impl.cc | 14 ++++ source/common/buffer/buffer_impl.h | 1 + .../network/win32_socket_handle_impl.cc | 80 ++++++++----------- .../common/network/win32_socket_handle_impl.h | 15 ++-- test/common/buffer/owned_impl_test.cc | 36 +++++++++ .../network/win32_socket_handle_impl_test.cc | 7 +- 8 files changed, 110 insertions(+), 55 deletions(-) diff --git a/contrib/postgres_proxy/filters/network/test/postgres_decoder_test.cc b/contrib/postgres_proxy/filters/network/test/postgres_decoder_test.cc index b143c1db2573..c50c4b677e6d 100644 --- a/contrib/postgres_proxy/filters/network/test/postgres_decoder_test.cc +++ b/contrib/postgres_proxy/filters/network/test/postgres_decoder_test.cc @@ -646,6 +646,8 @@ class FakeBuffer : public Buffer::Instance { MOCK_METHOD(void, prepend, (absl::string_view), (override)); MOCK_METHOD(void, prepend, (Instance&), (override)); MOCK_METHOD(void, copyOut, (size_t, uint64_t, void*), (const, override)); + MOCK_METHOD(uint64_t, copyOutToSlices, + (uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice), (const, override)); MOCK_METHOD(void, drain, (uint64_t), (override)); MOCK_METHOD(Buffer::RawSliceVector, getRawSlices, (absl::optional), (const, override)); MOCK_METHOD(Buffer::RawSlice, frontSlice, (), (const, override)); diff --git a/envoy/buffer/buffer.h b/envoy/buffer/buffer.h index 73dada2f25c3..d678df351459 100644 --- a/envoy/buffer/buffer.h +++ b/envoy/buffer/buffer.h @@ -201,6 +201,16 @@ class Instance { */ virtual void copyOut(size_t start, uint64_t size, void* data) const PURE; + /** + * Copy out a section of the buffer to dynamic array of slices. + * @param size supplies the size of the data that will be moved. + * @param slices supplies the output slices to fill. + * @param num_slice supplies the number of slices to fill. + * @return the number of bytes copied. + */ + virtual uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, + uint64_t num_slice) const PURE; + /** * Drain data from the buffer. * @param size supplies the length of data to drain. diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 85a7bd34f658..76328ea5a6ff 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -118,6 +118,20 @@ void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const { ASSERT(size == 0); } +uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const { + uint64_t total_length_to_read = std::min(size, this->length()); + uint64_t num_slices_to_read = 0; + uint64_t num_bytes_to_read = 0; + for (; num_slices_to_read < num_slice && num_bytes_to_read < total_length_to_read; + num_slices_to_read++) { + auto length_to_copy = std::min(static_cast(slices[num_slices_to_read].len_), + total_length_to_read - num_bytes_to_read); + this->copyOut(num_bytes_to_read, length_to_copy, slices[num_slices_to_read].mem_); + num_bytes_to_read += length_to_copy; + } + return num_bytes_to_read; +} + void OwnedImpl::drain(uint64_t size) { drainImpl(size); } void OwnedImpl::drainImpl(uint64_t size) { diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 7368b4880e1b..1e3fbf0bb3f3 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -681,6 +681,7 @@ class OwnedImpl : public LibEventInstance { void prepend(absl::string_view data) override; void prepend(Instance& data) override; void copyOut(size_t start, uint64_t size, void* data) const override; + uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const; void drain(uint64_t size) override; RawSliceVector getRawSlices(absl::optional max_slices = absl::nullopt) const override; RawSlice frontSlice() const override; diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index c00f902a29d1..3d9d20be2f0b 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -18,7 +18,7 @@ namespace Network { Api::IoCallUint64Result Win32SocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) { - if (peek_buffer_->length() == 0) { + if (peek_buffer_.length() == 0) { auto result = IoSocketHandleImpl::readv(max_length, slices, num_slice); reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); return result; @@ -29,7 +29,7 @@ Api::IoCallUint64Result Win32SocketHandleImpl::readv(uint64_t max_length, Buffer Api::IoCallUint64Result Win32SocketHandleImpl::read(Buffer::Instance& buffer, absl::optional max_length_opt) { - if (peek_buffer_->length() == 0) { + if (peek_buffer_.length() == 0) { auto result = IoSocketHandleImpl::read(buffer, max_length_opt); reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); return result; @@ -80,32 +80,28 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices, Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) { if (flags & MSG_PEEK) { - // can a remote OOM us now that we are not protected by readDisable? - Api::IoCallUint64Result peek_result = drainToPeekBuffer(); - // Some fatal error happened - if (!peek_result.wouldBlock()) { - return peek_result; - } - - // No data available, register read again. - if (peek_result.wouldBlock() && peek_buffer_->length() == 0) { - if (file_event_) { + // The caller is responsible for calling with the larger size in cases it needs to do so + // it can't rely on transparent reactivations. + // So no activations should be needed in this case. + Api::IoCallUint64Result peek_result = drainToPeekBuffer(length); + + // Some error happened + if (!peek_result.ok()) { + if (peek_result.wouldBlock() && file_event_) { file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); + if (peek_buffer_.length() == 0) { + return peek_result; + } + } else { + return peek_result; } - return peek_result; } Api::IoCallUint64Result result = peekFromPeekBuffer(buffer, length); - if (peek_buffer_->length() < length && file_event_) { - file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); - } else { - // This means that our peak buffer has more data than what the user - // wanted. Return the slice to the caller. - // How can the caller (v2 proxy protocol inspector) reactivate the events again here? - } return result; } - if (peek_buffer_->length() == 0) { + + if (peek_buffer_.length() == 0) { Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags); reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); return result; @@ -121,53 +117,47 @@ void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64 } } -Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer() { - while (true) { - Buffer::Reservation reservation = peek_buffer_->reserveForRead(); +Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) { + size_t total_bytes_read = 0; + while (peek_buffer_.length() < length) { + Buffer::Reservation reservation = peek_buffer_.reserveForRead(); Api::IoCallUint64Result result = IoSocketHandleImpl::readv( reservation.length(), reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; reservation.commit(bytes_to_commit); + total_bytes_read += bytes_to_commit; if (!result.ok() || bytes_to_commit == 0) { return result; } } + return Api::IoCallUint64Result(total_bytes_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(void* buffer, size_t length) { - uint64_t copy_size = std::min(peek_buffer_->length(), static_cast(length)); - peek_buffer_->copyOut(0, copy_size, buffer); - peek_buffer_->drain(copy_size); + uint64_t copy_size = std::min(peek_buffer_.length(), static_cast(length)); + peek_buffer_.copyOut(0, copy_size, buffer); + peek_buffer_.drain(copy_size); return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } Api::IoCallUint64Result Win32SocketHandleImpl::readvFromPeekBuffer(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) { - uint64_t total_length_to_read = std::min(max_length, peek_buffer_->length()); - uint64_t num_slices_to_read = 0; - uint64_t num_bytes_to_read = 0; - for (; num_slices_to_read < num_slice && num_bytes_to_read < total_length_to_read; - num_slices_to_read++) { - auto length_to_copy = std::min(static_cast(slices[num_slices_to_read].len_), - total_length_to_read - num_bytes_to_read); - peek_buffer_->copyOut(num_bytes_to_read, length_to_copy, slices[num_slices_to_read].mem_); - num_bytes_to_read += length_to_copy; - } - peek_buffer_->drain(num_bytes_to_read); - return Api::IoCallUint64Result(num_bytes_to_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); + uint64_t bytes_read = peek_buffer_.copyOutToSlices(max_length, slices, num_slice); + peek_buffer_.drain(bytes_read); + return Api::IoCallUint64Result(bytes_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(Buffer::Instance& buffer, size_t length) { - auto lenght_to_move = std::min(peek_buffer_->length(), static_cast(length)); - buffer.move(*peek_buffer_, lenght_to_move); + auto lenght_to_move = std::min(peek_buffer_.length(), static_cast(length)); + buffer.move(peek_buffer_, lenght_to_move); return Api::IoCallUint64Result(lenght_to_move, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } Api::IoCallUint64Result Win32SocketHandleImpl::peekFromPeekBuffer(void* buffer, size_t length) { - uint64_t copy_size = std::min(peek_buffer_->length(), static_cast(length)); - peek_buffer_->copyOut(0, copy_size, buffer); + uint64_t copy_size = std::min(peek_buffer_.length(), static_cast(length)); + peek_buffer_.copyOut(0, copy_size, buffer); return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {})); } @@ -175,14 +165,14 @@ void Win32SocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, Event::FileTriggerType trigger, uint32_t events) { IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events); - if ((events & Event::FileReadyType::Read) && peek_buffer_->length() > 0) { + if ((events & Event::FileReadyType::Read) && peek_buffer_.length() > 0) { activateFileEvents(Event::FileReadyType::Read); } } void Win32SocketHandleImpl::enableFileEvents(uint32_t events) { IoSocketHandleImpl::enableFileEvents(events); - if ((events & Event::FileReadyType::Read) && peek_buffer_->length() > 0) { + if ((events & Event::FileReadyType::Read) && peek_buffer_.length() > 0) { activateFileEvents(Event::FileReadyType::Read); } } diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index cf6783932758..de31b225e342 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -21,8 +21,7 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { public: explicit Win32SocketHandleImpl(os_fd_t fd = INVALID_SOCKET, bool socket_v6only = false, absl::optional domain = absl::nullopt) - : IoSocketHandleImpl(fd, socket_v6only, domain), - peek_buffer_(std::make_unique()) {} + : IoSocketHandleImpl(fd, socket_v6only, domain) {} Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) override; @@ -51,15 +50,21 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { private: void reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, uint32_t event); - // For windows mimic MSG_PEEK - std::unique_ptr peek_buffer_; + // on Windows we use the MSG_PEEK of recv instead of peeking the socket + // we drain the socket to memory. Subsequent read calls need to read + // first from the class buffer and then go to the underlying socket. + Api::IoCallUint64Result drainToPeekBuffer(size_t length); - Api::IoCallUint64Result drainToPeekBuffer(); + // Useful functions to read from the peek buffer based on + // the signatures of readv/read/recv OS socket functions. Api::IoCallUint64Result readFromPeekBuffer(void* buffer, size_t length); Api::IoCallUint64Result readFromPeekBuffer(Buffer::Instance& buffer, size_t length); Api::IoCallUint64Result readvFromPeekBuffer(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice); Api::IoCallUint64Result peekFromPeekBuffer(void* buffer, size_t length); + + // For windows mimic MSG_PEEK + Buffer::OwnedImpl peek_buffer_; }; } // namespace Network } // namespace Envoy diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index 7a4adc7d3058..2e75899cd99e 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1080,6 +1080,42 @@ void TestBufferMove(uint64_t buffer1_length, uint64_t buffer2_length, EXPECT_EQ(0, buffer2.length()); } +TEST_F(OwnedImplTest, CopyOutToSlicesTests) { + std::string data = "Hello, World!"; + Buffer::OwnedImpl buffer; + buffer.prepend(data); + + EXPECT_EQ(data.size(), buffer.length()); + EXPECT_EQ(data, buffer.toString()); + + { + Buffer::OwnedImpl buf; + auto reservation = buf.reserveSingleSlice(1024); + auto slice = reservation.slice(); + EXPECT_EQ(data.size(), buffer.copyOutToSlices(100, &slice, 1)); + } + + { + Buffer::OwnedImpl buf; + auto reservation = buf.reserveSingleSlice(5); + auto slice = reservation.slice(); + EXPECT_EQ(5, buffer.copyOutToSlices(100, &slice, 1)); + } + + { + Buffer::OwnedImpl buf; + auto reservation = buf.reserveForRead(); + EXPECT_EQ(5, buffer.copyOutToSlices(5, reservation.slices(), reservation.numSlices())); + } + + { + Buffer::OwnedImpl buf; + auto reservation = buf.reserveForRead(); + EXPECT_EQ(data.size(), + buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); + } +} + // Slice size large enough to prevent slice content from being coalesced into an existing slice constexpr uint64_t kLargeSliceSize = 2048; diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index 5577993153ee..cd19a9cc824a 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -79,7 +79,6 @@ TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { peek_iov[0].iov_base = static_cast(data.data()); peek_iov[0].iov_len = data.length(); EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .Times(2) .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { // Gcc treats the variables as unused and this causes // a compilation failure. @@ -88,8 +87,7 @@ TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { iov = peek_iov.begin(); num_iov = 1; return Api::SysCallSizeResult{data_length, 0}; - })) - .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); + })); absl::FixedArray buf(data_length); auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); @@ -119,7 +117,6 @@ TEST_F(Win32SocketHandleImplTest, RecvWithPeekReactivatesReadOnBlock) { EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(1) .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); - ; EXPECT_CALL(*file_event_, registerEventIfEmulatedEdge(_)); absl::FixedArray buf(10); @@ -144,7 +141,7 @@ TEST_F(Win32SocketHandleImplTest, RecvWithPeekFlagReturnsFinalError) { UNREFERENCED_PARAMETER(num_iov); iov = peek_iov.begin(); num_iov = 1; - return Api::SysCallSizeResult{data_length, 0}; + return Api::SysCallSizeResult{data_length / 2, 0}; })) .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_CONNRESET})); From f2fd1fb9ff5104b8c1e1eae2ef9dae079c8a0c39 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 07:25:20 -0800 Subject: [PATCH 11/35] fix spelling Signed-off-by: Sotiris Nanopoulos --- source/common/network/win32_socket_handle_impl.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index 3d9d20be2f0b..c5f980d0b301 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -80,9 +80,9 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices, Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) { if (flags & MSG_PEEK) { - // The caller is responsible for calling with the larger size in cases it needs to do so - // it can't rely on transparent reactivations. - // So no activations should be needed in this case. + // The caller is responsible for calling with the larger size + // in cases it needs to do so it can't rely on transparent event activation. + // So no in this case we should activate read again unless the read blocked. Api::IoCallUint64Result peek_result = drainToPeekBuffer(length); // Some error happened From 72e8f201fc807b6388e8ef29ba711f76642ed344 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 07:51:03 -0800 Subject: [PATCH 12/35] fix format Signed-off-by: Sotiris Nanopoulos --- source/common/buffer/buffer_impl.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 76328ea5a6ff..d65d6be7ebbc 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -118,7 +118,8 @@ void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const { ASSERT(size == 0); } -uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const { +uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, + uint64_t num_slice) const { uint64_t total_length_to_read = std::min(size, this->length()); uint64_t num_slices_to_read = 0; uint64_t num_bytes_to_read = 0; From db35fbf92e2c05db63c5e27321d60e9840360a33 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 08:41:28 -0800 Subject: [PATCH 13/35] add override keyword Signed-off-by: Sotiris Nanopoulos --- source/common/buffer/buffer_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 1e3fbf0bb3f3..3c5bba1bb18b 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -681,7 +681,7 @@ class OwnedImpl : public LibEventInstance { void prepend(absl::string_view data) override; void prepend(Instance& data) override; void copyOut(size_t start, uint64_t size, void* data) const override; - uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const; + uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const override; void drain(uint64_t size) override; RawSliceVector getRawSlices(absl::optional max_slices = absl::nullopt) const override; RawSlice frontSlice() const override; From 16caa4faf463e25493268e33fdccd4e0bd924eaf Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 09:03:52 -0800 Subject: [PATCH 14/35] another format fix Signed-off-by: Sotiris Nanopoulos --- source/common/buffer/buffer_impl.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/buffer/buffer_impl.h b/source/common/buffer/buffer_impl.h index 3c5bba1bb18b..c66550081deb 100644 --- a/source/common/buffer/buffer_impl.h +++ b/source/common/buffer/buffer_impl.h @@ -681,7 +681,8 @@ class OwnedImpl : public LibEventInstance { void prepend(absl::string_view data) override; void prepend(Instance& data) override; void copyOut(size_t start, uint64_t size, void* data) const override; - uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const override; + uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, + uint64_t num_slice) const override; void drain(uint64_t size) override; RawSliceVector getRawSlices(absl::optional max_slices = absl::nullopt) const override; RawSlice frontSlice() const override; From a479ef850a97de223521784bb1e691bd77cfd5d2 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 11:25:19 -0800 Subject: [PATCH 15/35] fix string buffer used in fuzzing Signed-off-by: Sotiris Nanopoulos --- test/common/buffer/buffer_fuzz.cc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 9d7cb96004e1..77a65435d9fb 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -114,6 +114,11 @@ class StringBuffer : public Buffer::Instance { ::memcpy(data, this->start() + start, size); } + uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, + uint64_t num_slice) const { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE + } + void drain(uint64_t size) override { FUZZ_ASSERT(size <= size_); start_ += size; From d3dd4d36ca01ac97054128f2e1fdc3587fc72956 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 12:04:08 -0800 Subject: [PATCH 16/35] fix typo in fuzzer Signed-off-by: Sotiris Nanopoulos --- test/common/buffer/buffer_fuzz.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 77a65435d9fb..79220b058d65 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -114,8 +114,7 @@ class StringBuffer : public Buffer::Instance { ::memcpy(data, this->start() + start, size); } - uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, - uint64_t num_slice) const { + uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const { NOT_IMPLEMENTED_GCOVR_EXCL_LINE } From 23c63413bc62268aeafb1e2057a31803904b2324 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 12:46:01 -0800 Subject: [PATCH 17/35] add override keyword Signed-off-by: Sotiris Nanopoulos --- test/common/buffer/buffer_fuzz.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 79220b058d65..f53fb7b6dcc5 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -114,7 +114,8 @@ class StringBuffer : public Buffer::Instance { ::memcpy(data, this->start() + start, size); } - uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const { + uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, + uint64_t num_slice) const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE } From 1c1223f733eced99545be22c3c5d12bd81a3ce1d Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 13:16:13 -0800 Subject: [PATCH 18/35] fixed another typo Signed-off-by: Sotiris Nanopoulos --- test/common/buffer/buffer_fuzz.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index f53fb7b6dcc5..b20877b3aff5 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -116,7 +116,7 @@ class StringBuffer : public Buffer::Instance { uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice) const override { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } void drain(uint64_t size) override { From abe8be2b1e1fa155f5b70cb44a0226b324820711 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Fri, 19 Nov 2021 13:51:14 -0800 Subject: [PATCH 19/35] remove unused parameters Signed-off-by: Sotiris Nanopoulos --- test/common/buffer/buffer_fuzz.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index b20877b3aff5..e22e54a6dd69 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -114,8 +114,7 @@ class StringBuffer : public Buffer::Instance { ::memcpy(data, this->start() + start, size); } - uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, - uint64_t num_slice) const override { + uint64_t copyOutToSlices(uint64_t, Buffer::RawSlice*, uint64_t) const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; } From 31aee3734f1f98163425f5d929f786cb8eeb225d Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Tue, 23 Nov 2021 08:43:48 -0800 Subject: [PATCH 20/35] PR comments and tests Signed-off-by: Sotiris Nanopoulos --- .../network/win32_socket_handle_impl.cc | 33 ++++++++++++------- .../common/network/win32_socket_handle_impl.h | 5 +++ .../network/win32_socket_handle_impl_test.cc | 28 ++++++++++++++++ 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index c5f980d0b301..84bf6a6690ba 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -80,6 +80,23 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices, Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) { if (flags & MSG_PEEK) { + return peek(buffer, length); + } + + if (peek_buffer_.length() == 0) { + Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags); + reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); + return result; + } else { + return readFromPeekBuffer(buffer, length); + } +} + +Api::IoCallUint64Result Win32SocketHandleImpl::peek(void* buffer, size_t length) { + + // if the `peek_buffer_` has already enough data we can skip reading + // from the wire. + if (length > peek_buffer_.length()) { // The caller is responsible for calling with the larger size // in cases it needs to do so it can't rely on transparent event activation. // So no in this case we should activate read again unless the read blocked. @@ -96,18 +113,9 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, return peek_result; } } - - Api::IoCallUint64Result result = peekFromPeekBuffer(buffer, length); - return result; } - if (peek_buffer_.length() == 0) { - Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags); - reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); - return result; - } else { - return readFromPeekBuffer(buffer, length); - } + return peekFromPeekBuffer(buffer, length); } void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, @@ -121,8 +129,9 @@ Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) size_t total_bytes_read = 0; while (peek_buffer_.length() < length) { Buffer::Reservation reservation = peek_buffer_.reserveForRead(); - Api::IoCallUint64Result result = IoSocketHandleImpl::readv( - reservation.length(), reservation.slices(), reservation.numSlices()); + auto bytes_to_read = std::min((length - total_bytes_read), reservation.length()); + Api::IoCallUint64Result result = + IoSocketHandleImpl::readv(bytes_to_read, reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; reservation.commit(bytes_to_commit); total_bytes_read += bytes_to_commit; diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index de31b225e342..5ab1b5e8b52b 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -53,6 +53,11 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { // on Windows we use the MSG_PEEK of recv instead of peeking the socket // we drain the socket to memory. Subsequent read calls need to read // first from the class buffer and then go to the underlying socket. + + // Implement the peek logic of recv for readability purposes + Api::IoCallUint64Result peek(void* buffer, size_t length); + + // Drain the socket into `peek_buffer_` Api::IoCallUint64Result drainToPeekBuffer(size_t length); // Useful functions to read from the peek buffer based on diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index cd19a9cc824a..1c5f4204b3d6 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -150,5 +150,33 @@ TEST_F(Win32SocketHandleImplTest, RecvWithPeekFlagReturnsFinalError) { EXPECT_EQ(rc.err_->getErrorCode(), Api::IoError::IoErrorCode::ConnectionReset); } +TEST_F(Win32SocketHandleImplTest, ReadvWithPeekShouldReadFromBuffer) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + constexpr int data_length = 10; + std::string data(data_length, '*'); + absl::FixedArray peek_iov(1); + peek_iov[0].iov_base = static_cast(data.data()); + peek_iov[0].iov_len = data.length(); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { + // Gcc treats the variables as unused and this causes + // a compilation failure. + UNREFERENCED_PARAMETER(iov); + UNREFERENCED_PARAMETER(num_iov); + iov = peek_iov.begin(); + num_iov = 1; + return Api::SysCallSizeResult{data_length, 0}; + })); + + absl::FixedArray buf(data_length); + auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc.return_value_, data_length); + // Second call should not make a system call, it should + // read from memory. + rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc.return_value_, data_length); +} + } // namespace Network } // namespace Envoy From 55361a3fb51b9b4e6a12bcda4b1e15ca2cb4f5b1 Mon Sep 17 00:00:00 2001 From: Sotiris Nanopoulos Date: Tue, 23 Nov 2021 12:56:34 -0800 Subject: [PATCH 21/35] add hint for mac compiler Signed-off-by: Sotiris Nanopoulos --- source/common/network/win32_socket_handle_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index 84bf6a6690ba..9447bcd9559f 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -129,7 +129,7 @@ Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) size_t total_bytes_read = 0; while (peek_buffer_.length() < length) { Buffer::Reservation reservation = peek_buffer_.reserveForRead(); - auto bytes_to_read = std::min((length - total_bytes_read), reservation.length()); + uint64_t bytes_to_read = std::min((length - total_bytes_read), reservation.length()); Api::IoCallUint64Result result = IoSocketHandleImpl::readv(bytes_to_read, reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; From 320e3706a7119e63f32bf5d59963c3479b0a8fee Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 15 Dec 2021 06:15:34 +0000 Subject: [PATCH 22/35] Improve the copyOutToSlices method Signed-off-by: He Jie Xu --- source/common/buffer/buffer_impl.cc | 37 +++++++++++++++++------- test/common/buffer/owned_impl_test.cc | 41 +++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 11 deletions(-) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index d65d6be7ebbc..9a9e4bf09c2c 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -118,19 +118,34 @@ void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const { ASSERT(size == 0); } -uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* slices, +uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices, uint64_t num_slice) const { uint64_t total_length_to_read = std::min(size, this->length()); - uint64_t num_slices_to_read = 0; - uint64_t num_bytes_to_read = 0; - for (; num_slices_to_read < num_slice && num_bytes_to_read < total_length_to_read; - num_slices_to_read++) { - auto length_to_copy = std::min(static_cast(slices[num_slices_to_read].len_), - total_length_to_read - num_bytes_to_read); - this->copyOut(num_bytes_to_read, length_to_copy, slices[num_slices_to_read].mem_); - num_bytes_to_read += length_to_copy; - } - return num_bytes_to_read; + uint64_t num_bytes_read = 0; + uint64_t num_dest_slices_read = 0; + uint64_t num_src_slices_read = 0; + uint64_t dest_slice_off = 0; + uint64_t src_slice_off = 0; + for (; num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read && + num_src_slices_read < slices_.size();) { + auto& src_slice = slices_[num_src_slices_read]; + auto& dest_slice = dest_slices[num_dest_slices_read]; + auto length_to_copy = std::min( + src_slice.dataSize() - src_slice_off, + std::min(static_cast(dest_slice.len_), total_length_to_read - num_bytes_read)); + memcpy(dest_slice.mem_, src_slice.data(), length_to_copy); // NOLINT(safe-memcpy) + dest_slice.len_ = length_to_copy; + src_slice_off = src_slice.dataSize() - src_slice_off - length_to_copy; + dest_slice_off = dest_slice.len_ - dest_slice_off - length_to_copy; + if (src_slice_off == 0) { + num_src_slices_read++; + } + if (dest_slice_off == 0) { + num_dest_slices_read++; + } + num_bytes_read += length_to_copy; + } + return num_bytes_read; } void OwnedImpl::drain(uint64_t size) { drainImpl(size); } diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index 2e75899cd99e..edb0bcd293c7 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1093,6 +1093,8 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { auto reservation = buf.reserveSingleSlice(1024); auto slice = reservation.slice(); EXPECT_EQ(data.size(), buffer.copyOutToSlices(100, &slice, 1)); + reservation.commit(data.size()); + EXPECT_EQ(data, buffer.toString()); } { @@ -1100,12 +1102,16 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { auto reservation = buf.reserveSingleSlice(5); auto slice = reservation.slice(); EXPECT_EQ(5, buffer.copyOutToSlices(100, &slice, 1)); + reservation.commit(5); + EXPECT_EQ("Hello", buf.toString()); } { Buffer::OwnedImpl buf; auto reservation = buf.reserveForRead(); EXPECT_EQ(5, buffer.copyOutToSlices(5, reservation.slices(), reservation.numSlices())); + reservation.commit(5); + EXPECT_EQ("Hello", buf.toString()); } { @@ -1113,6 +1119,41 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { auto reservation = buf.reserveForRead(); EXPECT_EQ(data.size(), buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); + reservation.commit(data.size()); + EXPECT_EQ(data, buffer.toString()); + } + // Test the destination buffer has smaller slice than the source buffer. + { + Buffer::OwnedImpl buf; + buf.appendSliceForTest("aa", 2); + buf.appendSliceForTest("aa", 2); + buf.appendSliceForTest("aa", 2); + buf.appendSliceForTest("aa", 2); + buf.appendSliceForTest("aa", 2); + buf.appendSliceForTest("aa", 2); + buf.appendSliceForTest("a", 1); + auto reservation = buf.reserveForRead(); + EXPECT_EQ(data.size(), + buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); + reservation.commit(data.size()); + EXPECT_EQ(data, buffer.toString()); + } + // Test the source buffer has smaller slice than the destination buffer. + { + Buffer::OwnedImpl dest_buf; + dest_buf.appendSliceForTest("He", 2); + dest_buf.appendSliceForTest("ll", 2); + dest_buf.appendSliceForTest("o,", 2); + dest_buf.appendSliceForTest(" W", 2); + dest_buf.appendSliceForTest("or", 2); + dest_buf.appendSliceForTest("ld", 2); + dest_buf.appendSliceForTest("!", 1); + Buffer::OwnedImpl buf; + auto reservation = buf.reserveForRead(); + EXPECT_EQ(data.size(), + buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); + reservation.commit(data.size()); + EXPECT_EQ(data, buffer.toString()); } } From a2e879929e87cf065d4b9c03df2bf3f062ff6968 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 15 Dec 2021 07:41:11 +0000 Subject: [PATCH 23/35] address comment Signed-off-by: He Jie Xu --- envoy/buffer/buffer.h | 2 +- .../filters/listener/proxy_protocol/proxy_protocol_test.cc | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/envoy/buffer/buffer.h b/envoy/buffer/buffer.h index d678df351459..f0afe590d3b4 100644 --- a/envoy/buffer/buffer.h +++ b/envoy/buffer/buffer.h @@ -203,7 +203,7 @@ class Instance { /** * Copy out a section of the buffer to dynamic array of slices. - * @param size supplies the size of the data that will be moved. + * @param size supplies the size of the data that will be copied. * @param slices supplies the output slices to fill. * @param num_slice supplies the number of slices to fill. * @return the number of bytes copied. diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index b68b6bb4be66..1a5528fc2dd5 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -877,11 +877,11 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { // TODO(davinci26): Mocking should not be used to provide real system calls. #ifdef WIN32 - bool partial_writed = false; + bool partial_write = false; EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(AnyNumber()) .WillRepeatedly(Invoke([&](os_fd_t fd, const iovec* iov, int num_iov) { - if (partial_writed) { + if (partial_write) { ENVOY_LOG_MISC(debug, "inject failure"); return Api::SysCallSizeResult{-1, 0}; } @@ -944,7 +944,7 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { write(buffer, 10); dispatcher_->run(Event::Dispatcher::RunType::NonBlock); #ifdef WIN32 - partial_writed = true; + partial_write = true; #endif write(buffer + 10, 10); From d113f4c57cdc3c7e30a13daec879209e5eb97aa2 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 15 Dec 2021 07:45:08 +0000 Subject: [PATCH 24/35] type conversion for macos Signed-off-by: He Jie Xu --- source/common/network/win32_socket_handle_impl.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index 9447bcd9559f..5262ba32c2cc 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -129,7 +129,8 @@ Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) size_t total_bytes_read = 0; while (peek_buffer_.length() < length) { Buffer::Reservation reservation = peek_buffer_.reserveForRead(); - uint64_t bytes_to_read = std::min((length - total_bytes_read), reservation.length()); + uint64_t bytes_to_read = + std::min(static_cast(length - total_bytes_read), reservation.length()); Api::IoCallUint64Result result = IoSocketHandleImpl::readv(bytes_to_read, reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; From 0c03750a42a0ca6ab5fe62cd6f6b557913daafad Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Fri, 24 Dec 2021 04:57:00 +0000 Subject: [PATCH 25/35] correct the copyOutToSlices method Signed-off-by: He Jie Xu --- source/common/buffer/buffer_impl.cc | 14 ++++---- test/common/buffer/owned_impl_test.cc | 51 +++++++++++++++------------ 2 files changed, 36 insertions(+), 29 deletions(-) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 9a9e4bf09c2c..029dde845e63 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -133,15 +133,17 @@ uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices auto length_to_copy = std::min( src_slice.dataSize() - src_slice_off, std::min(static_cast(dest_slice.len_), total_length_to_read - num_bytes_read)); - memcpy(dest_slice.mem_, src_slice.data(), length_to_copy); // NOLINT(safe-memcpy) - dest_slice.len_ = length_to_copy; - src_slice_off = src_slice.dataSize() - src_slice_off - length_to_copy; - dest_slice_off = dest_slice.len_ - dest_slice_off - length_to_copy; - if (src_slice_off == 0) { + memcpy(static_cast(dest_slice.mem_) + dest_slice_off, // NOLINT(safe-memcpy) + src_slice.data() + src_slice_off, length_to_copy); + src_slice_off = src_slice_off + length_to_copy; + dest_slice_off = dest_slice_off + length_to_copy; + if (src_slice_off == src_slice.dataSize()) { num_src_slices_read++; + src_slice_off = 0; } - if (dest_slice_off == 0) { + if (dest_slice_off == dest_slice.len_) { num_dest_slices_read++; + dest_slice_off = 0; } num_bytes_read += length_to_copy; } diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index edb0bcd293c7..bb87e5ece470 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1,4 +1,5 @@ #include +#include #include "envoy/api/io_error.h" @@ -1120,40 +1121,44 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { EXPECT_EQ(data.size(), buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); reservation.commit(data.size()); - EXPECT_EQ(data, buffer.toString()); + EXPECT_EQ(data, buf.toString()); } // Test the destination buffer has smaller slice than the source buffer. { - Buffer::OwnedImpl buf; - buf.appendSliceForTest("aa", 2); - buf.appendSliceForTest("aa", 2); - buf.appendSliceForTest("aa", 2); - buf.appendSliceForTest("aa", 2); - buf.appendSliceForTest("aa", 2); - buf.appendSliceForTest("aa", 2); - buf.appendSliceForTest("a", 1); - auto reservation = buf.reserveForRead(); + Buffer::OwnedImpl src_buf; + std::string data; + for (auto i = 0; i < (32 * 1024); i++) { + data.append(std::to_string(i % 10)); + } + // Build the source buffer to have a single 32KB slice. + src_buf.appendSliceForTest(data); + + Buffer::OwnedImpl dest_buf; + // The destination buffer are expected to have 8 Slices, each slice has 16KB buffer. + auto reservation = dest_buf.reserveForRead(); + // Copy single 32 KB slice's data to 8 * 16KB slices. EXPECT_EQ(data.size(), - buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); + src_buf.copyOutToSlices(32 * 1024, reservation.slices(), reservation.numSlices())); reservation.commit(data.size()); - EXPECT_EQ(data, buffer.toString()); + EXPECT_EQ(data, dest_buf.toString()); } // Test the source buffer has smaller slice than the destination buffer. { + Buffer::OwnedImpl src_buf; + // Build the source buffer to have 7 slices. + src_buf.appendSliceForTest("He", 2); + src_buf.appendSliceForTest("ll", 2); + src_buf.appendSliceForTest("o,", 2); + src_buf.appendSliceForTest(" W", 2); + src_buf.appendSliceForTest("or", 2); + src_buf.appendSliceForTest("ld", 2); + src_buf.appendSliceForTest("!", 1); Buffer::OwnedImpl dest_buf; - dest_buf.appendSliceForTest("He", 2); - dest_buf.appendSliceForTest("ll", 2); - dest_buf.appendSliceForTest("o,", 2); - dest_buf.appendSliceForTest(" W", 2); - dest_buf.appendSliceForTest("or", 2); - dest_buf.appendSliceForTest("ld", 2); - dest_buf.appendSliceForTest("!", 1); - Buffer::OwnedImpl buf; - auto reservation = buf.reserveForRead(); + auto reservation = dest_buf.reserveForRead(); EXPECT_EQ(data.size(), - buffer.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); + src_buf.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); reservation.commit(data.size()); - EXPECT_EQ(data, buffer.toString()); + EXPECT_EQ(data, dest_buf.toString()); } } From 959170b2d6f8cef98be32c70f1aefc199f11c587 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Fri, 24 Dec 2021 06:46:00 +0000 Subject: [PATCH 26/35] remove NOT_IMPLEMENTED_GCOVR_EXCL_LINE Signed-off-by: He Jie Xu --- test/common/buffer/buffer_fuzz.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index ccb8697a1c7f..c5cc697290de 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -115,7 +115,7 @@ class StringBuffer : public Buffer::Instance { } uint64_t copyOutToSlices(uint64_t, Buffer::RawSlice*, uint64_t) const override { - NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + PANIC("not implemented"); } void drain(uint64_t size) override { From d2fe6e299248fbf72181641b05591a067a2dad17 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Thu, 6 Jan 2022 05:58:22 +0000 Subject: [PATCH 27/35] address comment Signed-off-by: He Jie Xu --- .../common/network/win32_socket_handle_impl.h | 6 ++- test/common/buffer/owned_impl_test.cc | 14 +++++ .../network/win32_socket_handle_impl_test.cc | 51 +++++-------------- 3 files changed, 33 insertions(+), 38 deletions(-) diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index 5ab1b5e8b52b..2ffd7de5860f 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -57,7 +57,11 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { // Implement the peek logic of recv for readability purposes Api::IoCallUint64Result peek(void* buffer, size_t length); - // Drain the socket into `peek_buffer_` + /** + * Drain the socket into `peek_buffer_`. + * @param length is the desired length of data drained into the `peek_buffer_`. + * @return the actual length of data drained into the `peek_buffer_`. + */ Api::IoCallUint64Result drainToPeekBuffer(size_t length); // Useful functions to read from the peek buffer based on diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index bb87e5ece470..b8e639d95de0 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1132,10 +1132,17 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { } // Build the source buffer to have a single 32KB slice. src_buf.appendSliceForTest(data); + EXPECT_EQ(1, src_buf.getRawSlices().size()); + EXPECT_EQ(32 * 1024, src_buf.frontSlice().len_); Buffer::OwnedImpl dest_buf; // The destination buffer are expected to have 8 Slices, each slice has 16KB buffer. auto reservation = dest_buf.reserveForRead(); + EXPECT_EQ(8, reservation.numSlices()); + for (uint64_t i = 0; i < reservation.numSlices(); i++) { + EXPECT_EQ(16 * 1024, reservation.slices()[i].len_); + } + // Copy single 32 KB slice's data to 8 * 16KB slices. EXPECT_EQ(data.size(), src_buf.copyOutToSlices(32 * 1024, reservation.slices(), reservation.numSlices())); @@ -1154,7 +1161,14 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { src_buf.appendSliceForTest("ld", 2); src_buf.appendSliceForTest("!", 1); Buffer::OwnedImpl dest_buf; + // The destination buffer are expected to have 8 Slices, each slice has 16KB buffer. auto reservation = dest_buf.reserveForRead(); + EXPECT_EQ(8, reservation.numSlices()); + for (uint64_t i = 0; i < reservation.numSlices(); i++) { + EXPECT_EQ(16 * 1024, reservation.slices()[i].len_); + } + + // Copy data from src 7 slices into the first 16K slice of dest. EXPECT_EQ(data.size(), src_buf.copyOutToSlices(100, reservation.slices(), reservation.numSlices())); reservation.commit(data.size()); diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index 1c5f4204b3d6..4bdcc4444355 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -75,27 +75,21 @@ TEST_F(Win32SocketHandleImplTest, ReadvWithBufferShouldReadFromBuffer) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); constexpr int data_length = 10; std::string data(data_length, '*'); - absl::FixedArray peek_iov(1); - peek_iov[0].iov_base = static_cast(data.data()); - peek_iov[0].iov_len = data.length(); - EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { - // Gcc treats the variables as unused and this causes - // a compilation failure. - UNREFERENCED_PARAMETER(iov); - UNREFERENCED_PARAMETER(num_iov); - iov = peek_iov.begin(); - num_iov = 1; - return Api::SysCallSizeResult{data_length, 0}; - })); + EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Invoke([&](os_fd_t, const iovec* iov, int) { + memcpy(iov->iov_base, data.data(), data_length); // NOLINT(safe-memcpy) + return Api::SysCallSizeResult{data_length, 0}; + })); absl::FixedArray buf(data_length); auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); EXPECT_EQ(rc.return_value_, data_length); + EXPECT_EQ(data, std::string(buf.data(), buf.size())); Buffer::OwnedImpl read_buffer; Buffer::Reservation reservation = read_buffer.reserveForRead(); rc = io_handle_.readv(reservation.length(), reservation.slices(), reservation.numSlices()); EXPECT_EQ(rc.return_value_, 10); + reservation.commit(rc.return_value_); + EXPECT_EQ(data, read_buffer.toString()); } TEST_F(Win32SocketHandleImplTest, RecvWithoutPeekShouldReadFromWire) { @@ -128,19 +122,9 @@ TEST_F(Win32SocketHandleImplTest, RecvWithPeekFlagReturnsFinalError) { Api::MockOsSysCalls os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); constexpr int data_length = 10; - std::string data(data_length, '*'); - absl::FixedArray peek_iov(1); - peek_iov[0].iov_base = static_cast(data.data()); - peek_iov[0].iov_len = data.length(); EXPECT_CALL(os_sys_calls, readv(_, _, _)) .Times(2) - .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { - // Gcc treats the variables as unused and this causes - // a compilation failure. - UNREFERENCED_PARAMETER(iov); - UNREFERENCED_PARAMETER(num_iov); - iov = peek_iov.begin(); - num_iov = 1; + .WillOnce(Invoke([&](os_fd_t, const iovec*, int) { return Api::SysCallSizeResult{data_length / 2, 0}; })) .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_CONNRESET})); @@ -155,27 +139,20 @@ TEST_F(Win32SocketHandleImplTest, ReadvWithPeekShouldReadFromBuffer) { TestThreadsafeSingletonInjector os_calls(&os_sys_calls); constexpr int data_length = 10; std::string data(data_length, '*'); - absl::FixedArray peek_iov(1); - peek_iov[0].iov_base = static_cast(data.data()); - peek_iov[0].iov_len = data.length(); - EXPECT_CALL(os_sys_calls, readv(_, _, _)) - .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iov) { - // Gcc treats the variables as unused and this causes - // a compilation failure. - UNREFERENCED_PARAMETER(iov); - UNREFERENCED_PARAMETER(num_iov); - iov = peek_iov.begin(); - num_iov = 1; - return Api::SysCallSizeResult{data_length, 0}; - })); + EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Invoke([&](os_fd_t, const iovec* iov, int) { + memcpy(iov->iov_base, data.data(), data_length); // NOLINT(safe-memcpy) + return Api::SysCallSizeResult{data_length, 0}; + })); absl::FixedArray buf(data_length); auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); EXPECT_EQ(rc.return_value_, data_length); + EXPECT_EQ(data, std::string(buf.data(), buf.size())); // Second call should not make a system call, it should // read from memory. rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); EXPECT_EQ(rc.return_value_, data_length); + EXPECT_EQ(data, std::string(buf.data(), buf.size())); } } // namespace Network From fc5b18834b19c5ce7e1a20661fe634a341165917 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Tue, 11 Jan 2022 22:46:04 +0000 Subject: [PATCH 28/35] address comment Signed-off-by: He Jie Xu --- source/common/buffer/buffer_impl.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 702e2aee75a3..c8cc66e7180a 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -126,8 +126,7 @@ uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices uint64_t num_src_slices_read = 0; uint64_t dest_slice_off = 0; uint64_t src_slice_off = 0; - for (; num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read && - num_src_slices_read < slices_.size();) { + for (; num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read;) { auto& src_slice = slices_[num_src_slices_read]; auto& dest_slice = dest_slices[num_dest_slices_read]; auto length_to_copy = std::min( From a8db58b889af33d60b9ff85156db4fa320d9b2b4 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 24 Jan 2022 15:19:30 +0800 Subject: [PATCH 29/35] Fix the peek size Signed-off-by: He Jie Xu --- .../network/win32_socket_handle_impl.cc | 2 +- .../network/win32_socket_handle_impl_test.cc | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index 5262ba32c2cc..2aa653bbece4 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -130,7 +130,7 @@ Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) while (peek_buffer_.length() < length) { Buffer::Reservation reservation = peek_buffer_.reserveForRead(); uint64_t bytes_to_read = - std::min(static_cast(length - total_bytes_read), reservation.length()); + std::min(static_cast(length - peek_buffer_.length()), reservation.length()); Api::IoCallUint64Result result = IoSocketHandleImpl::readv(bytes_to_read, reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index 4bdcc4444355..11adcddb58b3 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -105,6 +105,35 @@ TEST_F(Win32SocketHandleImplTest, RecvWithoutPeekShouldReadFromWire) { EXPECT_EQ(rc.return_value_, 10); } +TEST_F(Win32SocketHandleImplTest, RecvWithPeekMultipleTimes) { + Api::MockOsSysCalls os_sys_calls; + TestThreadsafeSingletonInjector os_calls(&os_sys_calls); + EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iovs) { + size_t size_to_read = 0; + for (auto i = 0; i < num_iovs; i++) { + size_to_read += iov[i].iov_len; + } + EXPECT_EQ(10, size_to_read); + return Api::SysCallSizeResult{5, 0}; + })).WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); + + EXPECT_CALL(*file_event_, registerEventIfEmulatedEdge(_)); + absl::FixedArray buf(10); + auto rc = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc.return_value_, 5); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iovs) { + size_t size_to_read = 0; + for (auto i = 0; i < num_iovs; i++) { + size_to_read += iov[i].iov_len; + } + EXPECT_EQ(5, size_to_read); + return Api::SysCallSizeResult{5, 0}; + })); + auto rc2 = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); + EXPECT_EQ(rc2.return_value_, 10); +} + TEST_F(Win32SocketHandleImplTest, RecvWithPeekReactivatesReadOnBlock) { Api::MockOsSysCalls os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); From 88311b15a3685cc91b7976e7fe0cd72e75def60f Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 24 Jan 2022 12:21:51 +0000 Subject: [PATCH 30/35] Add fuzz test for copyOutToSlices Signed-off-by: He Jie Xu --- test/common/buffer/buffer_corpus/basic | 3 +++ test/common/buffer/buffer_fuzz.cc | 26 ++++++++++++++++++++++++-- test/common/buffer/buffer_fuzz.proto | 1 + 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/test/common/buffer/buffer_corpus/basic b/test/common/buffer/buffer_corpus/basic index 9fd31255e2e6..9f32b6d0bc31 100644 --- a/test/common/buffer/buffer_corpus/basic +++ b/test/common/buffer/buffer_corpus/basic @@ -27,6 +27,9 @@ actions { length: 200 } } +actions { + copy_out_to_slices: 200 +} actions { drain: 98 } diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 6a8b0a43f4b7..9e86c18fdd25 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -114,8 +114,18 @@ class StringBuffer : public Buffer::Instance { ::memcpy(data, this->start() + start, size); } - uint64_t copyOutToSlices(uint64_t, Buffer::RawSlice*, uint64_t) const override { - PANIC("not implemented"); + uint64_t copyOutToSlices(uint64_t length, Buffer::RawSlice* slices, uint64_t num_slices) const override { + uint64_t size_copied = 0; + uint64_t num_slices_copied = 0; + while (size_copied < length && num_slices_copied < num_slices) { + auto copy_length = std::min((length - size_copied), slices[num_slices_copied].len_); + ::memcpy(slices[num_slices_copied].mem_, this->start(), copy_length); + size_copied += copy_length; + if (copy_length == slices[num_slices_copied].len_) { + num_slices_copied++; + } + } + return size_copied; } void drain(uint64_t size) override { @@ -322,6 +332,18 @@ uint32_t bufferAction(Context& ctxt, char insert_value, uint32_t max_alloc, Buff FUZZ_ASSERT(::memcmp(copy_buffer, data.data() + start, length) == 0); break; } + case test::common::buffer::Action::kCopyOutToSlices: { + const uint32_t length = + std::min(static_cast(target_buffer.length()), action.copy_out_to_slices()); + Buffer::OwnedImpl buffer; + auto reservation = buffer.reserveForRead(); + auto rc = target_buffer.copyOutToSlices(length, reservation.slices(), reservation.numSlices()); + reservation.commit(rc); + const std::string data = buffer.toString(); + const std::string target_data = target_buffer.toString(); + FUZZ_ASSERT(::memcmp(data.data(), target_data.data(), reservation.length()) == 0); + break; + } case test::common::buffer::Action::kDrain: { const uint32_t previous_length = target_buffer.length(); const uint32_t drain_length = diff --git a/test/common/buffer/buffer_fuzz.proto b/test/common/buffer/buffer_fuzz.proto index a4a18cc100c5..91a43f5d33e6 100644 --- a/test/common/buffer/buffer_fuzz.proto +++ b/test/common/buffer/buffer_fuzz.proto @@ -42,6 +42,7 @@ message Action { uint32 get_raw_slices = 14; Search search = 15; string starts_with = 16; + uint32 copy_out_to_slices = 17; } } From 84d3b810c4cc790824fc81d50658fabc93d8a839 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 24 Jan 2022 12:23:08 +0000 Subject: [PATCH 31/35] fix format Signed-off-by: He Jie Xu --- .../network/win32_socket_handle_impl.cc | 4 +-- test/common/buffer/buffer_fuzz.cc | 3 +- .../network/win32_socket_handle_impl_test.cc | 32 ++++++++++--------- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index 2aa653bbece4..ac3a0bd09eff 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -129,8 +129,8 @@ Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) size_t total_bytes_read = 0; while (peek_buffer_.length() < length) { Buffer::Reservation reservation = peek_buffer_.reserveForRead(); - uint64_t bytes_to_read = - std::min(static_cast(length - peek_buffer_.length()), reservation.length()); + uint64_t bytes_to_read = std::min( + static_cast(length - peek_buffer_.length()), reservation.length()); Api::IoCallUint64Result result = IoSocketHandleImpl::readv(bytes_to_read, reservation.slices(), reservation.numSlices()); uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0; diff --git a/test/common/buffer/buffer_fuzz.cc b/test/common/buffer/buffer_fuzz.cc index 9e86c18fdd25..918c4e0d493e 100644 --- a/test/common/buffer/buffer_fuzz.cc +++ b/test/common/buffer/buffer_fuzz.cc @@ -114,7 +114,8 @@ class StringBuffer : public Buffer::Instance { ::memcpy(data, this->start() + start, size); } - uint64_t copyOutToSlices(uint64_t length, Buffer::RawSlice* slices, uint64_t num_slices) const override { + uint64_t copyOutToSlices(uint64_t length, Buffer::RawSlice* slices, + uint64_t num_slices) const override { uint64_t size_copied = 0; uint64_t num_slices_copied = 0; while (size_copied < length && num_slices_copied < num_slices) { diff --git a/test/common/network/win32_socket_handle_impl_test.cc b/test/common/network/win32_socket_handle_impl_test.cc index 11adcddb58b3..c06880d9f199 100644 --- a/test/common/network/win32_socket_handle_impl_test.cc +++ b/test/common/network/win32_socket_handle_impl_test.cc @@ -108,14 +108,16 @@ TEST_F(Win32SocketHandleImplTest, RecvWithoutPeekShouldReadFromWire) { TEST_F(Win32SocketHandleImplTest, RecvWithPeekMultipleTimes) { Api::MockOsSysCalls os_sys_calls; TestThreadsafeSingletonInjector os_calls(&os_sys_calls); - EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iovs) { - size_t size_to_read = 0; - for (auto i = 0; i < num_iovs; i++) { - size_to_read += iov[i].iov_len; - } - EXPECT_EQ(10, size_to_read); - return Api::SysCallSizeResult{5, 0}; - })).WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); + EXPECT_CALL(os_sys_calls, readv(_, _, _)) + .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iovs) { + size_t size_to_read = 0; + for (auto i = 0; i < num_iovs; i++) { + size_to_read += iov[i].iov_len; + } + EXPECT_EQ(10, size_to_read); + return Api::SysCallSizeResult{5, 0}; + })) + .WillOnce(Return(Api::SysCallSizeResult{-1, SOCKET_ERROR_AGAIN})); EXPECT_CALL(*file_event_, registerEventIfEmulatedEdge(_)); absl::FixedArray buf(10); @@ -123,13 +125,13 @@ TEST_F(Win32SocketHandleImplTest, RecvWithPeekMultipleTimes) { EXPECT_EQ(rc.return_value_, 5); EXPECT_CALL(os_sys_calls, readv(_, _, _)) .WillOnce(Invoke([&](os_fd_t, const iovec* iov, int num_iovs) { - size_t size_to_read = 0; - for (auto i = 0; i < num_iovs; i++) { - size_to_read += iov[i].iov_len; - } - EXPECT_EQ(5, size_to_read); - return Api::SysCallSizeResult{5, 0}; - })); + size_t size_to_read = 0; + for (auto i = 0; i < num_iovs; i++) { + size_to_read += iov[i].iov_len; + } + EXPECT_EQ(5, size_to_read); + return Api::SysCallSizeResult{5, 0}; + })); auto rc2 = io_handle_.recv(buf.data(), buf.size(), MSG_PEEK); EXPECT_EQ(rc2.return_value_, 10); } From 8b4acf3d58d73b9ba0259e2c164c568c6d95e5b0 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 24 Jan 2022 12:42:43 +0000 Subject: [PATCH 32/35] address comments Signed-off-by: He Jie Xu --- source/common/buffer/buffer_impl.cc | 33 +++++++++--------- .../network/win32_socket_handle_impl.cc | 34 +++++++++---------- .../common/network/win32_socket_handle_impl.h | 2 +- 3 files changed, 35 insertions(+), 34 deletions(-) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 50313a65587f..3b1eea4054e4 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -124,25 +124,26 @@ uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices uint64_t num_bytes_read = 0; uint64_t num_dest_slices_read = 0; uint64_t num_src_slices_read = 0; - uint64_t dest_slice_off = 0; - uint64_t src_slice_off = 0; - for (; num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read;) { - auto& src_slice = slices_[num_src_slices_read]; - auto& dest_slice = dest_slices[num_dest_slices_read]; - auto length_to_copy = std::min( - src_slice.dataSize() - src_slice_off, - std::min(static_cast(dest_slice.len_), total_length_to_read - num_bytes_read)); - memcpy(static_cast(dest_slice.mem_) + dest_slice_off, // NOLINT(safe-memcpy) - src_slice.data() + src_slice_off, length_to_copy); - src_slice_off = src_slice_off + length_to_copy; - dest_slice_off = dest_slice_off + length_to_copy; - if (src_slice_off == src_slice.dataSize()) { + uint64_t dest_slice_offset = 0; + uint64_t src_slice_offset = 0; + while (num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read) { + const Slice& src_slice = slices_[num_src_slices_read]; + const Buffer::RawSlice& dest_slice = dest_slices[num_dest_slices_read]; + uint64_t left_to_read = total_length_to_read - num_bytes_read; + uint64_t left_data_size_in_slice = src_slice.dataSize() - src_slice_offset; + uint64_t length_to_copy = std::min( + left_data_size_in_slice, std::min(static_cast(dest_slice.len_), left_to_read)); + memcpy(static_cast(dest_slice.mem_) + dest_slice_offset, // NOLINT(safe-memcpy) + src_slice.data() + src_slice_offset, length_to_copy); + src_slice_offset = src_slice_offset + length_to_copy; + dest_slice_offset = dest_slice_offset + length_to_copy; + if (src_slice_offset == src_slice.dataSize()) { num_src_slices_read++; - src_slice_off = 0; + src_slice_offset = 0; } - if (dest_slice_off == dest_slice.len_) { + if (dest_slice_offset == dest_slice.len_) { num_dest_slices_read++; - dest_slice_off = 0; + dest_slice_offset = 0; } num_bytes_read += length_to_copy; } diff --git a/source/common/network/win32_socket_handle_impl.cc b/source/common/network/win32_socket_handle_impl.cc index ac3a0bd09eff..986562d339ba 100644 --- a/source/common/network/win32_socket_handle_impl.cc +++ b/source/common/network/win32_socket_handle_impl.cc @@ -18,24 +18,24 @@ namespace Network { Api::IoCallUint64Result Win32SocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices, uint64_t num_slice) { - if (peek_buffer_.length() == 0) { - auto result = IoSocketHandleImpl::readv(max_length, slices, num_slice); - reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); - return result; + if (peek_buffer_.length() != 0) { + return readvFromPeekBuffer(max_length, slices, num_slice); } - return readvFromPeekBuffer(max_length, slices, num_slice); + auto result = IoSocketHandleImpl::readv(max_length, slices, num_slice); + reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); + return result; } Api::IoCallUint64Result Win32SocketHandleImpl::read(Buffer::Instance& buffer, absl::optional max_length_opt) { - if (peek_buffer_.length() == 0) { - auto result = IoSocketHandleImpl::read(buffer, max_length_opt); - reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); - return result; + if (peek_buffer_.length() != 0) { + return readFromPeekBuffer(buffer, max_length_opt.value_or(UINT64_MAX)); } - return readFromPeekBuffer(buffer, max_length_opt.value_or(UINT64_MAX)); + auto result = IoSocketHandleImpl::read(buffer, max_length_opt); + reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read); + return result; } Api::IoCallUint64Result Win32SocketHandleImpl::writev(const Buffer::RawSlice* slices, @@ -80,7 +80,7 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices, Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) { if (flags & MSG_PEEK) { - return peek(buffer, length); + return emulatePeek(buffer, length); } if (peek_buffer_.length() == 0) { @@ -92,17 +92,15 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, } } -Api::IoCallUint64Result Win32SocketHandleImpl::peek(void* buffer, size_t length) { - - // if the `peek_buffer_` has already enough data we can skip reading - // from the wire. +Api::IoCallUint64Result Win32SocketHandleImpl::emulatePeek(void* buffer, size_t length) { + // If there's not enough data in the peek buffer, try reading more. if (length > peek_buffer_.length()) { // The caller is responsible for calling with the larger size // in cases it needs to do so it can't rely on transparent event activation. - // So no in this case we should activate read again unless the read blocked. + // So in this case we should activate read again unless the read blocked. Api::IoCallUint64Result peek_result = drainToPeekBuffer(length); - // Some error happened + // Some error happened. if (!peek_result.ok()) { if (peek_result.wouldBlock() && file_event_) { file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read); @@ -175,6 +173,7 @@ void Win32SocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, Event::FileTriggerType trigger, uint32_t events) { IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events); + // Activate the file event directly when we have the data in the peek_buffer_. if ((events & Event::FileReadyType::Read) && peek_buffer_.length() > 0) { activateFileEvents(Event::FileReadyType::Read); } @@ -182,6 +181,7 @@ void Win32SocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, void Win32SocketHandleImpl::enableFileEvents(uint32_t events) { IoSocketHandleImpl::enableFileEvents(events); + // Activate the file event directly when we have the data in the peek_buffer_. if ((events & Event::FileReadyType::Read) && peek_buffer_.length() > 0) { activateFileEvents(Event::FileReadyType::Read); } diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index 2ffd7de5860f..e9fe8e27ea2c 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -55,7 +55,7 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { // first from the class buffer and then go to the underlying socket. // Implement the peek logic of recv for readability purposes - Api::IoCallUint64Result peek(void* buffer, size_t length); + Api::IoCallUint64Result emulatePeek(void* buffer, size_t length); /** * Drain the socket into `peek_buffer_`. From 77928713a863fb94368a0650d79fd3ac4552e922 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Mon, 24 Jan 2022 12:46:37 +0000 Subject: [PATCH 33/35] address comment Signed-off-by: He Jie Xu --- source/common/network/win32_socket_handle_impl.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/network/win32_socket_handle_impl.h b/source/common/network/win32_socket_handle_impl.h index e9fe8e27ea2c..f8835e5aed54 100644 --- a/source/common/network/win32_socket_handle_impl.h +++ b/source/common/network/win32_socket_handle_impl.h @@ -50,7 +50,7 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl { private: void reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, uint32_t event); - // on Windows we use the MSG_PEEK of recv instead of peeking the socket + // On Windows we use the MSG_PEEK on recv instead of peeking the socket // we drain the socket to memory. Subsequent read calls need to read // first from the class buffer and then go to the underlying socket. From cf83aade544271c06bdefbb2a12e5066d216ab4b Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Tue, 1 Feb 2022 23:52:39 +0000 Subject: [PATCH 34/35] fix copyOutToSlices Signed-off-by: He Jie Xu --- source/common/buffer/buffer_impl.cc | 11 ++++++++--- test/common/buffer/owned_impl_test.cc | 25 +++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/source/common/buffer/buffer_impl.cc b/source/common/buffer/buffer_impl.cc index 3b1eea4054e4..a41493e7a51d 100644 --- a/source/common/buffer/buffer_impl.cc +++ b/source/common/buffer/buffer_impl.cc @@ -130,9 +130,12 @@ uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices const Slice& src_slice = slices_[num_src_slices_read]; const Buffer::RawSlice& dest_slice = dest_slices[num_dest_slices_read]; uint64_t left_to_read = total_length_to_read - num_bytes_read; - uint64_t left_data_size_in_slice = src_slice.dataSize() - src_slice_offset; - uint64_t length_to_copy = std::min( - left_data_size_in_slice, std::min(static_cast(dest_slice.len_), left_to_read)); + uint64_t left_data_size_in_dst_slice = dest_slice.len_ - dest_slice_offset; + uint64_t left_data_size_in_src_slice = src_slice.dataSize() - src_slice_offset; + // The length to copy should be size of smallest in the source slice available size and + // the dest slice available size. + uint64_t length_to_copy = + std::min(left_data_size_in_src_slice, std::min(left_data_size_in_dst_slice, left_to_read)); memcpy(static_cast(dest_slice.mem_) + dest_slice_offset, // NOLINT(safe-memcpy) src_slice.data() + src_slice_offset, length_to_copy); src_slice_offset = src_slice_offset + length_to_copy; @@ -145,6 +148,8 @@ uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices num_dest_slices_read++; dest_slice_offset = 0; } + ASSERT(src_slice_offset <= src_slice.dataSize()); + ASSERT(dest_slice_offset <= dest_slice.len_); num_bytes_read += length_to_copy; } return num_bytes_read; diff --git a/test/common/buffer/owned_impl_test.cc b/test/common/buffer/owned_impl_test.cc index b8e639d95de0..acbf598f2f42 100644 --- a/test/common/buffer/owned_impl_test.cc +++ b/test/common/buffer/owned_impl_test.cc @@ -1174,6 +1174,31 @@ TEST_F(OwnedImplTest, CopyOutToSlicesTests) { reservation.commit(data.size()); EXPECT_EQ(data, dest_buf.toString()); } + { + Buffer::OwnedImpl src_buffer; + // Create a slice with a small amount of data. + const uint32_t small_data_size = 10; + std::string small_data = std::string(small_data_size, 'a'); + src_buffer.prepend(small_data); + + // Add another slice with a large amount of data. + const uint32_t large_data_size = 16384; + std::string large_data = std::string(large_data_size, 'b'); + BufferFragmentImpl frag(large_data.data(), large_data.size(), nullptr); + src_buffer.addBufferFragment(frag); + EXPECT_EQ(small_data_size + large_data_size, src_buffer.length()); + + // Copy-out from the buffer. + Buffer::OwnedImpl dest_buf; + auto reservation = dest_buf.reserveForRead(); + EXPECT_EQ(small_data_size + large_data_size, + src_buffer.copyOutToSlices(small_data_size + large_data_size, reservation.slices(), + reservation.numSlices())); + reservation.commit(small_data_size + large_data_size); + EXPECT_EQ(absl::StrCat(small_data, large_data), dest_buf.toString()); + + src_buffer.drain(small_data_size + large_data_size); + } } // Slice size large enough to prevent slice content from being coalesced into an existing slice From d789aae07f63e818d02632b5fa6d17322ab84a35 Mon Sep 17 00:00:00 2001 From: He Jie Xu Date: Wed, 16 Feb 2022 03:17:32 +0000 Subject: [PATCH 35/35] remove useless log Signed-off-by: He Jie Xu --- .../filters/listener/proxy_protocol/proxy_protocol_test.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc index 4ac4dac611f8..ecece6746e42 100644 --- a/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc +++ b/test/extensions/filters/listener/proxy_protocol/proxy_protocol_test.cc @@ -918,10 +918,8 @@ TEST_P(ProxyProtocolTest, V2Fragmented5Error) { .Times(AnyNumber()) .WillRepeatedly(Invoke([&](os_fd_t fd, const iovec* iov, int num_iov) { if (partial_write) { - ENVOY_LOG_MISC(debug, "inject failure"); return Api::SysCallSizeResult{-1, 0}; } - ENVOY_LOG_MISC(debug, "wire"); const Api::SysCallSizeResult x = os_sys_calls_actual_.readv(fd, iov, num_iov); return x; }));