Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport to v1.15: connection: Remember transport socket read resumption requests and replay them when re-enabling read. (#13772) #14173

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ Changes
-------
* listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates.
* proxy_proto: fixed a bug where network filters would not have the correct downstreamRemoteAddress() when accessed from the StreamInfo. This could result in incorrect enforcement of RBAC rules in the RBAC network filter (but not in the RBAC HTTP filter), or incorrect access log addresses from tcp_proxy.
* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers.
32 changes: 23 additions & 9 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
write_buffer_above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) {
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {
// Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM
// condition and just crash.
RELEASE_ASSERT(SOCKET_VALID(ConnectionImpl::ioHandle().fd()), "");
Expand Down Expand Up @@ -189,7 +190,7 @@ Connection::State ConnectionImpl::state() const {

void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

bool ConnectionImpl::consumerWantsToRead() {
bool ConnectionImpl::filterChainWantsData() {
return read_disable_count_ == 0 ||
(read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered());
}
Expand Down Expand Up @@ -268,7 +269,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (inDelayedClose() || !consumerWantsToRead()) {
if (inDelayedClose() || !filterChainWantsData()) {
return;
}
ASSERT(ioHandle().isOpen());
Expand Down Expand Up @@ -354,11 +355,17 @@ void ConnectionImpl::readDisable(bool disable) {
file_event_->setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write);
}

if (consumerWantsToRead() && read_buffer_.length() > 0) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Alternately if the read buffer has data the fd could
// be read disabled. To handle these cases, fake an event to make sure the buffered data gets
// processed regardless and ensure that we dispatch it via onRead.
if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.

// Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
// buffer's high watermark has triggered.
ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0);
dispatch_buffered_data_ = true;
setReadBufferReady();
}
Expand Down Expand Up @@ -553,12 +560,19 @@ void ConnectionImpl::onReadReady() {
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
if (latched_dispatch_buffered_data && consumerWantsToRead()) {
// Do not clear transport_wants_read_ when returning early; the early return skips the transport
// socket doRead call.
if (latched_dispatch_buffered_data && filterChainWantsData()) {
onRead(read_buffer_.length());
}
return;
}

// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand Down
19 changes: 13 additions & 6 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); }
void setReadBufferReady() override {
transport_wants_read_ = true;
file_event_->activate(Event::FileReadyType::Read);
}
void flushWriteBuffer() override;

// Obtain global next connection ID. This should only be used in tests.
Expand All @@ -125,11 +128,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// A convenience function which returns true if
// 1) The read disable count is zero or
// 2) The read disable count is one due to the read buffer being overrun.
// In either case the consumer of the data would like to read from the buffer.
// If the read count is greater than one, or equal to one when the buffer is
// not overrun, then the consumer of the data has called readDisable, and does
// not want to read.
bool consumerWantsToRead();
// In either case the filter chain would like to process data from the read buffer or transport
// socket. If the read count is greater than one, or equal to one when the buffer is not overrun,
// then the filter chain has called readDisable, and does not want additional data.
bool filterChainWantsData();

// Network::ConnectionImplBase
void closeConnectionImmediately() override;
Expand Down Expand Up @@ -195,6 +197,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

/**
Expand Down
191 changes: 191 additions & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1657,6 +1657,197 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// No calls to activate when re-enabling if there are no pending read requests.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);

// setReadBufferReady triggers an immediate call to activate.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->setReadBufferReady();

// When processing a sequence of read disable/read enable, changes to the enabled event mask
// happen only when the disable count transitions to/from 0.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
connection_->readDisable(false);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// Expect a read activation since there have been no transport doRead calls since the call to
// setReadBufferReady.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// No calls to doRead when file_ready_cb is invoked while read disabled.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
file_ready_cb_(Event::FileReadyType::Read);

// Expect a read activate when re-enabling since the file ready cb has not done a read.
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// No read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Verify that read resumption is scheduled when read is re-enabled while the read buffer is
// non-empty.
TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer to trigger read activate calls when re-enabling read.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);

// Already read disabled, expect no changes to enabled events mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
// Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read
// activate but no call to setEnable to change the registration mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Invoke the file event cb while read_disable_count_ == 1 to partially drain the read buffer.
// Expect no transport reads.
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(10, data.length());
data.drain(data.length() - 1);
return FilterStatus::Continue;
}));
// Partial drain of the read buffer below low watermark triggers an update to the fd enabled mask
// and a read activate since the read buffer is not empty.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
file_ready_cb_(Event::FileReadyType::Read);

// Drain the rest of the buffer and verify there are no spurious read activate calls.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(1, data.length());
data.drain(1);
return FilterStatus::Continue;
}));
file_ready_cb_(Event::FileReadyType::Read);

EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// read buffer is empty, no read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Verify that transport_wants_read_ read resumption is not lost when processing read buffer
// high-watermark resumptions.
TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer and also call setReadBufferReady to mimic what transport
// sockets are expected to do when the read buffer high watermark is hit.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
connection_->setReadBufferReady();
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
// The setReadBufferReady call adds a spurious read activation.
// TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);

// Already read disabled, expect no changes to enabled events mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
// Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read
// activate but no call to setEnable to change the registration mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Invoke the file event cb while read_disable_count_ == 1 and fully drain the read buffer.
// Expect no transport reads. Expect a read resumption due to transport_wants_read_ being true
// when read is re-enabled due to going under the low watermark.
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(10, data.length());
data.drain(data.length());
return FilterStatus::Continue;
}));
// The buffer is fully drained. Expect a read activation because setReadBufferReady set
// transport_wants_read_ and no transport doRead calls have happened.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
file_ready_cb_(Event::FileReadyType::Read);

EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);

// Verify there are no read activate calls the event callback does a transport read and clears the
// transport_wants_read_ state.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Test that BytesSentCb is invoked at the correct times
TEST_F(MockTransportConnectionImplTest, BytesSentCallback) {
uint64_t bytes_sent = 0;
Expand Down
Loading