diff --git a/include/envoy/tcp/conn_pool.h b/include/envoy/tcp/conn_pool.h index 15e2eaacc377..6c8515c6ac94 100644 --- a/include/envoy/tcp/conn_pool.h +++ b/include/envoy/tcp/conn_pool.h @@ -12,6 +12,21 @@ namespace Envoy { namespace Tcp { namespace ConnectionPool { +/** + * Controls the behavior of a canceled connection request. + */ +enum class CancelPolicy { + // By default, canceled connection requests allow a pending connection to complete and become + // available for a future connection request. + Default, + // When a connection request is canceled, closes a pending connection if there are more pending + // connections than pending connection requests. CloseExcess is useful for callers that never + // re-use connections (e.g. by closing rather than releasing connections). Using CloseExcess in + // this situation guarantees that no idle connections will be held open by the conn pool awaiting + // a connection request. + CloseExcess, +}; + /** * Handle that allows a pending connection request to be canceled before it is completed. */ @@ -20,9 +35,11 @@ class Cancellable { virtual ~Cancellable() {} /** - * Cancel the pending request. + * Cancel the pending connection request. + * @param cancel_policy a CancelPolicy that controls the behavior of this connection request + * cancellation. */ - virtual void cancel() PURE; + virtual void cancel(CancelPolicy cancel_policy) PURE; }; /** diff --git a/source/common/tcp/conn_pool.cc b/source/common/tcp/conn_pool.cc index bd887b17f424..cb49d4d64efa 100644 --- a/source/common/tcp/conn_pool.cc +++ b/source/common/tcp/conn_pool.cc @@ -22,6 +22,10 @@ ConnPoolImpl::~ConnPoolImpl() { busy_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); } + while (!pending_conns_.empty()) { + pending_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); + } + // Make sure all connections are destroyed before we are destroyed. dispatcher_.clearDeferredDeleteList(); } @@ -31,11 +35,15 @@ void ConnPoolImpl::drainConnections() { ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); } - // We drain busy connections by manually setting remaining requests to 1. Thus, when the next - // response completes the connection will be destroyed. + // We drain busy and pending connections by manually setting remaining requests to 1. Thus, when + // the next response completes the connection will be destroyed. 
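// Illustrative sketch (not part of this patch): how the two CancelPolicy
// values differ for a caller. `pool`, `callbacks`, and `reuses_connections`
// are assumed names used only for illustration.
//
//   Tcp::ConnectionPool::Cancellable* handle = pool.newConnection(callbacks);
//   if (reuses_connections) {
//     // Default: the in-flight upstream connection is allowed to finish
//     // connecting and parks in ready_conns_ for a future request.
//     handle->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
//   } else {
//     // CloseExcess: if pending connections now outnumber pending requests,
//     // the newest pending connection is closed rather than parked.
//     handle->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess);
//   }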
for (const auto& conn : busy_conns_) { conn->remaining_requests_ = 1; } + + for (const auto& conn : pending_conns_) { + conn->remaining_requests_ = 1; + } } void ConnPoolImpl::addDrainedCallback(DrainedCb cb) { @@ -52,7 +60,8 @@ void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& } void ConnPoolImpl::checkForDrained() { - if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_conns_.empty()) { + if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_conns_.empty() && + pending_conns_.empty()) { while (!ready_conns_.empty()) { ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush); } @@ -66,7 +75,7 @@ void ConnPoolImpl::checkForDrained() { void ConnPoolImpl::createNewConnection() { ENVOY_LOG(debug, "creating a new connection"); ActiveConnPtr conn(new ActiveConn(*this)); - conn->moveIntoList(std::move(conn), busy_conns_); + conn->moveIntoList(std::move(conn), pending_conns_); } ConnectionPool::Cancellable* ConnPoolImpl::newConnection(ConnectionPool::Callbacks& callbacks) { @@ -85,7 +94,8 @@ ConnectionPool::Cancellable* ConnPoolImpl::newConnection(ConnectionPool::Callbac } // If we have no connections at all, make one no matter what so we don't starve. - if ((ready_conns_.size() == 0 && busy_conns_.size() == 0) || can_create_connection) { + if ((ready_conns_.empty() && busy_conns_.empty() && pending_conns_.empty()) || + can_create_connection) { createNewConnection(); } @@ -139,7 +149,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent // The only time this happens is if we actually saw a connect failure. host_->cluster().stats().upstream_cx_connect_fail_.inc(); host_->stats().cx_connect_fail_.inc(); - removed = conn.removeFromList(busy_conns_); + removed = conn.removeFromList(pending_conns_); // Raw connect failures should never happen under normal circumstances. If we have an upstream // that is behaving badly, requests can get stuck here in the pending state. If we see a @@ -168,7 +178,8 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent dispatcher_.deferredDelete(std::move(removed)); // If we have pending requests and we just lost a connection we should make a new one. - if (pending_requests_.size() > (ready_conns_.size() + busy_conns_.size())) { + if (pending_requests_.size() > + (ready_conns_.size() + busy_conns_.size() + pending_conns_.size())) { createNewConnection(); } @@ -185,17 +196,28 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent // Note that the order in this function is important. Concretely, we must destroy the connect // timer before we process an idle connection, because if this results in an immediate // drain/destruction event, we key off of the existence of the connect timer above to determine - // whether the connection is in the ready list (connected) or the busy list (failed to connect). + // whether the connection is in the ready list (connected) or the pending list (failed to + // connect). 
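// Restatement (sketch of the replacement check earlier in onConnectionEvent(),
// not new code): after a connection is destroyed, it is replaced only while
// queued requests outnumber every connection the pool tracks, in any state:
//
//   const size_t total_conns =
//       ready_conns_.size() + busy_conns_.size() + pending_conns_.size();
//   if (pending_requests_.size() > total_conns) {
//     createNewConnection(); // new connections now start on pending_conns_
//   }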
if (event == Network::ConnectionEvent::Connected) { conn_connect_ms_->complete(); - processIdleConnection(conn, false); + processIdleConnection(conn, true, false); } } -void ConnPoolImpl::onPendingRequestCancel(PendingRequest& request) { +void ConnPoolImpl::onPendingRequestCancel(PendingRequest& request, + ConnectionPool::CancelPolicy cancel_policy) { ENVOY_LOG(debug, "canceling pending request"); request.removeFromList(pending_requests_); host_->cluster().stats().upstream_rq_cancelled_.inc(); + + // If the cancellation requests closure of excess connections and there are more pending + // connections than requests, close the most recently created pending connection. + if (cancel_policy == ConnectionPool::CancelPolicy::CloseExcess && + pending_requests_.size() < pending_conns_.size()) { + ENVOY_LOG(debug, "canceling pending connection"); + pending_conns_.back()->conn_->close(Network::ConnectionCloseType::NoFlush); + } + checkForDrained(); } @@ -211,7 +233,7 @@ void ConnPoolImpl::onConnReleased(ActiveConn& conn) { // The upstream connection might be closed right after the response completes. Setting delay=true // here assigns pending requests in the next dispatcher loop to handle that case. // https://github.com/envoyproxy/envoy/issues/2715 - processIdleConnection(conn, true); + processIdleConnection(conn, false, true); } } @@ -226,27 +248,47 @@ void ConnPoolImpl::onUpstreamReady() { ENVOY_CONN_LOG(debug, "assigning connection", *conn.conn_); // There is work to do so bind a connection to the caller and move it to the busy list. Pending // requests are pushed onto the front, so pull from the back. + conn.moveBetweenLists(ready_conns_, busy_conns_); assignConnection(conn, pending_requests_.back()->callbacks_); pending_requests_.pop_back(); - conn.moveBetweenLists(ready_conns_, busy_conns_); } } -void ConnPoolImpl::processIdleConnection(ActiveConn& conn, bool delay) { +void ConnPoolImpl::processIdleConnection(ActiveConn& conn, bool new_connection, bool delay) { if (conn.wrapper_) { conn.wrapper_->invalidate(); conn.wrapper_.reset(); } + // TODO(zuercher): As a future improvement, we may wish to close extra connections when there are + // no pending requests rather than moving them to ready_conns_. For conn pool callers that re-use + // connections, it is possible that a busy connection may be re-assigned to a pending request + // while a new connection is pending. The current behavior is to move the pending connection to + // the ready list to await a future request. For some protocols, e.g. MySQL, where the server + // transmits handshake data on connect, it may be desirable to close the connection if no pending + // request is available. The CloseExcess flag for cancel is related: if we close pending + // connections without requests here, it becomes superfluous (instead of closing connections at + // cancel time we'd wait until they completed and close them here). Finally, we want to avoid + // requiring operators to correctly configure clusters to get the necessary pending connection + // behavior (e.g. we want to find a way to enable the new behavior without having to configure + // it on a cluster). + if (pending_requests_.empty() || delay) { // There is nothing to service or delayed processing is requested, so just move the connection // into the ready list.
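// Worked example (illustrative counts, not from this patch): suppose three
// connections are still connecting and three requests are queued, i.e.
// pending_conns_.size() == 3 and pending_requests_.size() == 3. One
// cancel(CloseExcess) removes its request first, so
//
//   pending_requests_.size() == 2 < pending_conns_.size() == 3
//
// and onPendingRequestCancel() closes pending_conns_.back(), leaving one
// pending connection per remaining request. A Default cancel would skip the
// close and let the extra connection park in ready_conns_ below.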
ENVOY_CONN_LOG(debug, "moving to ready", *conn.conn_); - conn.moveBetweenLists(busy_conns_, ready_conns_); + if (new_connection) { + conn.moveBetweenLists(pending_conns_, ready_conns_); + } else { + conn.moveBetweenLists(busy_conns_, ready_conns_); + } } else { // There is work to do immediately so bind a request to the caller and move it to the busy list. // Pending requests are pushed onto the front, so pull from the back. ENVOY_CONN_LOG(debug, "assigning connection", *conn.conn_); + if (new_connection) { + conn.moveBetweenLists(pending_conns_, busy_conns_); + } assignConnection(conn, pending_requests_.back()->callbacks_); pending_requests_.pop_back(); } diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h index ff74c2d85847..e484eef80e1f 100644 --- a/source/common/tcp/conn_pool.h +++ b/source/common/tcp/conn_pool.h @@ -124,7 +124,9 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: ~PendingRequest(); // ConnectionPool::Cancellable - void cancel() override { parent_.onPendingRequestCancel(*this); } + void cancel(ConnectionPool::CancelPolicy cancel_policy) override { + parent_.onPendingRequestCancel(*this, cancel_policy); + } ConnPoolImpl& parent_; ConnectionPool::Callbacks& callbacks_; @@ -135,11 +137,11 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks); void createNewConnection(); void onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent event); - void onPendingRequestCancel(PendingRequest& request); + void onPendingRequestCancel(PendingRequest& request, ConnectionPool::CancelPolicy cancel_policy); virtual void onConnReleased(ActiveConn& conn); virtual void onConnDestroyed(ActiveConn& conn); void onUpstreamReady(); - void processIdleConnection(ActiveConn& conn, bool delay); + void processIdleConnection(ActiveConn& conn, bool new_connection, bool delay); void checkForDrained(); Event::Dispatcher& dispatcher_; @@ -147,8 +149,9 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: Upstream::ResourcePriority priority_; const Network::ConnectionSocket::OptionsSharedPtr socket_options_; - std::list ready_conns_; - std::list busy_conns_; + std::list pending_conns_; // conns awaiting connected event + std::list ready_conns_; // conns ready for assignment + std::list busy_conns_; // conns assigned std::list pending_requests_; std::list drained_callbacks_; Stats::TimespanPtr conn_connect_ms_; diff --git a/source/common/tcp_proxy/tcp_proxy.cc b/source/common/tcp_proxy/tcp_proxy.cc index e04bc1d37f7d..aab6708dc6af 100644 --- a/source/common/tcp_proxy/tcp_proxy.cc +++ b/source/common/tcp_proxy/tcp_proxy.cc @@ -466,7 +466,8 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) { } else if (upstream_handle_) { if (event == Network::ConnectionEvent::LocalClose || event == Network::ConnectionEvent::RemoteClose) { - upstream_handle_->cancel(); + // Cancel the conn pool request and close any excess pending requests. 
+ upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess); upstream_handle_ = nullptr; } } diff --git a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc index 2764787163a5..f46f9a419ccf 100644 --- a/source/extensions/filters/network/thrift_proxy/router/router_impl.cc +++ b/source/extensions/filters/network/thrift_proxy/router/router_impl.cc @@ -383,7 +383,7 @@ FilterStatus Router::UpstreamRequest::start() { void Router::UpstreamRequest::resetStream() { if (conn_pool_handle_) { - conn_pool_handle_->cancel(); + conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default); } if (conn_data_ != nullptr) { diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index a3ba30df6331..1c3a1bf7f603 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -213,11 +213,16 @@ class TcpConnPoolImplDestructorTest : public testing::Test { * Helper for dealing with an active test connection. */ struct ActiveTestConn { - enum class Type { Pending, CreateConnection, Immediate }; + enum class Type { + Pending, // pending request, waiting for free connection + InProgress, // connection created, no callback + CreateConnection, // connection callback occurs after newConnection + Immediate, // connection callback occurs during newConnection + }; ActiveTestConn(TcpConnPoolImplTest& parent, size_t conn_index, Type type) : parent_(parent), conn_index_(conn_index) { - if (type == Type::CreateConnection) { + if (type == Type::CreateConnection || type == Type::InProgress) { parent.conn_pool_.expectConnCreate(); } @@ -234,14 +239,21 @@ struct ActiveTestConn { } if (type == Type::CreateConnection) { - EXPECT_CALL(*parent_.conn_pool_.test_conns_[conn_index_].connect_timer_, disableTimer()); - expectNewConn(); - parent.conn_pool_.test_conns_[conn_index_].connection_->raiseEvent( - Network::ConnectionEvent::Connected); - verifyConn(); + completeConnection(); } } + void completeConnection() { + ASSERT_FALSE(completed_); + + EXPECT_CALL(*parent_.conn_pool_.test_conns_[conn_index_].connect_timer_, disableTimer()); + expectNewConn(); + parent_.conn_pool_.test_conns_[conn_index_].connection_->raiseEvent( + Network::ConnectionEvent::Connected); + verifyConn(); + completed_ = true; + } + void expectNewConn() { EXPECT_CALL(callbacks_.pool_ready_, ready()); } void releaseConn() { callbacks_.conn_data_.reset(); } @@ -255,6 +267,7 @@ struct ActiveTestConn { size_t conn_index_; Tcp::ConnectionPool::Cancellable* handle_{}; ConnPoolCallbacks callbacks_; + bool completed_{}; }; /** @@ -262,16 +275,18 @@ struct ActiveTestConn { */ TEST_F(TcpConnPoolImplTest, DrainConnections) { cluster_->resource_manager_.reset( - new Upstream::ResourceManagerImpl(runtime_, "fake_key", 2, 1024, 1024, 1)); + new Upstream::ResourceManagerImpl(runtime_, "fake_key", 3, 1024, 1024, 1)); InSequence s; ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); ActiveTestConn c2(*this, 1, ActiveTestConn::Type::CreateConnection); + ActiveTestConn c3(*this, 2, ActiveTestConn::Type::InProgress); EXPECT_CALL(conn_pool_, onConnReleasedForTest()); c1.releaseConn(); - // This will destroy the ready connection and set requests remaining to 1 on the busy connection. + // This will destroy the ready connection and set requests remaining to 1 on the busy and pending + // connections. 
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); conn_pool_.drainConnections(); dispatcher_.clearDeferredDeleteList(); @@ -281,6 +296,15 @@ TEST_F(TcpConnPoolImplTest, DrainConnections) { EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); c2.releaseConn(); dispatcher_.clearDeferredDeleteList(); + + // This will destroy the pending connection when the response finishes. + c3.conn_index_ = 0; // c1/c2 have been deleted from test_conns_. + c3.completeConnection(); + + EXPECT_CALL(conn_pool_, onConnReleasedForTest()); + EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); + c3.releaseConn(); + dispatcher_.clearDeferredDeleteList(); } /** @@ -477,7 +501,7 @@ TEST_F(TcpConnPoolImplTest, MaxPendingRequests) { Tcp::ConnectionPool::Cancellable* handle2 = conn_pool_.newConnection(callbacks2); EXPECT_EQ(nullptr, handle2); - handle->cancel(); + handle->cancel(ConnectionPool::CancelPolicy::Default); EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); @@ -584,7 +608,7 @@ TEST_F(TcpConnPoolImplTest, CancelBeforeBound) { Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(callbacks); EXPECT_NE(nullptr, handle); - handle->cancel(); + handle->cancel(ConnectionPool::CancelPolicy::Default); conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); // Cause the connection to go away. @@ -593,6 +617,25 @@ TEST_F(TcpConnPoolImplTest, CancelBeforeBound) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test cancelling before the request is bound to a connection, with connection close. + */ +TEST_F(TcpConnPoolImplTest, CancelAndCloseBeforeBound) { + InSequence s; + + // Request 1 should kick off a new connection. + ConnPoolCallbacks callbacks; + conn_pool_.expectConnCreate(); + Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(callbacks); + EXPECT_NE(nullptr, handle); + + // Expect the connection is closed. + EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); + handle->cancel(ConnectionPool::CancelPolicy::CloseExcess); + + dispatcher_.clearDeferredDeleteList(); +} + /** * Test an upstream disconnection while there is a bound request. */ @@ -821,7 +864,7 @@ TEST_F(TcpConnPoolImplTest, DrainCallback) { ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Pending); - c2.handle_->cancel(); + c2.handle_->cancel(ConnectionPool::CancelPolicy::Default); EXPECT_CALL(conn_pool_, onConnReleasedForTest()); EXPECT_CALL(drained, ready()); @@ -845,7 +888,7 @@ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) { EXPECT_NE(nullptr, handle); conn_pool_.addDrainedCallback([&]() -> void { drained.ready(); }); - handle->cancel(); + handle->cancel(ConnectionPool::CancelPolicy::Default); EXPECT_CALL(*conn_pool_.test_conns_[0].connection_, close(Network::ConnectionCloseType::NoFlush)); EXPECT_CALL(drained, ready()); conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::Connected); @@ -880,6 +923,25 @@ TEST_F(TcpConnPoolImplTest, DrainOnClose) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that pending connections are closed when the connection pool is destroyed. 
+ */ +TEST_F(TcpConnPoolImplDestructorTest, TestPendingConnectionsAreClosed) { + connection_ = new NiceMock<Network::MockClientConnection>(); + connect_timer_ = new NiceMock<Event::MockTimer>(&dispatcher_); + EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)).WillOnce(Return(connection_)); + EXPECT_CALL(*connect_timer_, enableTimer(_)); + + callbacks_ = std::make_unique<ConnPoolCallbacks>(); + ConnectionPool::Cancellable* handle = conn_pool_->newConnection(*callbacks_); + EXPECT_NE(nullptr, handle); + + EXPECT_CALL(callbacks_->pool_failure_, ready()); + EXPECT_CALL(*connection_, close(Network::ConnectionCloseType::NoFlush)); + EXPECT_CALL(dispatcher_, clearDeferredDeleteList()); + conn_pool_.reset(); +} + /** * Test that busy connections are closed when the connection pool is destroyed. */ diff --git a/test/common/tcp_proxy/tcp_proxy_test.cc b/test/common/tcp_proxy/tcp_proxy_test.cc index bff4b1aad858..3fc5c2483559 100644 --- a/test/common/tcp_proxy/tcp_proxy_test.cc +++ b/test/common/tcp_proxy/tcp_proxy_test.cc @@ -758,9 +758,9 @@ TEST_F(TcpProxyTest, DisconnectBeforeData) { // Test that if the downstream connection is closed before the upstream connection // is established, the upstream connection is cancelled. -TEST_F(TcpProxyTest, RemoteClosetBeforeUpstreamConnected) { +TEST_F(TcpProxyTest, RemoteClosedBeforeUpstreamConnected) { setup(1); - EXPECT_CALL(*conn_pool_handles_.at(0), cancel()); + EXPECT_CALL(*conn_pool_handles_.at(0), cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess)); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::RemoteClose); } @@ -768,7 +768,7 @@ TEST_F(TcpProxyTest, RemoteClosetBeforeUpstreamConnected) { // is established, the upstream connection is cancelled. TEST_F(TcpProxyTest, LocalClosetBeforeUpstreamConnected) { setup(1); - EXPECT_CALL(*conn_pool_handles_.at(0), cancel()); + EXPECT_CALL(*conn_pool_handles_.at(0), cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess)); filter_callbacks_.connection_.raiseEvent(Network::ConnectionEvent::LocalClose); } diff --git a/test/extensions/filters/network/thrift_proxy/router_test.cc b/test/extensions/filters/network/thrift_proxy/router_test.cc index 27a7bda20ee9..f0d1f2c5dc61 100644 --- a/test/extensions/filters/network/thrift_proxy/router_test.cc +++ b/test/extensions/filters/network/thrift_proxy/router_test.cc @@ -605,7 +605,8 @@ TEST_F(ThriftRouterTest, UnexpectedRouterDestroyBeforeUpstreamConnect) { startRequest(MessageType::Call); EXPECT_EQ(1, context_.cluster_manager_.tcp_conn_pool_.handles_.size()); - EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_.handles_.front(), cancel()); + EXPECT_CALL(context_.cluster_manager_.tcp_conn_pool_.handles_.front(), + cancel(Tcp::ConnectionPool::CancelPolicy::Default)); destroyRouter(); } diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h index 8abd46bf6188..d233cc766b69 100644 --- a/test/mocks/tcp/mocks.h +++ b/test/mocks/tcp/mocks.h @@ -21,7 +21,7 @@ class MockCancellable : public Cancellable { ~MockCancellable(); // Tcp::ConnectionPool::Cancellable - MOCK_METHOD0(cancel, void()); + MOCK_METHOD1(cancel, void(CancelPolicy cancel_policy)); }; class MockUpstreamCallbacks : public UpstreamCallbacks {
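For call sites being updated, here is a minimal sketch of exercising the new one-argument signature through the mock. This is a hypothetical test, assuming MockCancellable lives in the Tcp::ConnectionPool namespace and is default-constructible; it is not part of this patch.

#include "test/mocks/tcp/mocks.h"

#include "gmock/gmock.h"
#include "gtest/gtest.h"

namespace Envoy {
namespace Tcp {
namespace ConnectionPool {

TEST(CancelPolicyExampleTest, MatchesPolicyArgument) {
  MockCancellable handle;
  // cancel() now takes a CancelPolicy, so expectations must match the
  // argument explicitly instead of using the old zero-argument form.
  EXPECT_CALL(handle, cancel(CancelPolicy::Default));
  handle.cancel(CancelPolicy::Default);

  EXPECT_CALL(handle, cancel(CancelPolicy::CloseExcess));
  handle.cancel(CancelPolicy::CloseExcess);
}

} // namespace ConnectionPool
} // namespace Tcp
} // namespace Envoy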