Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syscall: latch errno deeper in the buffer implementation #3880

Merged
merged 3 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <functional>
#include <memory>
#include <string>
#include <tuple>

#include "envoy/common/pure.h"

Expand Down Expand Up @@ -142,9 +143,10 @@ class Instance {
* Read from a file descriptor directly into the buffer.
* @param fd supplies the descriptor to read from.
* @param max_length supplies the maximum length to read.
* @return the number of bytes read or -1 if there was an error.
* @return a tuple with the number of bytes read and the errno. If an error occurs, the number
* of bytes read would indicate -1 and the errno would be non-zero.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about
"a tuple with the number of bytes read and the errno. If an error occurred, the number of bytes read would indicate -1 and the errno would be non-zero. Otherwise if bytes were read, errno should not be used."
?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in e5c8ed1.

*/
virtual int read(int fd, uint64_t max_length) PURE;
virtual std::tuple<int, int> read(int fd, uint64_t max_length) PURE;

/**
* Reserve space in the buffer.
Expand Down Expand Up @@ -173,9 +175,10 @@ class Instance {
/**
* Write the buffer out to a file descriptor.
* @param fd supplies the descriptor to write to.
* @return the number of bytes written or -1 if there was an error.
* @return a tuple with the number of bytes written and the errno. If an error occurs, the
* number of bytes written would indicate -1 and the errno would be non-zero.
*/
virtual int write(int fd) PURE;
virtual std::tuple<int, int> write(int fd) PURE;
};

typedef std::unique_ptr<Instance> InstancePtr;
Expand Down
16 changes: 9 additions & 7 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) {
static_cast<LibEventInstance&>(rhs).postProcess();
}

int OwnedImpl::read(int fd, uint64_t max_length) {
std::tuple<int, int> OwnedImpl::read(int fd, uint64_t max_length) {
if (max_length == 0) {
return 0;
return std::make_tuple(0, 0);
}
constexpr uint64_t MaxSlices = 2;
RawSlice slices[MaxSlices];
Expand All @@ -115,8 +115,9 @@ int OwnedImpl::read(int fd, uint64_t max_length) {
ASSERT(num_bytes_to_read <= max_length);
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const ssize_t rc = os_syscalls.readv(fd, iov, static_cast<int>(num_slices_to_read));
const int error = errno;
if (rc < 0) {
return rc;
return std::make_tuple(rc, error);
}
uint64_t num_slices_to_commit = 0;
uint64_t bytes_to_commit = rc;
Expand All @@ -130,7 +131,7 @@ int OwnedImpl::read(int fd, uint64_t max_length) {
}
ASSERT(num_slices_to_commit <= num_slices);
commit(slices, num_slices_to_commit);
return rc;
return std::make_tuple(rc, error);
}

uint64_t OwnedImpl::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) {
Expand All @@ -151,7 +152,7 @@ ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start) const {
return result_ptr.pos;
}

int OwnedImpl::write(int fd) {
std::tuple<int, int> OwnedImpl::write(int fd) {
constexpr uint64_t MaxSlices = 16;
RawSlice slices[MaxSlices];
const uint64_t num_slices = std::min(getRawSlices(slices, MaxSlices), MaxSlices);
Expand All @@ -165,14 +166,15 @@ int OwnedImpl::write(int fd) {
}
}
if (num_slices_to_write == 0) {
return 0;
return std::make_tuple(0, 0);
}
auto& os_syscalls = Api::OsSysCallsSingleton::get();
const ssize_t rc = os_syscalls.writev(fd, iov, num_slices_to_write);
const int error = errno;
if (rc > 0) {
drain(static_cast<uint64_t>(rc));
}
return static_cast<int>(rc);
return std::make_tuple(static_cast<int>(rc), error);
}

OwnedImpl::OwnedImpl() : buffer_(evbuffer_new()) {}
Expand Down
4 changes: 2 additions & 2 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ class OwnedImpl : public LibEventInstance {
void* linearize(uint32_t size) override;
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
int read(int fd, uint64_t max_length) override;
std::tuple<int, int> read(int fd, uint64_t max_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
ssize_t search(const void* data, uint64_t size, size_t start) const override;
int write(int fd) override;
std::tuple<int, int> write(int fd) override;
void postProcess() override {}
std::string toString() const override;

Expand Down
12 changes: 6 additions & 6 deletions source/common/buffer/watermark_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
checkHighWatermark();
}

int WatermarkBuffer::read(int fd, uint64_t max_length) {
int bytes_read = OwnedImpl::read(fd, max_length);
std::tuple<int, int> WatermarkBuffer::read(int fd, uint64_t max_length) {
std::tuple<int, int> result = OwnedImpl::read(fd, max_length);
checkHighWatermark();
return bytes_read;
return result;
}

uint64_t WatermarkBuffer::reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) {
Expand All @@ -52,10 +52,10 @@ uint64_t WatermarkBuffer::reserve(uint64_t length, RawSlice* iovecs, uint64_t nu
return bytes_reserved;
}

int WatermarkBuffer::write(int fd) {
int bytes_written = OwnedImpl::write(fd);
std::tuple<int, int> WatermarkBuffer::write(int fd) {
std::tuple<int, int> result = OwnedImpl::write(fd);
checkLowWatermark();
return bytes_written;
return result;
}

void WatermarkBuffer::setWatermarks(uint32_t low_watermark, uint32_t high_watermark) {
Expand Down
4 changes: 2 additions & 2 deletions source/common/buffer/watermark_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ class WatermarkBuffer : public OwnedImpl {
void drain(uint64_t size) override;
void move(Instance& rhs) override;
void move(Instance& rhs, uint64_t length) override;
int read(int fd, uint64_t max_length) override;
std::tuple<int, int> read(int fd, uint64_t max_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
int write(int fd) override;
std::tuple<int, int> write(int fd) override;
void postProcess() override { checkLowWatermark(); }

void setWatermarks(uint32_t watermark) { setWatermarks(watermark / 2, watermark); }
Expand Down
10 changes: 6 additions & 4 deletions source/common/network/raw_buffer_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ IoResult RawBufferSocket::doRead(Buffer::Instance& buffer) {
bool end_stream = false;
do {
// 16K read is arbitrary. TODO(mattklein123) PERF: Tune the read size.
int rc = buffer.read(callbacks_->fd(), 16384);
const int error = errno; // Latch errno before any logging calls can overwrite it.
std::tuple<int, int> result = buffer.read(callbacks_->fd(), 16384);
const int rc = std::get<0>(result);
const int error = std::get<1>(result);
ENVOY_CONN_LOG(trace, "read returns: {}", callbacks_->connection(), rc);

if (rc == 0) {
Expand Down Expand Up @@ -60,8 +61,9 @@ IoResult RawBufferSocket::doWrite(Buffer::Instance& buffer, bool end_stream) {
action = PostIoAction::KeepOpen;
break;
}
int rc = buffer.write(callbacks_->fd());
const int error = errno; // Latch errno before any logging calls can overwrite it.
std::tuple<int, int> result = buffer.write(callbacks_->fd());
const int rc = std::get<0>(result);
const int error = std::get<1>(result);
ENVOY_CONN_LOG(trace, "write returns: {}", callbacks_->connection(), rc);
if (rc == -1) {
ENVOY_CONN_LOG(trace, "write error: {} ({})", callbacks_->connection(), error,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class BufferWrapper : public Buffer::Instance {
}
void move(Buffer::Instance&) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
void move(Buffer::Instance&, uint64_t) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
int read(int, uint64_t) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
std::tuple<int, int> read(int, uint64_t) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
uint64_t reserve(uint64_t, Buffer::RawSlice*, uint64_t) override {
NOT_IMPLEMENTED_GCOVR_EXCL_LINE;
}
ssize_t search(const void*, uint64_t, size_t) const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
int write(int) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
std::tuple<int, int> write(int) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
std::string toString() const override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }

private:
Expand Down
36 changes: 18 additions & 18 deletions test/common/buffer/owned_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,34 +83,34 @@ TEST_F(OwnedImplTest, Write) {
Buffer::OwnedImpl buffer;
buffer.add("example");
EXPECT_CALL(os_sys_calls, writev(_, _, _)).WillOnce(Return(7));
int rc = buffer.write(-1);
EXPECT_EQ(7, rc);
std::tuple<int, int> result = buffer.write(-1);
EXPECT_EQ(7, std::get<0>(result));
EXPECT_EQ(0, buffer.length());

buffer.add("example");
EXPECT_CALL(os_sys_calls, writev(_, _, _)).WillOnce(Return(6));
rc = buffer.write(-1);
EXPECT_EQ(6, rc);
result = buffer.write(-1);
EXPECT_EQ(6, std::get<0>(result));
EXPECT_EQ(1, buffer.length());

EXPECT_CALL(os_sys_calls, writev(_, _, _)).WillOnce(Return(0));
rc = buffer.write(-1);
EXPECT_EQ(0, rc);
result = buffer.write(-1);
EXPECT_EQ(0, std::get<0>(result));
EXPECT_EQ(1, buffer.length());

EXPECT_CALL(os_sys_calls, writev(_, _, _)).WillOnce(Return(-1));
rc = buffer.write(-1);
EXPECT_EQ(-1, rc);
result = buffer.write(-1);
EXPECT_EQ(-1, std::get<0>(result));
EXPECT_EQ(1, buffer.length());

EXPECT_CALL(os_sys_calls, writev(_, _, _)).WillOnce(Return(1));
rc = buffer.write(-1);
EXPECT_EQ(1, rc);
result = buffer.write(-1);
EXPECT_EQ(1, std::get<0>(result));
EXPECT_EQ(0, buffer.length());

EXPECT_CALL(os_sys_calls, writev(_, _, _)).Times(0);
rc = buffer.write(-1);
EXPECT_EQ(0, rc);
result = buffer.write(-1);
EXPECT_EQ(0, std::get<0>(result));
EXPECT_EQ(0, buffer.length());
}

Expand All @@ -120,18 +120,18 @@ TEST_F(OwnedImplTest, Read) {

Buffer::OwnedImpl buffer;
EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Return(0));
int rc = buffer.read(-1, 100);
EXPECT_EQ(0, rc);
std::tuple<int, int> result = buffer.read(-1, 100);
EXPECT_EQ(0, std::get<0>(result));
EXPECT_EQ(0, buffer.length());

EXPECT_CALL(os_sys_calls, readv(_, _, _)).WillOnce(Return(-1));
rc = buffer.read(-1, 100);
EXPECT_EQ(-1, rc);
result = buffer.read(-1, 100);
EXPECT_EQ(-1, std::get<0>(result));
EXPECT_EQ(0, buffer.length());

EXPECT_CALL(os_sys_calls, readv(_, _, _)).Times(0);
rc = buffer.read(-1, 0);
EXPECT_EQ(0, rc);
result = buffer.read(-1, 0);
EXPECT_EQ(0, std::get<0>(result));
EXPECT_EQ(0, buffer.length());
}

Expand Down
6 changes: 4 additions & 2 deletions test/common/buffer/watermark_buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ TEST_F(WatermarkBufferTest, WatermarkFdFunctions) {

int bytes_written_total = 0;
while (bytes_written_total < 20) {
int bytes_written = buffer_.write(pipe_fds[1]);
std::tuple<int, int> result = buffer_.write(pipe_fds[1]);
int bytes_written = std::get<0>(result);
if (bytes_written < 0) {
ASSERT_EQ(EAGAIN, errno);
} else {
Expand All @@ -144,7 +145,8 @@ TEST_F(WatermarkBufferTest, WatermarkFdFunctions) {

int bytes_read_total = 0;
while (bytes_read_total < 20) {
bytes_read_total += buffer_.read(pipe_fds[0], 20);
std::tuple<int, int> result = buffer_.read(pipe_fds[0], 20);
bytes_read_total += std::get<0>(result);
}
EXPECT_EQ(2, times_high_watermark_called_);
EXPECT_EQ(20, buffer_.length());
Expand Down
4 changes: 2 additions & 2 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ TEST_P(ConnectionImplTest, WriteWithWatermarks) {
EXPECT_CALL(*client_write_buffer_, move(_))
.WillRepeatedly(DoAll(AddBufferToStringWithoutDraining(&data_written),
Invoke(client_write_buffer_, &MockWatermarkBuffer::baseMove)));
EXPECT_CALL(*client_write_buffer_, write(_)).WillOnce(Invoke([&](int fd) -> int {
EXPECT_CALL(*client_write_buffer_, write(_)).WillOnce(Invoke([&](int fd) -> std::tuple<int, int> {
dispatcher_->exit();
return client_write_buffer_->failWrite(fd);
}));
Expand Down Expand Up @@ -764,7 +764,7 @@ TEST_P(ConnectionImplTest, WatermarkFuzzing) {
.WillOnce(Invoke(client_write_buffer_, &MockWatermarkBuffer::baseMove));
EXPECT_CALL(*client_write_buffer_, write(_))
.WillOnce(DoAll(Invoke([&](int) -> void { client_write_buffer_->drain(bytes_to_flush); }),
Return(bytes_to_flush)))
Return(std::make_tuple(bytes_to_flush, 0))))
.WillRepeatedly(testing::Invoke(client_write_buffer_, &MockWatermarkBuffer::failWrite));
client_connection_->write(buffer_to_write, false);
dispatcher_->run(Event::Dispatcher::RunType::NonBlock);
Expand Down
14 changes: 6 additions & 8 deletions test/mocks/buffer/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ template <class BaseClass> class MockBufferBase : public BaseClass {
MockBufferBase();
MockBufferBase(std::function<void()> below_low, std::function<void()> above_high);

MOCK_METHOD1(write, int(int fd));
MOCK_METHOD1(write, std::tuple<int, int>(int fd));
MOCK_METHOD1(move, void(Buffer::Instance& rhs));
MOCK_METHOD2(move, void(Buffer::Instance& rhs, uint64_t length));
MOCK_METHOD1(drain, void(uint64_t size));

void baseMove(Buffer::Instance& rhs) { BaseClass::move(rhs); }
void baseDrain(uint64_t size) { BaseClass::drain(size); }

int trackWrites(int fd) {
int bytes_written = BaseClass::write(fd);
std::tuple<int, int> trackWrites(int fd) {
std::tuple<int, int> result = BaseClass::write(fd);
int bytes_written = std::get<0>(result);
if (bytes_written > 0) {
bytes_written_ += bytes_written;
}
return bytes_written;
return result;
}

void trackDrains(uint64_t size) {
Expand All @@ -38,10 +39,7 @@ template <class BaseClass> class MockBufferBase : public BaseClass {
}

// A convenience function to invoke on write() which fails the write with EAGAIN.
int failWrite(int) {
errno = EAGAIN;
return -1;
}
std::tuple<int, int> failWrite(int) { return std::make_tuple(-1, EAGAIN); }

int bytes_written() const { return bytes_written_; }
uint64_t bytes_drained() const { return bytes_drained_; }
Expand Down