From 7c11e9296535f2ab9d48df4b7606f08d9706df88 Mon Sep 17 00:00:00 2001
From: Stephan Zuercher
Date: Mon, 23 Jul 2018 14:13:03 -0700
Subject: [PATCH] tcp/conn_pool: improve interface for callers (#3903)

tcp/conn_pool: improve interface for callers

Provides additional pool failure reasons to allow Tcp::ConnectionPool
callers to distinguish between timeouts and connection failures. Passes
through connection and buffer watermark events. A subsequent PR will
switch the TCP proxy to use the TCP connection pool, making use of
these features.

Relates to #3818.

*Risk Level*: low
*Testing*: unit tests
*Docs Changes*: n/a
*Release Notes*: n/a

Signed-off-by: Stephan Zuercher
---
 include/envoy/tcp/conn_pool.h                 | 22 +++---
 source/common/tcp/conn_pool.cc                | 45 ++++++++++--
 source/common/tcp/conn_pool.h                 |  9 ++-
 test/common/tcp/conn_pool_test.cc             | 68 ++++++++++++++++++-
 .../tcp_conn_pool_integration_test.cc         |  5 ++
 test/mocks/tcp/BUILD                          |  1 +
 test/mocks/tcp/mocks.cc                       | 37 +++++++++-
 test/mocks/tcp/mocks.h                        | 31 ++++++++-
 8 files changed, 195 insertions(+), 23 deletions(-)

diff --git a/include/envoy/tcp/conn_pool.h b/include/envoy/tcp/conn_pool.h
index 42bad77e44ce..c89b25eb9143 100644
--- a/include/envoy/tcp/conn_pool.h
+++ b/include/envoy/tcp/conn_pool.h
@@ -26,19 +26,25 @@ class Cancellable {
 };
 
 /**
- * Reason that a pool stream could not be obtained.
+ * Reason that a pool connection could not be obtained.
  */
 enum class PoolFailureReason {
-  // A resource overflowed and policy prevented a new stream from being created.
+  // A resource overflowed and policy prevented a new connection from being created.
   Overflow,
-  // A connection failure took place and the stream could not be bound.
-  ConnectionFailure
+  // A local connection failure took place while creating a new connection.
+  LocalConnectionFailure,
+  // A remote connection failure took place while creating a new connection.
+  RemoteConnectionFailure,
+  // A timeout occurred while creating a new connection.
+  Timeout,
 };
 
 /*
- * UpstreamCallbacks for connection pool upstream connection callbacks.
+ * UpstreamCallbacks for connection pool upstream connection callbacks and data. Note that
+ * onEvent(Connected) is never triggered since the event always occurs before a ConnectionPool
+ * caller is assigned a connection.
  */
-class UpstreamCallbacks {
+class UpstreamCallbacks : public Network::ConnectionCallbacks {
 public:
   virtual ~UpstreamCallbacks() {}
 
@@ -122,14 +128,14 @@ class Instance : public Event::DeferredDeletable {
   /**
    * Register a callback that gets called when the connection pool is fully drained. No actual
    * draining is done. The owner of the connection pool is responsible for not creating any
-   * new streams.
+   * new connections.
    */
   virtual void addDrainedCallback(DrainedCb cb) PURE;
 
   /**
    * Actively drain all existing connection pool connections. This method can be used in cases
    * where the connection pool is not being destroyed, but the caller wishes to make sure that
-   * all new streams take place on a new connection. For example, when a health check failure
+   * all new requests take place on a new connection. For example, when a health check failure
    * occurs.
    */
   virtual void drainConnections() PURE;

diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc
index 077c7840aea1..d6c60c174c46 100644
--- a/source/common/tcp/conn_pool.cc
+++ b/source/common/tcp/conn_pool.cc
@@ -104,6 +104,15 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
   if (event == Network::ConnectionEvent::RemoteClose ||
       event == Network::ConnectionEvent::LocalClose) {
     ENVOY_CONN_LOG(debug, "client disconnected", *conn.conn_);
+
+    if (event == Network::ConnectionEvent::LocalClose) {
+      host_->cluster().stats().upstream_cx_destroy_local_.inc();
+    }
+    if (event == Network::ConnectionEvent::RemoteClose) {
+      host_->cluster().stats().upstream_cx_destroy_remote_.inc();
+    }
+    host_->cluster().stats().upstream_cx_destroy_.inc();
+
     ActiveConnPtr removed;
     bool check_for_drained = true;
     if (conn.wrapper_ != nullptr) {
@@ -135,13 +144,21 @@
       // do with the request.
       // NOTE: We move the existing pending requests to a temporary list. This is done so that
       //       if retry logic submits a new request to the pool, we don't fail it inline.
+      ConnectionPool::PoolFailureReason reason;
+      if (conn.timed_out_) {
+        reason = ConnectionPool::PoolFailureReason::Timeout;
+      } else if (event == Network::ConnectionEvent::RemoteClose) {
+        reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
+      } else {
+        reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
+      }
+
       std::list<PendingRequestPtr> pending_requests_to_purge(std::move(pending_requests_));
       while (!pending_requests_to_purge.empty()) {
         PendingRequestPtr request =
             pending_requests_to_purge.front()->removeFromList(pending_requests_to_purge);
         host_->cluster().stats().upstream_rq_pending_failure_eject_.inc();
-        request->callbacks_.onPoolFailure(ConnectionPool::PoolFailureReason::ConnectionFailure,
-                                          conn.real_host_description_);
+        request->callbacks_.onPoolFailure(reason, conn.real_host_description_);
       }
     }
 
@@ -277,7 +294,7 @@ ConnPoolImpl::PendingRequest::~PendingRequest() {
 
 ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
     : parent_(parent),
       connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })),
-      remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()) {
+      remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()), timed_out_(false) {
   parent_.conn_connect_ms_.reset(
       new Stats::Timespan(parent_.host_->cluster().stats().upstream_cx_connect_ms_));
@@ -297,7 +314,6 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
 
   parent_.host_->cluster().stats().upstream_cx_total_.inc();
   parent_.host_->cluster().stats().upstream_cx_active_.inc();
-  parent_.host_->cluster().stats().upstream_cx_http1_total_.inc();
   parent_.host_->stats().cx_total_.inc();
   parent_.host_->stats().cx_active_.inc();
   conn_length_.reset(new Stats::Timespan(parent_.host_->cluster().stats().upstream_cx_length_ms_));
@@ -328,6 +344,7 @@ void ConnPoolImpl::ActiveConn::onConnectTimeout() {
   // We just close the connection at this point. This will result in both a timeout and a connect
   // failure and will fold into all the normal connect failure logic.
   ENVOY_CONN_LOG(debug, "connect timeout", *conn_);
+  timed_out_ = true;
   parent_.host_->cluster().stats().upstream_cx_connect_timeout_.inc();
   conn_->close(Network::ConnectionCloseType::NoFlush);
 }
@@ -343,5 +360,25 @@ void ConnPoolImpl::ActiveConn::onUpstreamData(Buffer::Instance& data, bool end_s
   }
 }
 
+void ConnPoolImpl::ActiveConn::onEvent(Network::ConnectionEvent event) {
+  if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
+    wrapper_->callbacks_->onEvent(event);
+  }
+
+  parent_.onConnectionEvent(*this, event);
+}
+
+void ConnPoolImpl::ActiveConn::onAboveWriteBufferHighWatermark() {
+  if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
+    wrapper_->callbacks_->onAboveWriteBufferHighWatermark();
+  }
+}
+
+void ConnPoolImpl::ActiveConn::onBelowWriteBufferLowWatermark() {
+  if (wrapper_ != nullptr && wrapper_->callbacks_ != nullptr) {
+    wrapper_->callbacks_->onBelowWriteBufferLowWatermark();
+  }
+}
+
 } // namespace Tcp
 } // namespace Envoy
diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h
index 97b412d47e14..aa06d8b2d8b4 100644
--- a/source/common/tcp/conn_pool.h
+++ b/source/common/tcp/conn_pool.h
@@ -72,11 +72,9 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
     void onUpstreamData(Buffer::Instance& data, bool end_stream);
 
     // Network::ConnectionCallbacks
-    void onEvent(Network::ConnectionEvent event) override {
-      parent_.onConnectionEvent(*this, event);
-    }
-    void onAboveWriteBufferHighWatermark() override {}
-    void onBelowWriteBufferLowWatermark() override {}
+    void onEvent(Network::ConnectionEvent event) override;
+    void onAboveWriteBufferHighWatermark() override;
+    void onBelowWriteBufferLowWatermark() override;
 
     ConnPoolImpl& parent_;
     Upstream::HostDescriptionConstSharedPtr real_host_description_;
@@ -85,6 +83,7 @@
     Event::TimerPtr connect_timer_;
     Stats::TimespanPtr conn_length_;
     uint64_t remaining_requests_;
+    bool timed_out_;
   };
 
   typedef std::unique_ptr<ActiveConn> ActiveConnPtr;
diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc
index db4ad249bf03..90961972724e 100644
--- a/test/common/tcp/conn_pool_test.cc
+++ b/test/common/tcp/conn_pool_test.cc
@@ -41,15 +41,17 @@ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks {
     pool_ready_.ready();
   }
 
-  void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
+  void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
                      Upstream::HostDescriptionConstSharedPtr host) override {
+    reason_ = reason;
     host_ = host;
     pool_failure_.ready();
   }
 
   ReadyWatcher pool_failure_;
   ReadyWatcher pool_ready_;
-  ConnectionPool::ConnectionData* conn_data_;
+  ConnectionPool::ConnectionData* conn_data_{};
+  absl::optional<Tcp::ConnectionPool::PoolFailureReason> reason_;
   Upstream::HostDescriptionConstSharedPtr host_;
 };
 
@@ -322,6 +324,16 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
   EXPECT_EQ(Network::FilterStatus::StopIteration,
             conn_pool_.test_conns_[0].filter_->onData(buffer, false));
 
+  EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
+  for (auto* cb : conn_pool_.test_conns_[0].connection_->callbacks_) {
+    cb->onAboveWriteBufferHighWatermark();
+  }
+
+  EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark());
+  for (auto* cb : conn_pool_.test_conns_[0].connection_->callbacks_) {
+    cb->onBelowWriteBufferLowWatermark();
+  }
+
   // Shutdown normally.
   EXPECT_CALL(conn_pool_, onConnReleasedForTest());
   c1.releaseConn();
@@ -331,6 +343,23 @@
   dispatcher_.clearDeferredDeleteList();
 }
 
+TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
+  Buffer::OwnedImpl buffer;
+
+  InSequence s;
+  ConnectionPool::MockUpstreamCallbacks callbacks;
+
+  // Create connection, set UpstreamCallbacks
+  ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
+  c1.callbacks_.conn_data_->addUpstreamCallbacks(callbacks);
+
+  EXPECT_CALL(callbacks, onEvent(Network::ConnectionEvent::RemoteClose));
+
+  EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
+  conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
+  dispatcher_.clearDeferredDeleteList();
+}
+
 TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) {
   Buffer::OwnedImpl buffer;
 
@@ -394,6 +423,8 @@ TEST_F(TcpConnPoolImplTest, MaxPendingRequests) {
   conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
   dispatcher_.clearDeferredDeleteList();
 
+  EXPECT_EQ(ConnectionPool::PoolFailureReason::Overflow, callbacks2.reason_);
+
   EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_overflow_.value());
 }
 
@@ -401,7 +432,7 @@
  * Tests a connection failure before a request is bound which should result in the pending request
  * getting purged.
  */
-TEST_F(TcpConnPoolImplTest, ConnectFailure) {
+TEST_F(TcpConnPoolImplTest, RemoteConnectFailure) {
   InSequence s;
 
   // Request 1 should kick off a new connection.
@@ -417,6 +448,34 @@
   conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
   dispatcher_.clearDeferredDeleteList();
 
+  EXPECT_EQ(ConnectionPool::PoolFailureReason::RemoteConnectionFailure, callbacks.reason_);
+
   EXPECT_EQ(1U, cluster_->stats_.upstream_cx_connect_fail_.value());
   EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_failure_eject_.value());
 }
+
+/**
+ * Tests a connection failure before a request is bound which should result in the pending request
+ * getting purged.
+ */
+TEST_F(TcpConnPoolImplTest, LocalConnectFailure) {
+  InSequence s;
+
+  // Request 1 should kick off a new connection.
+  ConnPoolCallbacks callbacks;
+  conn_pool_.expectConnCreate();
+  Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(callbacks);
+  EXPECT_NE(nullptr, handle);
+
+  EXPECT_CALL(callbacks.pool_failure_, ready());
+  EXPECT_CALL(*conn_pool_.test_conns_[0].connect_timer_, disableTimer());
+
+  EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
+  conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::LocalClose);
+  dispatcher_.clearDeferredDeleteList();
+
+  EXPECT_EQ(ConnectionPool::PoolFailureReason::LocalConnectionFailure, callbacks.reason_);
+
+  EXPECT_EQ(1U, cluster_->stats_.upstream_cx_connect_fail_.value());
+  EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_failure_eject_.value());
+}
@@ -446,6 +505,9 @@ TEST_F(TcpConnPoolImplTest, ConnectTimeout) {
   EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2);
   dispatcher_.clearDeferredDeleteList();
 
+  EXPECT_EQ(ConnectionPool::PoolFailureReason::Timeout, callbacks1.reason_);
+  EXPECT_EQ(ConnectionPool::PoolFailureReason::Timeout, callbacks2.reason_);
+
   EXPECT_EQ(2U, cluster_->stats_.upstream_cx_connect_fail_.value());
   EXPECT_EQ(2U, cluster_->stats_.upstream_cx_connect_timeout_.value());
 }
diff --git a/test/integration/tcp_conn_pool_integration_test.cc b/test/integration/tcp_conn_pool_integration_test.cc
index 5bb99fbb0362..4ae2b50eccfc 100644
--- a/test/integration/tcp_conn_pool_integration_test.cc
+++ b/test/integration/tcp_conn_pool_integration_test.cc
@@ -46,6 +46,7 @@ class TestFilter : public Network::ReadFilter {
   public:
     Request(TestFilter& parent, Buffer::Instance& data) : parent_(parent) { data_.move(data); }
 
+    // Tcp::ConnectionPool::Callbacks
     void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
                        Upstream::HostDescriptionConstSharedPtr) override {
       ASSERT(false);
@@ -59,6 +60,7 @@ class TestFilter : public Network::ReadFilter {
       upstream_->connection().write(data_, false);
     }
 
+    // Tcp::ConnectionPool::UpstreamCallbacks
     void onUpstreamData(Buffer::Instance& data, bool end_stream) override {
       UNREFERENCED_PARAMETER(end_stream);
 
@@ -67,6 +69,9 @@ class TestFilter : public Network::ReadFilter {
 
       upstream_->release();
     }
+    void onEvent(Network::ConnectionEvent) override {}
+    void onAboveWriteBufferHighWatermark() override {}
+    void onBelowWriteBufferLowWatermark() override {}
 
     TestFilter& parent_;
     Buffer::OwnedImpl data_;
diff --git a/test/mocks/tcp/BUILD b/test/mocks/tcp/BUILD
index 0988722b71d7..8634b86e9c5c 100644
--- a/test/mocks/tcp/BUILD
+++ b/test/mocks/tcp/BUILD
@@ -15,6 +15,7 @@ envoy_cc_mock(
     deps = [
         "//include/envoy/buffer:buffer_interface",
         "//include/envoy/tcp:conn_pool_interface",
+        "//test/mocks/network:network_mocks",
         "//test/mocks/upstream:host_mocks",
     ],
 )
diff --git a/test/mocks/tcp/mocks.cc b/test/mocks/tcp/mocks.cc
index f8586de4f041..1713d04e6128 100644
--- a/test/mocks/tcp/mocks.cc
+++ b/test/mocks/tcp/mocks.cc
@@ -3,6 +3,10 @@
 #include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
+using testing::Invoke;
+using testing::ReturnRef;
+using testing::_;
+
 namespace Envoy {
 namespace Tcp {
 namespace ConnectionPool {
@@ -13,9 +17,40 @@ MockCancellable::~MockCancellable() {}
 MockUpstreamCallbacks::MockUpstreamCallbacks() {}
 MockUpstreamCallbacks::~MockUpstreamCallbacks() {}
 
-MockInstance::MockInstance() {}
+MockConnectionData::MockConnectionData() {
+  ON_CALL(*this, connection()).WillByDefault(ReturnRef(connection_));
+}
+MockConnectionData::~MockConnectionData() {}
+
+MockInstance::MockInstance() {
+  ON_CALL(*this, newConnection(_)).WillByDefault(Invoke([&](Callbacks& cb) -> Cancellable* {
+    return newConnectionImpl(cb);
+  }));
+}
 MockInstance::~MockInstance() {}
 
+MockCancellable* MockInstance::newConnectionImpl(Callbacks& cb) {
+  handles_.emplace_back();
+  callbacks_.push_back(&cb);
+  return &handles_.back();
+}
+
+void MockInstance::poolFailure(PoolFailureReason reason) {
+  Callbacks* cb = callbacks_.front();
+  callbacks_.pop_front();
+  handles_.pop_front();
+
+  cb->onPoolFailure(reason, host_);
+}
+
+void MockInstance::poolReady() {
+  Callbacks* cb = callbacks_.front();
+  callbacks_.pop_front();
+  handles_.pop_front();
+
+  cb->onPoolReady(connection_data_, host_);
+}
+
 } // namespace ConnectionPool
 } // namespace Tcp
 } // namespace Envoy
diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h
index ed90509a442c..a9cc52fecbbb 100644
--- a/test/mocks/tcp/mocks.h
+++ b/test/mocks/tcp/mocks.h
@@ -3,11 +3,14 @@
 #include "envoy/tcp/conn_pool.h"
 
 #include "test/mocks/common.h"
+#include "test/mocks/network/mocks.h"
 #include "test/mocks/upstream/host.h"
 #include "test/test_common/printers.h"
 
 #include "gmock/gmock.h"
 
+using testing::NiceMock;
+
 namespace Envoy {
 namespace Tcp {
 namespace ConnectionPool {
@@ -28,6 +31,22 @@ class MockUpstreamCallbacks : public UpstreamCallbacks {
 
   // Tcp::ConnectionPool::UpstreamCallbacks
   MOCK_METHOD2(onUpstreamData, void(Buffer::Instance& data, bool end_stream));
+  MOCK_METHOD1(onEvent, void(Network::ConnectionEvent event));
+  MOCK_METHOD0(onAboveWriteBufferHighWatermark, void());
+  MOCK_METHOD0(onBelowWriteBufferLowWatermark, void());
+};
+
+class MockConnectionData : public ConnectionData {
+public:
+  MockConnectionData();
+  ~MockConnectionData();
+
+  // Tcp::ConnectionPool::ConnectionData
+  MOCK_METHOD0(connection, Network::ClientConnection&());
+  MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&));
+  MOCK_METHOD0(release, void());
+
+  NiceMock<Network::MockClientConnection> connection_;
 };
 
 class MockInstance : public Instance {
@@ -40,8 +59,16 @@ class MockInstance : public Instance {
   MOCK_METHOD0(drainConnections, void());
   MOCK_METHOD1(newConnection, Cancellable*(Tcp::ConnectionPool::Callbacks& callbacks));
 
-  std::shared_ptr<testing::NiceMock<Upstream::MockHost>> host_{
-      new testing::NiceMock<Upstream::MockHost>()};
+  MockCancellable* newConnectionImpl(Callbacks& cb);
+  void poolFailure(PoolFailureReason reason);
+  void poolReady();
+
+  std::list<NiceMock<MockCancellable>> handles_;
+  std::list<Callbacks*> callbacks_;
+
+  std::shared_ptr<NiceMock<Upstream::MockHost>> host_{new NiceMock<Upstream::MockHost>()};
+  NiceMock<MockConnectionData> connection_data_;
 };
 
 } // namespace ConnectionPool
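
For reviewers unfamiliar with the interface, here is a minimal sketch of how a caller might consume it once this lands. `ExampleCallbacks` and the `Example` namespace are invented for illustration and are not part of this patch; the `onPoolReady` signature is inferred from the mock's `cb->onPoolReady(connection_data_, host_)` call above, not confirmed by this diff.

```cpp
#include "envoy/tcp/conn_pool.h"

namespace Envoy {
namespace Example {

// Hypothetical caller: requests a pooled connection, registers for upstream
// events, and distinguishes the new failure reasons.
class ExampleCallbacks : public Tcp::ConnectionPool::Callbacks,
                         public Tcp::ConnectionPool::UpstreamCallbacks {
public:
  // Tcp::ConnectionPool::Callbacks
  void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
                     Upstream::HostDescriptionConstSharedPtr) override {
    switch (reason) {
    case Tcp::ConnectionPool::PoolFailureReason::Timeout:
      // The connect timer fired before the upstream connection completed.
      break;
    case Tcp::ConnectionPool::PoolFailureReason::RemoteConnectionFailure:
      // The remote side closed/reset the connection while it was being created.
      break;
    case Tcp::ConnectionPool::PoolFailureReason::LocalConnectionFailure:
      // The connection failed locally while it was being created.
      break;
    case Tcp::ConnectionPool::PoolFailureReason::Overflow:
      // Circuit breaking prevented creation of a new connection.
      break;
    }
  }

  void onPoolReady(Tcp::ConnectionPool::ConnectionData& conn_data,
                   Upstream::HostDescriptionConstSharedPtr) override {
    // Receive upstream data, close events, and watermark events from now on.
    // Note that onEvent(Connected) is never delivered: the connection is
    // already established by the time it is assigned to a caller.
    conn_data.addUpstreamCallbacks(*this);
    conn_data_ = &conn_data;
  }

  // Tcp::ConnectionPool::UpstreamCallbacks
  void onUpstreamData(Buffer::Instance&, bool) override {}
  void onEvent(Network::ConnectionEvent event) override {
    if (event == Network::ConnectionEvent::RemoteClose ||
        event == Network::ConnectionEvent::LocalClose) {
      conn_data_ = nullptr; // The pool cleans up the closed connection.
    }
  }
  void onAboveWriteBufferHighWatermark() override { /* e.g. pause downstream reads */ }
  void onBelowWriteBufferLowWatermark() override { /* e.g. resume downstream reads */ }

private:
  Tcp::ConnectionPool::ConnectionData* conn_data_{};
};

} // namespace Example
} // namespace Envoy
```

Previously both close directions surfaced as the single `ConnectionFailure` reason, so a caller could not tell a connect timeout from, say, a refused connection; the split plus `timed_out_` tracking is what makes the distinction possible.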
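
The extended `MockInstance` also makes failure-reason handling easy to exercise in callers' unit tests: `newConnection()` queues the caller by default (via `newConnectionImpl`), and `poolFailure()`/`poolReady()` pop the oldest waiter and invoke its callbacks. A rough sketch, reusing the hypothetical `ExampleCallbacks` above (test name and structure invented):

```cpp
#include "test/mocks/tcp/mocks.h"

#include "gtest/gtest.h"

namespace Envoy {

TEST(ExampleConnPoolMockTest, NotifiesTimeoutThenReady) {
  Tcp::ConnectionPool::MockInstance pool;
  Example::ExampleCallbacks callbacks;

  // First attempt: queued by the default action, then failed with Timeout.
  EXPECT_NE(nullptr, pool.newConnection(callbacks));
  pool.poolFailure(Tcp::ConnectionPool::PoolFailureReason::Timeout);

  // Second attempt: completed via the mock's built-in connection_data_.
  EXPECT_NE(nullptr, pool.newConnection(callbacks));
  pool.poolReady();
}

} // namespace Envoy
```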