Skip to content

Commit

Permalink
Windows: Emulate MSG_PEEK by read (#19777)
Browse files Browse the repository at this point in the history
Windows: Emulate MSG_PEEK by read

The way we implement i/o events on windows poses problems with peeking messages.

The reason is that peeking does not drain the socket buffer, and draining it is an implicit
requirement of that implementation. As a result, slow clients can cause Envoy connections to hang.

We solve this issue by emulating peek: the socket is drained and its contents are
stored in a buffer.

Additional Description: See #17395
Risk Level: Medium
Testing: Automated
Platform Specific Features: Windows only

Signed-off-by: Sotiris Nanopoulos <sonanopo@microsoft.com>
Signed-off-by: He Jie Xu <hejie.xu@intel.com>

Co-authored-by: Sotiris Nanopoulos <sonanopo@microsoft.com>
Co-authored-by: He Jie Xu <hejie.xu@intel.com>
  • Loading branch information
soulxu and Sotiris Nanopoulos authored Feb 16, 2022
1 parent dca672a commit 7cddecb
Show file tree
Hide file tree
Showing 14 changed files with 606 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ class FakeBuffer : public Buffer::Instance {
MOCK_METHOD(void, prepend, (absl::string_view), (override));
MOCK_METHOD(void, prepend, (Instance&), (override));
MOCK_METHOD(void, copyOut, (size_t, uint64_t, void*), (const, override));
MOCK_METHOD(uint64_t, copyOutToSlices,
(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice), (const, override));
MOCK_METHOD(void, drain, (uint64_t), (override));
MOCK_METHOD(Buffer::RawSliceVector, getRawSlices, (absl::optional<uint64_t>), (const, override));
MOCK_METHOD(Buffer::RawSlice, frontSlice, (), (const, override));
Expand Down
10 changes: 10 additions & 0 deletions envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ class Instance {
*/
virtual void copyOut(size_t start, uint64_t size, void* data) const PURE;

/**
* Copy out a section of the buffer, starting at its front, into a caller-provided
* array of slices. The buffer itself is not modified (nothing is drained).
* @param size supplies the maximum number of bytes that will be copied.
* @param slices supplies the output slices to fill; each slice's `mem_` must point to
*        at least `len_` writable bytes.
* @param num_slice supplies the number of entries in `slices`.
* @return the number of bytes copied. This can be less than `size` when the buffer
*         holds fewer bytes or when the slices do not have enough total capacity.
*/
virtual uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices,
uint64_t num_slice) const PURE;

/**
* Drain data from the buffer.
* @param size supplies the length of data to drain.
Expand Down
37 changes: 37 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,43 @@ void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const {
ASSERT(size == 0);
}

uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices,
uint64_t num_slice) const {
uint64_t total_length_to_read = std::min(size, this->length());
uint64_t num_bytes_read = 0;
uint64_t num_dest_slices_read = 0;
uint64_t num_src_slices_read = 0;
uint64_t dest_slice_offset = 0;
uint64_t src_slice_offset = 0;
while (num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read) {
const Slice& src_slice = slices_[num_src_slices_read];
const Buffer::RawSlice& dest_slice = dest_slices[num_dest_slices_read];
uint64_t left_to_read = total_length_to_read - num_bytes_read;
uint64_t left_data_size_in_dst_slice = dest_slice.len_ - dest_slice_offset;
uint64_t left_data_size_in_src_slice = src_slice.dataSize() - src_slice_offset;
// The length to copy should be size of smallest in the source slice available size and
// the dest slice available size.
uint64_t length_to_copy =
std::min(left_data_size_in_src_slice, std::min(left_data_size_in_dst_slice, left_to_read));
memcpy(static_cast<uint8_t*>(dest_slice.mem_) + dest_slice_offset, // NOLINT(safe-memcpy)
src_slice.data() + src_slice_offset, length_to_copy);
src_slice_offset = src_slice_offset + length_to_copy;
dest_slice_offset = dest_slice_offset + length_to_copy;
if (src_slice_offset == src_slice.dataSize()) {
num_src_slices_read++;
src_slice_offset = 0;
}
if (dest_slice_offset == dest_slice.len_) {
num_dest_slices_read++;
dest_slice_offset = 0;
}
ASSERT(src_slice_offset <= src_slice.dataSize());
ASSERT(dest_slice_offset <= dest_slice.len_);
num_bytes_read += length_to_copy;
}
return num_bytes_read;
}

// Drains `size` bytes from the buffer by delegating to drainImpl().
void OwnedImpl::drain(uint64_t size) {
  drainImpl(size);
}

void OwnedImpl::drainImpl(uint64_t size) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,8 @@ class OwnedImpl : public LibEventInstance {
void prepend(absl::string_view data) override;
void prepend(Instance& data) override;
void copyOut(size_t start, uint64_t size, void* data) const override;
uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices,
uint64_t num_slice) const override;
void drain(uint64_t size) override;
RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
RawSlice frontSlice() const override;
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ envoy_cc_library(
"//envoy/event:dispatcher_interface",
"//envoy/network:io_handle_interface",
"//source/common/api:os_sys_calls_lib",
"//source/common/buffer:buffer_lib",
"//source/common/event:dispatcher_includes",
"@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto",
],
Expand Down
109 changes: 106 additions & 3 deletions source/common/network/win32_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@ namespace Network {

// Scatter-read into `slices`. Data previously drained from the socket by an emulated
// peek is served first; only when the peek buffer is empty is the underlying socket
// read, after which the read event is re-enabled based on the I/O result.
Api::IoCallUint64Result Win32SocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
                                                     uint64_t num_slice) {
  const bool has_peeked_data = peek_buffer_.length() != 0;
  if (has_peeked_data) {
    return readvFromPeekBuffer(max_length, slices, num_slice);
  }

  auto socket_result = IoSocketHandleImpl::readv(max_length, slices, num_slice);
  reEnableEventBasedOnIOResult(socket_result, Event::FileReadyType::Read);
  return socket_result;
}

Api::IoCallUint64Result Win32SocketHandleImpl::read(Buffer::Instance& buffer,
absl::optional<uint64_t> max_length_opt) {
if (peek_buffer_.length() != 0) {
return readFromPeekBuffer(buffer, max_length_opt.value_or(UINT64_MAX));
}

auto result = IoSocketHandleImpl::read(buffer, max_length_opt);
reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read);
return result;
Expand Down Expand Up @@ -71,10 +79,41 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices,
}

// Receives up to `length` bytes into `buffer`.
//  - With MSG_PEEK set, the peek is emulated: the socket is drained into `peek_buffer_`
//    and the data is copied out without being consumed.
//  - Otherwise, bytes already held in `peek_buffer_` must be handed out before the
//    socket is touched again, so that data is delivered in order. Only when the peek
//    buffer is empty do we read from the underlying socket and re-enable the read
//    event based on the I/O result.
// @param buffer destination memory with room for at least `length` bytes.
// @param length maximum number of bytes to receive.
// @param flags recv flags; MSG_PEEK selects the emulated peek path.
// @return the I/O result carrying the number of bytes copied, or the socket error.
Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) {
  if (flags & MSG_PEEK) {
    return emulatePeek(buffer, length);
  }

  // Previously an unconditional socket read + return sat here, which made the peek
  // buffer branch below unreachable and skipped over already-drained bytes.
  if (peek_buffer_.length() == 0) {
    Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags);
    reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read);
    return result;
  } else {
    return readFromPeekBuffer(buffer, length);
  }
}

// Emulates recv(MSG_PEEK): makes sure `peek_buffer_` holds as much of the requested
// `length` as the socket can provide right now, then copies (without consuming) from
// the peek buffer into `buffer`.
Api::IoCallUint64Result Win32SocketHandleImpl::emulatePeek(void* buffer, size_t length) {
  // Enough data has already been drained; serve the peek straight from memory.
  if (length <= peek_buffer_.length()) {
    return peekFromPeekBuffer(buffer, length);
  }

  // A caller that needs more data must call again with a larger size; it cannot rely
  // on transparent event activation. So try to pull the missing bytes off the socket.
  Api::IoCallUint64Result drain_result = drainToPeekBuffer(length);
  if (drain_result.ok()) {
    return peekFromPeekBuffer(buffer, length);
  }

  // Anything other than "would block" with a registered file event is reported as-is.
  const bool can_rearm_read = drain_result.wouldBlock() && file_event_;
  if (!can_rearm_read) {
    return drain_result;
  }

  // The read blocked: re-register the read event so we are notified when more arrives.
  file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
  if (peek_buffer_.length() == 0) {
    // Nothing buffered at all, so surface the would-block result.
    return drain_result;
  }

  // Some data is buffered even though the read blocked; hand out what we have.
  return peekFromPeekBuffer(buffer, length);
}

void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result,
Expand All @@ -84,5 +123,69 @@ void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64
}
}

// Reads from the underlying socket into `peek_buffer_` until it holds at least `length`
// bytes, the socket reports an error, or a read returns zero bytes.
// @param length desired number of bytes held in `peek_buffer_`.
// @return the failing/zero-byte read result if one occurred, otherwise a success
//         result carrying the total number of bytes read by this call.
Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) {
  size_t drained_so_far = 0;
  while (peek_buffer_.length() < length) {
    Buffer::Reservation reservation = peek_buffer_.reserveForRead();
    // Never read more than what is still missing, nor more than the reservation holds.
    const uint64_t missing = static_cast<uint64_t>(length - peek_buffer_.length());
    const uint64_t read_limit = std::min<uint64_t>(missing, reservation.length());

    Api::IoCallUint64Result read_result =
        IoSocketHandleImpl::readv(read_limit, reservation.slices(), reservation.numSlices());

    if (!read_result.ok()) {
      // Nothing was read; commit zero bytes and report the error to the caller.
      reservation.commit(0);
      return read_result;
    }

    const uint64_t received = read_result.return_value_;
    reservation.commit(received);
    drained_so_far += received;
    if (received == 0) {
      // A successful zero-byte read ends the drain and is returned as-is.
      return read_result;
    }
  }
  return Api::IoCallUint64Result(drained_so_far, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

// recv()-style read from the peek buffer: copies up to `length` bytes into `buffer`
// and removes them from `peek_buffer_`.
// @return a success result carrying the number of bytes consumed.
Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(void* buffer, size_t length) {
  const uint64_t requested = static_cast<uint64_t>(length);
  const uint64_t bytes_out = std::min(peek_buffer_.length(), requested);

  peek_buffer_.copyOut(0, bytes_out, buffer);
  peek_buffer_.drain(bytes_out);

  return Api::IoCallUint64Result(bytes_out, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

// readv()-style read from the peek buffer: scatters up to `max_length` bytes into
// `slices` and removes whatever was copied from `peek_buffer_`.
// @return a success result carrying the number of bytes consumed.
Api::IoCallUint64Result Win32SocketHandleImpl::readvFromPeekBuffer(uint64_t max_length,
                                                                   Buffer::RawSlice* slices,
                                                                   uint64_t num_slice) {
  const uint64_t consumed = peek_buffer_.copyOutToSlices(max_length, slices, num_slice);
  // Only drop what actually fit into the caller's slices.
  peek_buffer_.drain(consumed);
  return Api::IoCallUint64Result(consumed, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

// read()-style read from the peek buffer: moves up to `length` bytes from the front of
// `peek_buffer_` into `buffer`.
// @return a success result carrying the number of bytes moved.
Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(Buffer::Instance& buffer,
                                                                  size_t length) {
  const uint64_t requested = static_cast<uint64_t>(length);
  auto length_to_move = std::min(peek_buffer_.length(), requested);
  buffer.move(peek_buffer_, length_to_move);
  return Api::IoCallUint64Result(length_to_move, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

// Copies up to `length` bytes from the front of `peek_buffer_` into `buffer` while
// leaving the peek buffer untouched, so the same bytes can be read again later.
// @return a success result carrying the number of bytes copied.
Api::IoCallUint64Result Win32SocketHandleImpl::peekFromPeekBuffer(void* buffer, size_t length) {
  const uint64_t requested = static_cast<uint64_t>(length);
  const uint64_t bytes_peeked = std::min(peek_buffer_.length(), requested);
  peek_buffer_.copyOut(0, bytes_peeked, buffer);
  return Api::IoCallUint64Result(bytes_peeked, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

// Initializes the file event through the base class, then fires a read event right away
// if read events were requested and `peek_buffer_` already holds data.
void Win32SocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher,
                                                Event::FileReadyCb cb,
                                                Event::FileTriggerType trigger, uint32_t events) {
  IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events);

  if (!(events & Event::FileReadyType::Read)) {
    return;
  }
  // Data sitting in the peek buffer is readable now, so activate the read event directly.
  if (peek_buffer_.length() > 0) {
    activateFileEvents(Event::FileReadyType::Read);
  }
}

// Enables the given events through the base class, then fires a read event right away
// if read events were requested and `peek_buffer_` already holds data.
void Win32SocketHandleImpl::enableFileEvents(uint32_t events) {
  IoSocketHandleImpl::enableFileEvents(events);

  if (!(events & Event::FileReadyType::Read)) {
    return;
  }
  // Data sitting in the peek buffer is readable now, so activate the read event directly.
  if (peek_buffer_.length() > 0) {
    activateFileEvents(Event::FileReadyType::Read);
  }
}

} // namespace Network
} // namespace Envoy
30 changes: 30 additions & 0 deletions source/common/network/win32_socket_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/event/dispatcher.h"
#include "envoy/network/io_handle.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/common/network/io_socket_error_impl.h"
#include "source/common/network/io_socket_handle_impl.h"
Expand Down Expand Up @@ -42,8 +43,37 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl {
RecvMsgOutput& output) override;
Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override;

void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
Event::FileTriggerType trigger, uint32_t events) override;
void enableFileEvents(uint32_t events) override;

private:
void reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, uint32_t event);

// On Windows we do not pass MSG_PEEK to recv to peek at the socket; instead
// we drain the socket to memory. Subsequent read calls need to read
// first from the class buffer and then go to the underlying socket.

// Implement the peek logic of recv for readability purposes
Api::IoCallUint64Result emulatePeek(void* buffer, size_t length);

/**
* Drain the socket into `peek_buffer_`.
* @param length is the desired length of data drained into the `peek_buffer_`.
* @return the actual length of data drained into the `peek_buffer_`.
*/
Api::IoCallUint64Result drainToPeekBuffer(size_t length);

// Useful functions to read from the peek buffer based on
// the signatures of readv/read/recv OS socket functions.
// Each returns a success result carrying the number of bytes handed to the caller,
// which is at most the number of bytes currently held in `peek_buffer_`.

// recv-style: copies up to `length` bytes into `buffer` and drains them from the peek buffer.
Api::IoCallUint64Result readFromPeekBuffer(void* buffer, size_t length);
// read-style: moves up to `length` bytes from the peek buffer into `buffer`.
Api::IoCallUint64Result readFromPeekBuffer(Buffer::Instance& buffer, size_t length);
// readv-style: copies up to `max_length` bytes into `slices` and drains what was copied.
Api::IoCallUint64Result readvFromPeekBuffer(uint64_t max_length, Buffer::RawSlice* slices,
uint64_t num_slice);
// Peek: copies up to `length` bytes into `buffer` without draining the peek buffer.
Api::IoCallUint64Result peekFromPeekBuffer(void* buffer, size_t length);

// For windows mimic MSG_PEEK
Buffer::OwnedImpl peek_buffer_;
};
} // namespace Network
} // namespace Envoy
3 changes: 3 additions & 0 deletions test/common/buffer/buffer_corpus/basic

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions test/common/buffer/buffer_fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ class StringBuffer : public Buffer::Instance {
::memcpy(data, this->start() + start, size);
}

uint64_t copyOutToSlices(uint64_t length, Buffer::RawSlice* slices,
uint64_t num_slices) const override {
uint64_t size_copied = 0;
uint64_t num_slices_copied = 0;
while (size_copied < length && num_slices_copied < num_slices) {
auto copy_length = std::min((length - size_copied), slices[num_slices_copied].len_);
::memcpy(slices[num_slices_copied].mem_, this->start(), copy_length);
size_copied += copy_length;
if (copy_length == slices[num_slices_copied].len_) {
num_slices_copied++;
}
}
return size_copied;
}

void drain(uint64_t size) override {
FUZZ_ASSERT(size <= size_);
start_ += size;
Expand Down Expand Up @@ -318,6 +333,18 @@ uint32_t bufferAction(Context& ctxt, char insert_value, uint32_t max_alloc, Buff
FUZZ_ASSERT(::memcmp(copy_buffer, data.data() + start, length) == 0);
break;
}
case test::common::buffer::Action::kCopyOutToSlices: {
  // Copy a prefix of the target buffer into freshly reserved slices and verify that the
  // copied bytes match the front of the target buffer.
  const uint32_t length =
      std::min(static_cast<uint32_t>(target_buffer.length()), action.copy_out_to_slices());
  Buffer::OwnedImpl buffer;
  auto reservation = buffer.reserveForRead();
  const uint64_t rc =
      target_buffer.copyOutToSlices(length, reservation.slices(), reservation.numSlices());
  // The copy must never report more bytes than were requested.
  FUZZ_ASSERT(rc <= length);
  reservation.commit(rc);
  const std::string data = buffer.toString();
  const std::string target_data = target_buffer.toString();
  FUZZ_ASSERT(data.size() == rc);
  // Compare exactly the bytes that were copied. The reservation's length is unrelated to
  // how much data either string holds, so using it here compares the wrong byte count.
  FUZZ_ASSERT(::memcmp(data.data(), target_data.data(), rc) == 0);
  break;
}
case test::common::buffer::Action::kDrain: {
const uint32_t previous_length = target_buffer.length();
const uint32_t drain_length =
Expand Down
1 change: 1 addition & 0 deletions test/common/buffer/buffer_fuzz.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message Action {
uint32 get_raw_slices = 14;
Search search = 15;
string starts_with = 16;
uint32 copy_out_to_slices = 17;
}
}

Expand Down
Loading

0 comments on commit 7cddecb

Please sign in to comment.