Skip to content

Commit

Permalink
backport to v1.14: connection: Remember transport socket read resumpt…
Browse files Browse the repository at this point in the history
…ion requests and replay them when re-enabling read. (#13772) (#14173) (#14254)

Fixes SslSocket read resumption after readDisable when processing the SSL record that contains the last bytes of the HTTP message

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente authored Dec 4, 2020
1 parent 21ad0d9 commit d3d745c
Show file tree
Hide file tree
Showing 9 changed files with 248 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Version history
Changes
-------
* listener: fix crash when disabling or re-enabling listeners due to overload while processing LDS updates.
* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers.
* udp: fixed issue in which receiving truncated UDP datagrams would cause Envoy to crash.

1.14.5 (September 29, 2020)
Expand Down
21 changes: 17 additions & 4 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
[this]() -> void { this->onHighWatermark(); })),
read_enabled_(true), above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) {
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {
// Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM
// condition and just crash.
RELEASE_ASSERT(SOCKET_VALID(ioHandle().fd()), "");
RELEASE_ASSERT(SOCKET_VALID(socket_->ioHandle().fd()), "");

if (!connected) {
connecting_ = true;
Expand All @@ -71,7 +72,7 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
file_event_ = dispatcher_.createFileEvent(
ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
socket_->ioHandle().fd(), [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
Event::FileReadyType::Read | Event::FileReadyType::Write);

transport_socket_->setTransportSocketCallbacks(*this);
Expand Down Expand Up @@ -360,7 +361,13 @@ void ConnectionImpl::readDisable(bool disable) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Instead fake an event to make sure the buffered data
// gets processed regardless and ensure that we dispatch it via onRead.
if (read_buffer_.length() > 0) {
if (read_buffer_.length() > 0 || transport_wants_read_) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.
dispatch_buffered_data_ = true;
file_event_->activate(Event::FileReadyType::Read);
}
Expand Down Expand Up @@ -529,9 +536,15 @@ void ConnectionImpl::onFileEvent(uint32_t events) {

void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready", *this);
ASSERT(read_enabled_);

ASSERT(!connecting_);

// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand Down
10 changes: 9 additions & 1 deletion source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override { file_event_->activate(Event::FileReadyType::Read); }
void setReadBufferReady() override {
transport_wants_read_ = true;
file_event_->activate(Event::FileReadyType::Read);
}
void flushWriteBuffer() override;

// Obtain global next connection ID. This should only be used in tests.
Expand Down Expand Up @@ -183,6 +186,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

/**
Expand Down
57 changes: 56 additions & 1 deletion test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ TEST_P(ConnectionImplDeathTest, BadFd) {
ConnectionImpl(*dispatcher,
std::make_unique<ConnectionSocketImpl>(std::move(io_handle), nullptr, nullptr),
Network::Test::createRawBufferSocket(), stream_info, false),
".*assert failure: SOCKET_VALID\\(ioHandle\\(\\)\\.fd\\(\\)\\).*");
".*assert failure: SOCKET_VALID\\(socket_->ioHandle\\(\\)\\.fd\\(\\)\\).*");
}

class ConnectionImplTest : public testing::TestWithParam<Address::IpVersion> {
Expand Down Expand Up @@ -1511,6 +1511,61 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// No calls to activate when re-enabling if there are no pending read requests.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);

// setReadBufferReady triggers an immediate call to activate.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->setReadBufferReady();

// When processing a sequence of read disable/read enable, changes to the enabled event mask
// happen only when the disable count transitions to/from 0.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
connection_->readDisable(false);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// Expect a read activation since there have been no transport doRead calls since the call to
// setReadBufferReady.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Disable read.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);

// Expect a read activate when re-enabling since the file ready cb has not done a read.
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// No read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Test that BytesSentCb is invoked at the correct times
TEST_F(MockTransportConnectionImplTest, BytesSentCallback) {
uint64_t bytes_sent = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "extensions/transport_sockets/tls/context_config_impl.h"
#include "extensions/transport_sockets/tls/context_manager_impl.h"

#include "test/integration/autonomous_upstream.h"
#include "test/integration/integration.h"
#include "test/integration/utility.h"
#include "test/test_common/network_utility.h"
Expand Down Expand Up @@ -177,6 +178,103 @@ TEST_P(SslIntegrationTest, AdminCertEndpoint) {
EXPECT_EQ("200", response->headers().Status()->value().getStringView());
}

class RawWriteSslIntegrationTest : public SslIntegrationTest {
protected:
std::unique_ptr<Http::TestRequestHeaderMapImpl>
testFragmentedRequestWithBufferLimit(std::list<std::string> request_chunks,
uint32_t buffer_limit) {
autonomous_upstream_ = true;
config_helper_.setBufferLimits(buffer_limit, buffer_limit);
initialize();

// write_request_cb will write each of the items in request_chunks as a separate SSL_write.
auto write_request_cb = [&request_chunks](Network::ClientConnection& client) {
if (!request_chunks.empty()) {
Buffer::OwnedImpl buffer(request_chunks.front());
client.write(buffer, false);
request_chunks.pop_front();
}
};

auto client_transport_socket_factory_ptr =
createClientSslTransportSocketFactory({}, *context_manager_, *api_);
std::string response;
auto connection = createConnectionDriver(
lookupPort("http"), write_request_cb,
[&](Network::ClientConnection&, const Buffer::Instance& data) -> void {
response.append(data.toString());
},
client_transport_socket_factory_ptr->createTransportSocket({}));

// Drive the connection until we get a response.
while (response.empty()) {
connection->run(Event::Dispatcher::RunType::NonBlock);
}
EXPECT_THAT(response, testing::HasSubstr("HTTP/1.1 200 OK\r\n"));

connection->close();
return reinterpret_cast<AutonomousUpstream*>(fake_upstreams_.front().get())
->lastRequestHeaders();
}
};

INSTANTIATE_TEST_SUITE_P(IpVersions, RawWriteSslIntegrationTest,
testing::ValuesIn(TestEnvironment::getIpVersionsForTest()),
TestUtility::ipTestParamsToString);

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcessingHeaders) {
// The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were
// picked such that the connection's high watermark will trigger while processing the last SSL
// record containing the request headers. Verify that read resumption works correctly after
// hitting the receive buffer high watermark.
std::list<std::string> request_chunks = {
"GET / HTTP/1.1\r\nHost: host\r\n",
"key1:" + std::string(14000, 'a') + "\r\n",
"key2:" + std::string(16000, 'b') + "\r\n\r\n",
};

std::unique_ptr<Http::TestRequestHeaderMapImpl> upstream_headers =
testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024);
ASSERT_TRUE(upstream_headers != nullptr);
EXPECT_EQ(upstream_headers->Host()->value(), "host");
EXPECT_EQ(std::string(14000, 'a'),
upstream_headers->get(Envoy::Http::LowerCaseString("key1"))->value().getStringView());
EXPECT_EQ(std::string(16000, 'b'),
upstream_headers->get(Envoy::Http::LowerCaseString("key2"))->value().getStringView());
}

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingBody) {
// The raw writer will perform a separate SSL_write for each of the chunks below. Chunk sizes were
// picked such that the connection's high watermark will trigger while processing the last SSL
// record containing the POST body. Verify that read resumption works correctly after hitting the
// receive buffer high watermark.
std::list<std::string> request_chunks = {
"POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 30000\r\n\r\n",
std::string(14000, 'a'),
std::string(16000, 'a'),
};

std::unique_ptr<Http::TestRequestHeaderMapImpl> upstream_headers =
testFragmentedRequestWithBufferLimit(request_chunks, 15 * 1024);
ASSERT_TRUE(upstream_headers != nullptr);
}

// Regression test for https://github.com/envoyproxy/envoy/issues/12304
TEST_P(RawWriteSslIntegrationTest, HighWatermarkReadResumptionProcesingLargerBody) {
std::list<std::string> request_chunks = {
"POST / HTTP/1.1\r\nHost: host\r\ncontent-length: 150000\r\n\r\n",
};
for (int i = 0; i < 10; ++i) {
request_chunks.push_back(std::string(15000, 'a'));
}

std::unique_ptr<Http::TestRequestHeaderMapImpl> upstream_headers =
testFragmentedRequestWithBufferLimit(request_chunks, 16 * 1024);
ASSERT_TRUE(upstream_headers != nullptr);
}

// Validate certificate selection across different certificate types and client TLS versions.
class SslCertficateIntegrationTest
: public testing::TestWithParam<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ class SslIntegrationTestBase : public HttpIntegrationTest {
// Set this true to debug SSL handshake issues with openssl s_client. The
// verbose trace will be in the logs, openssl must be installed separately.
bool debug_with_s_client_{false};

private:
std::unique_ptr<ContextManager> context_manager_;
};

Expand Down
16 changes: 16 additions & 0 deletions test/integration/integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,22 @@ class BaseIntegrationTest : protected Logger::Loggable<Logger::Id::testing> {
void sendRawHttpAndWaitForResponse(int port, const char* raw_http, std::string* response,
bool disconnect_after_headers_complete = false);

/**
* Helper to create ConnectionDriver.
*
* @param port the port to connect to.
* @param write_request_cb callback used to send data.
* @param data_callback the callback on the received data.
* @param transport_socket transport socket to use for the client connection
**/
std::unique_ptr<RawConnectionDriver> createConnectionDriver(
uint32_t port, RawConnectionDriver::DoWriteCallback write_request_cb,
std::function<void(Network::ClientConnection&, const Buffer::Instance&)>&& data_callback,
Network::TransportSocketPtr transport_socket) {
return std::make_unique<RawConnectionDriver>(port, write_request_cb, data_callback, version_,
std::move(transport_socket));
}

protected:
// Create the envoy server in another thread and start it.
// Will not return until that server is listening.
Expand Down
27 changes: 26 additions & 1 deletion test/integration/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "absl/strings/match.h"

namespace Envoy {

void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
ASSERT(!complete_);
complete_ = end_stream;
Expand Down Expand Up @@ -118,7 +119,7 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia
api_ = Api::createApiForTest(stats_store_);
Event::GlobalTimeSystem time_system;
dispatcher_ = api_->allocateDispatcher();
callbacks_ = std::make_unique<ConnectionCallbacks>();
callbacks_ = std::make_unique<ConnectionCallbacks>([]() {});
client_ = dispatcher_->createClientConnection(
Network::Utility::resolveUrl(
fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)),
Expand All @@ -129,6 +130,30 @@ RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& initia
client_->connect();
}

RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback,
ReadCallback response_data_callback,
Network::Address::IpVersion version,
Network::TransportSocketPtr transport_socket) {
api_ = Api::createApiForTest(stats_store_);
Event::GlobalTimeSystem time_system;
dispatcher_ = api_->allocateDispatcher();
callbacks_ = std::make_unique<ConnectionCallbacks>(
[this, write_request_callback]() { write_request_callback(*client_); });
client_ = dispatcher_->createClientConnection(
Network::Utility::resolveUrl(
fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)),
Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr);
// ConnectionCallbacks will call write_request_callback from the connect and low-watermark
// callbacks. Set a small buffer limit so high-watermark is triggered after every write and
// low-watermark is triggered every time the buffer is drained.
client_->setBufferLimits(1);

client_->addConnectionCallbacks(*callbacks_);
client_->addReadFilter(
Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)});
client_->connect();
}

RawConnectionDriver::~RawConnectionDriver() = default;

void RawConnectionDriver::run(Event::Dispatcher::RunType run_type) { dispatcher_->run(run_type); }
Expand Down
Loading

0 comments on commit d3d745c

Please sign in to comment.