Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows: Emulate MSG_PEEK by read #19777

Merged
merged 45 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
fa5a289
Windows: Emulate MSG_PEEK by read
Nov 4, 2021
c2b5b44
fix format
Nov 4, 2021
fd85009
fix compiler warning
Nov 4, 2021
8972610
fix compilation issue
Nov 4, 2021
4aeaeae
more coverage is needed but still testing ci
Nov 5, 2021
544a1c8
Merge remote-tracking branch 'upstream/main' into peekWindows2
Nov 5, 2021
15fc820
fix format and increase coverage
Nov 5, 2021
4e60815
fix format
Nov 5, 2021
fe11b35
fix gcc
Nov 5, 2021
1534dcd
address PR feedack v1
Nov 8, 2021
3eb8880
address comments
Nov 19, 2021
f2fd1fb
fix spelling
Nov 19, 2021
72e8f20
fix format
Nov 19, 2021
db35fbf
add override keyword
Nov 19, 2021
16caa4f
another format fix
Nov 19, 2021
a479ef8
fix string buffer used in fuzzing
Nov 19, 2021
d3dd4d3
fix typo in fuzzer
Nov 19, 2021
23c6341
add override keyword
Nov 19, 2021
1c1223f
fixed another typo
Nov 19, 2021
abe8be2
remove unused parameters
Nov 19, 2021
31aee37
PR comments and tests
Nov 23, 2021
55361a3
add hint for mac compiler
Nov 23, 2021
320e370
Improve the copyOutToSlices method
soulxu Dec 15, 2021
f73213d
Merge branch 'main' into peekWindows2
soulxu Dec 15, 2021
a2e8799
address comment
soulxu Dec 15, 2021
d113f4c
type conversion for macos
soulxu Dec 15, 2021
0c03750
correct the copyOutToSlices method
soulxu Dec 24, 2021
52828ee
Merge branch 'main' into peekWindows2
soulxu Dec 24, 2021
959170b
remove NOT_IMPLEMENTED_GCOVR_EXCL_LINE
soulxu Dec 24, 2021
d023827
Merge branch 'main' into peekWindows2
soulxu Dec 27, 2021
d2fe6e2
address comment
soulxu Jan 6, 2022
d673110
Merge branch 'main' into peekWindows2
soulxu Jan 6, 2022
fc5b188
address comment
soulxu Jan 11, 2022
9ab00eb
Merge branch 'main' into peekWindows2
soulxu Jan 11, 2022
c100290
Merge branch 'main' into peekWindows2
soulxu Jan 12, 2022
9ff8188
Merge branch 'main' into peekWindows2
soulxu Jan 13, 2022
a8db58b
Fix the peek size
soulxu Jan 24, 2022
88311b1
Add fuzz test for copyOutToSlices
soulxu Jan 24, 2022
84d3b81
fix format
soulxu Jan 24, 2022
8b4acf3
address comments
soulxu Jan 24, 2022
7792871
address comment
soulxu Jan 24, 2022
843ca3c
Merge branch 'main' into peekWindows2
soulxu Jan 24, 2022
4bf0aff
Merge branch 'main' into peekWindows3
soulxu Feb 1, 2022
cf83aad
fix copyOutToSlices
soulxu Feb 1, 2022
d789aae
remove useless log
soulxu Feb 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,8 @@ class FakeBuffer : public Buffer::Instance {
MOCK_METHOD(void, prepend, (absl::string_view), (override));
MOCK_METHOD(void, prepend, (Instance&), (override));
MOCK_METHOD(void, copyOut, (size_t, uint64_t, void*), (const, override));
MOCK_METHOD(uint64_t, copyOutToSlices,
(uint64_t size, Buffer::RawSlice* slices, uint64_t num_slice), (const, override));
MOCK_METHOD(void, drain, (uint64_t), (override));
MOCK_METHOD(Buffer::RawSliceVector, getRawSlices, (absl::optional<uint64_t>), (const, override));
MOCK_METHOD(Buffer::RawSlice, frontSlice, (), (const, override));
Expand Down
10 changes: 10 additions & 0 deletions envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ class Instance {
*/
virtual void copyOut(size_t start, uint64_t size, void* data) const PURE;

/**
* Copy out a section of the buffer to dynamic array of slices.
* @param size supplies the size of the data that will be copied.
* @param slices supplies the output slices to fill.
* @param num_slice supplies the number of slices to fill.
* @return the number of bytes copied.
*/
virtual uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices,
uint64_t num_slice) const PURE;

/**
* Drain data from the buffer.
* @param size supplies the length of data to drain.
Expand Down
37 changes: 37 additions & 0 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,43 @@ void OwnedImpl::copyOut(size_t start, uint64_t size, void* data) const {
ASSERT(size == 0);
}

uint64_t OwnedImpl::copyOutToSlices(uint64_t size, Buffer::RawSlice* dest_slices,
uint64_t num_slice) const {
uint64_t total_length_to_read = std::min(size, this->length());
uint64_t num_bytes_read = 0;
uint64_t num_dest_slices_read = 0;
uint64_t num_src_slices_read = 0;
uint64_t dest_slice_offset = 0;
uint64_t src_slice_offset = 0;
while (num_dest_slices_read < num_slice && num_bytes_read < total_length_to_read) {
const Slice& src_slice = slices_[num_src_slices_read];
const Buffer::RawSlice& dest_slice = dest_slices[num_dest_slices_read];
uint64_t left_to_read = total_length_to_read - num_bytes_read;
uint64_t left_data_size_in_dst_slice = dest_slice.len_ - dest_slice_offset;
uint64_t left_data_size_in_src_slice = src_slice.dataSize() - src_slice_offset;
// The length to copy should be size of smallest in the source slice available size and
// the dest slice available size.
uint64_t length_to_copy =
std::min(left_data_size_in_src_slice, std::min(left_data_size_in_dst_slice, left_to_read));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I correct the length_to_copy calculation.

memcpy(static_cast<uint8_t*>(dest_slice.mem_) + dest_slice_offset, // NOLINT(safe-memcpy)
src_slice.data() + src_slice_offset, length_to_copy);
src_slice_offset = src_slice_offset + length_to_copy;
dest_slice_offset = dest_slice_offset + length_to_copy;
if (src_slice_offset == src_slice.dataSize()) {
num_src_slices_read++;
src_slice_offset = 0;
}
if (dest_slice_offset == dest_slice.len_) {
num_dest_slices_read++;
dest_slice_offset = 0;
}
ASSERT(src_slice_offset <= src_slice.dataSize());
ASSERT(dest_slice_offset <= dest_slice.len_);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add two assertions for defending.

num_bytes_read += length_to_copy;
}
return num_bytes_read;
}

void OwnedImpl::drain(uint64_t size) { drainImpl(size); }

void OwnedImpl::drainImpl(uint64_t size) {
Expand Down
2 changes: 2 additions & 0 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,8 @@ class OwnedImpl : public LibEventInstance {
void prepend(absl::string_view data) override;
void prepend(Instance& data) override;
void copyOut(size_t start, uint64_t size, void* data) const override;
uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices,
uint64_t num_slice) const override;
void drain(uint64_t size) override;
RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
RawSlice frontSlice() const override;
Expand Down
1 change: 1 addition & 0 deletions source/common/network/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ envoy_cc_library(
"//envoy/event:dispatcher_interface",
"//envoy/network:io_handle_interface",
"//source/common/api:os_sys_calls_lib",
"//source/common/buffer:buffer_lib",
"//source/common/event:dispatcher_includes",
"@envoy_api//envoy/extensions/network/socket_interface/v3:pkg_cc_proto",
],
Expand Down
109 changes: 106 additions & 3 deletions source/common/network/win32_socket_handle_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,21 @@ namespace Network {

Api::IoCallUint64Result Win32SocketHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
uint64_t num_slice) {
if (peek_buffer_.length() != 0) {
return readvFromPeekBuffer(max_length, slices, num_slice);
}

auto result = IoSocketHandleImpl::readv(max_length, slices, num_slice);
reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read);
return result;
}

Api::IoCallUint64Result Win32SocketHandleImpl::read(Buffer::Instance& buffer,
absl::optional<uint64_t> max_length_opt) {
if (peek_buffer_.length() != 0) {
return readFromPeekBuffer(buffer, max_length_opt.value_or(UINT64_MAX));
}

auto result = IoSocketHandleImpl::read(buffer, max_length_opt);
reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read);
return result;
Expand Down Expand Up @@ -71,10 +79,41 @@ Api::IoCallUint64Result Win32SocketHandleImpl::recvmmsg(RawSliceArrays& slices,
}

Api::IoCallUint64Result Win32SocketHandleImpl::recv(void* buffer, size_t length, int flags) {
if (flags & MSG_PEEK) {
return emulatePeek(buffer, length);
}

Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags);
reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read);
return result;
if (peek_buffer_.length() == 0) {
Api::IoCallUint64Result result = IoSocketHandleImpl::recv(buffer, length, flags);
reEnableEventBasedOnIOResult(result, Event::FileReadyType::Read);
return result;
} else {
return readFromPeekBuffer(buffer, length);
}
}

Api::IoCallUint64Result Win32SocketHandleImpl::emulatePeek(void* buffer, size_t length) {
// If there's not enough data in the peek buffer, try reading more.
if (length > peek_buffer_.length()) {
// The caller is responsible for calling with the larger size
// in cases it needs to do so it can't rely on transparent event activation.
// So in this case we should activate read again unless the read blocked.
Api::IoCallUint64Result peek_result = drainToPeekBuffer(length);

// Some error happened.
if (!peek_result.ok()) {
if (peek_result.wouldBlock() && file_event_) {
file_event_->registerEventIfEmulatedEdge(Event::FileReadyType::Read);
if (peek_buffer_.length() == 0) {
return peek_result;
}
} else {
return peek_result;
}
}
}

return peekFromPeekBuffer(buffer, length);
}

void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result,
Expand All @@ -84,5 +123,69 @@ void Win32SocketHandleImpl::reEnableEventBasedOnIOResult(const Api::IoCallUint64
}
}

Api::IoCallUint64Result Win32SocketHandleImpl::drainToPeekBuffer(size_t length) {
size_t total_bytes_read = 0;
while (peek_buffer_.length() < length) {
Buffer::Reservation reservation = peek_buffer_.reserveForRead();
uint64_t bytes_to_read = std::min<uint64_t>(
static_cast<uint64_t>(length - peek_buffer_.length()), reservation.length());
Api::IoCallUint64Result result =
IoSocketHandleImpl::readv(bytes_to_read, reservation.slices(), reservation.numSlices());
uint64_t bytes_to_commit = result.ok() ? result.return_value_ : 0;
reservation.commit(bytes_to_commit);
total_bytes_read += bytes_to_commit;
if (!result.ok() || bytes_to_commit == 0) {
return result;
}
}
return Api::IoCallUint64Result(total_bytes_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(void* buffer, size_t length) {
uint64_t copy_size = std::min(peek_buffer_.length(), static_cast<uint64_t>(length));
peek_buffer_.copyOut(0, copy_size, buffer);
peek_buffer_.drain(copy_size);
return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

Api::IoCallUint64Result Win32SocketHandleImpl::readvFromPeekBuffer(uint64_t max_length,
Buffer::RawSlice* slices,
uint64_t num_slice) {
uint64_t bytes_read = peek_buffer_.copyOutToSlices(max_length, slices, num_slice);
peek_buffer_.drain(bytes_read);
return Api::IoCallUint64Result(bytes_read, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

Api::IoCallUint64Result Win32SocketHandleImpl::readFromPeekBuffer(Buffer::Instance& buffer,
size_t length) {
auto lenght_to_move = std::min(peek_buffer_.length(), static_cast<uint64_t>(length));
buffer.move(peek_buffer_, lenght_to_move);
return Api::IoCallUint64Result(lenght_to_move, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

Api::IoCallUint64Result Win32SocketHandleImpl::peekFromPeekBuffer(void* buffer, size_t length) {
uint64_t copy_size = std::min(peek_buffer_.length(), static_cast<uint64_t>(length));
peek_buffer_.copyOut(0, copy_size, buffer);
return Api::IoCallUint64Result(copy_size, Api::IoErrorPtr(nullptr, [](Api::IoError*) {}));
}

void Win32SocketHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher,
Event::FileReadyCb cb,
Event::FileTriggerType trigger, uint32_t events) {
IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events);
// Activate the file event directly when we have the data in the peek_buffer_.
if ((events & Event::FileReadyType::Read) && peek_buffer_.length() > 0) {
activateFileEvents(Event::FileReadyType::Read);
}
}

void Win32SocketHandleImpl::enableFileEvents(uint32_t events) {
IoSocketHandleImpl::enableFileEvents(events);
// Activate the file event directly when we have the data in the peek_buffer_.
if ((events & Event::FileReadyType::Read) && peek_buffer_.length() > 0) {
activateFileEvents(Event::FileReadyType::Read);
}
}

} // namespace Network
} // namespace Envoy
30 changes: 30 additions & 0 deletions source/common/network/win32_socket_handle_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "envoy/event/dispatcher.h"
#include "envoy/network/io_handle.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"
#include "source/common/network/io_socket_error_impl.h"
#include "source/common/network/io_socket_handle_impl.h"
Expand Down Expand Up @@ -42,8 +43,37 @@ class Win32SocketHandleImpl : public IoSocketHandleImpl {
RecvMsgOutput& output) override;
Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override;

void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
Event::FileTriggerType trigger, uint32_t events) override;
void enableFileEvents(uint32_t events) override;

private:
void reEnableEventBasedOnIOResult(const Api::IoCallUint64Result& result, uint32_t event);

// On Windows we use the MSG_PEEK on recv instead of peeking the socket
// we drain the socket to memory. Subsequent read calls need to read
// first from the class buffer and then go to the underlying socket.

// Implement the peek logic of recv for readability purposes
Api::IoCallUint64Result emulatePeek(void* buffer, size_t length);

/**
* Drain the socket into `peek_buffer_`.
* @param length is the desired length of data drained into the `peek_buffer_`.
* @return the actual length of data drained into the `peek_buffer_`.
*/
Api::IoCallUint64Result drainToPeekBuffer(size_t length);

// Useful functions to read from the peek buffer based on
// the signatures of readv/read/recv OS socket functions.
Api::IoCallUint64Result readFromPeekBuffer(void* buffer, size_t length);
Api::IoCallUint64Result readFromPeekBuffer(Buffer::Instance& buffer, size_t length);
Api::IoCallUint64Result readvFromPeekBuffer(uint64_t max_length, Buffer::RawSlice* slices,
uint64_t num_slice);
Api::IoCallUint64Result peekFromPeekBuffer(void* buffer, size_t length);

// For windows mimic MSG_PEEK
Buffer::OwnedImpl peek_buffer_;
};
} // namespace Network
} // namespace Envoy
3 changes: 3 additions & 0 deletions test/common/buffer/buffer_corpus/basic

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions test/common/buffer/buffer_fuzz.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,21 @@ class StringBuffer : public Buffer::Instance {
::memcpy(data, this->start() + start, size);
}

uint64_t copyOutToSlices(uint64_t length, Buffer::RawSlice* slices,
uint64_t num_slices) const override {
uint64_t size_copied = 0;
uint64_t num_slices_copied = 0;
while (size_copied < length && num_slices_copied < num_slices) {
auto copy_length = std::min((length - size_copied), slices[num_slices_copied].len_);
::memcpy(slices[num_slices_copied].mem_, this->start(), copy_length);
size_copied += copy_length;
if (copy_length == slices[num_slices_copied].len_) {
num_slices_copied++;
}
}
return size_copied;
}

void drain(uint64_t size) override {
FUZZ_ASSERT(size <= size_);
start_ += size;
Expand Down Expand Up @@ -318,6 +333,18 @@ uint32_t bufferAction(Context& ctxt, char insert_value, uint32_t max_alloc, Buff
FUZZ_ASSERT(::memcmp(copy_buffer, data.data() + start, length) == 0);
break;
}
case test::common::buffer::Action::kCopyOutToSlices: {
const uint32_t length =
std::min(static_cast<uint32_t>(target_buffer.length()), action.copy_out_to_slices());
Buffer::OwnedImpl buffer;
auto reservation = buffer.reserveForRead();
auto rc = target_buffer.copyOutToSlices(length, reservation.slices(), reservation.numSlices());
reservation.commit(rc);
const std::string data = buffer.toString();
const std::string target_data = target_buffer.toString();
FUZZ_ASSERT(::memcmp(data.data(), target_data.data(), reservation.length()) == 0);
break;
}
case test::common::buffer::Action::kDrain: {
const uint32_t previous_length = target_buffer.length();
const uint32_t drain_length =
Expand Down
1 change: 1 addition & 0 deletions test/common/buffer/buffer_fuzz.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ message Action {
uint32 get_raw_slices = 14;
Search search = 15;
string starts_with = 16;
uint32 copy_out_to_slices = 17;
}
}

Expand Down
Loading