diff --git a/include/envoy/tcp/conn_pool.h b/include/envoy/tcp/conn_pool.h index 8237af37fea3..15e2eaacc377 100644 --- a/include/envoy/tcp/conn_pool.h +++ b/include/envoy/tcp/conn_pool.h @@ -57,6 +57,22 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks { virtual void onUpstreamData(Buffer::Instance& data, bool end_stream) PURE; }; +/** + * ConnectionState is a base class for connection state maintained across requests. For example, a + * protocol may maintain a connection-specific request sequence number or negotiate options that + * affect the behavior of requests for the duration of the connection. A ConnectionState subclass + * is assigned to the ConnectionData to track this state when the connection is returned to the + * pool so that the state is available when the connection is re-used for a subsequent request. + * The ConnectionState assigned to a connection is automatically destroyed when the connection is + * closed. + */ +class ConnectionState { +public: + virtual ~ConnectionState() {} +}; + +typedef std::unique_ptr ConnectionStatePtr; + /* * ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are * released back to the pool for re-use when their containing ConnectionData is destroyed. @@ -70,6 +86,18 @@ class ConnectionData { */ virtual Network::ClientConnection& connection() PURE; + /** + * Sets the ConnectionState for this connection. Any existing ConnectionState is destroyed. + * @param ConnectionStatePtr&& new ConnectionState for this connection. + */ + virtual void setConnectionState(ConnectionStatePtr&& state) PURE; + + /** + * @return T* the current ConnectionState or nullptr if no state is set or if the state's type + * is not T. + */ + template T* connectionStateTyped() { return dynamic_cast(connectionState()); } + /** * Sets the ConnectionPool::UpstreamCallbacks for the connection. If no callback is attached, * data from the upstream will cause the connection to be closed. Callbacks cease when the @@ -77,6 +105,12 @@ class ConnectionData { * @param callback the UpstreamCallbacks to invoke for upstream data */ virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE; + +protected: + /** + * @return ConnectionState* pointer to the current ConnectionState or nullptr if not set + */ + virtual ConnectionState* connectionState() PURE; }; typedef std::unique_ptr ConnectionDataPtr; diff --git a/source/common/tcp/conn_pool.h b/source/common/tcp/conn_pool.h index 42f8dc52bd7b..ff74c2d85847 100644 --- a/source/common/tcp/conn_pool.h +++ b/source/common/tcp/conn_pool.h @@ -39,6 +39,11 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: Network::ClientConnection& connection(); void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks); + void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) { + parent_.setConnectionState(std::move(state)); + }; + ConnectionPool::ConnectionState* connectionState() { return parent_.connectionState(); } + void release(bool closed); void invalidate() { conn_valid_ = false; } @@ -60,6 +65,12 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override { wrapper_->addUpstreamCallbacks(callbacks); }; + void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override { + wrapper_->setConnectionState(std::move(state)); + } + ConnectionPool::ConnectionState* connectionState() override { + return wrapper_->connectionState(); + } ConnectionWrapperSharedPtr wrapper_; }; @@ -90,10 +101,16 @@ class ConnPoolImpl : Logger::Loggable, public ConnectionPool:: void onAboveWriteBufferHighWatermark() override; void onBelowWriteBufferLowWatermark() override; + void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) { + conn_state_ = std::move(state); + } + ConnectionPool::ConnectionState* connectionState() { return conn_state_.get(); } + ConnPoolImpl& parent_; Upstream::HostDescriptionConstSharedPtr real_host_description_; ConnectionWrapperSharedPtr wrapper_; Network::ClientConnectionPtr conn_; + ConnectionPool::ConnectionStatePtr conn_state_; Event::TimerPtr connect_timer_; Stats::TimespanPtr conn_length_; uint64_t remaining_requests_; diff --git a/test/common/tcp/conn_pool_test.cc b/test/common/tcp/conn_pool_test.cc index 6b275d59c6fe..1e0f9218f7e6 100644 --- a/test/common/tcp/conn_pool_test.cc +++ b/test/common/tcp/conn_pool_test.cc @@ -29,6 +29,18 @@ using testing::_; namespace Envoy { namespace Tcp { +namespace { + +struct TestConnectionState : public ConnectionPool::ConnectionState { + TestConnectionState(int id, std::function on_destructor) + : id_(id), on_destructor_(on_destructor) {} + ~TestConnectionState() { on_destructor_(); } + + int id_; + std::function on_destructor_; +}; + +} // namespace /** * Mock callbacks used for conn pool testing. @@ -309,6 +321,9 @@ TEST_F(TcpConnPoolImplTest, VerifyBufferLimits) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that upstream callback fire for assigned connections. + */ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) { Buffer::OwnedImpl buffer; @@ -343,6 +358,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that upstream callback close event fires for assigned connections. + */ TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) { Buffer::OwnedImpl buffer; @@ -360,6 +378,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that a connection pool functions without upstream callbacks. + */ TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) { Buffer::OwnedImpl buffer; @@ -400,6 +421,45 @@ TEST_F(TcpConnPoolImplTest, MultipleRequestAndResponse) { dispatcher_.clearDeferredDeleteList(); } +/** + * Tests ConnectionState assignment, lookup and destruction. + */ +TEST_F(TcpConnPoolImplTest, ConnectionStateLifecycle) { + InSequence s; + + bool state_destroyed = false; + + // Request 1 should kick off a new connection. + ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); + + auto* state = new TestConnectionState(1, [&]() -> void { state_destroyed = true; }); + c1.callbacks_.conn_data_->setConnectionState(std::unique_ptr(state)); + + EXPECT_EQ(state, c1.callbacks_.conn_data_->connectionStateTyped()); + + EXPECT_CALL(conn_pool_, onConnReleasedForTest()); + c1.releaseConn(); + + EXPECT_FALSE(state_destroyed); + + // Request 2 should not. + ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Immediate); + + EXPECT_EQ(state, c2.callbacks_.conn_data_->connectionStateTyped()); + + EXPECT_CALL(conn_pool_, onConnReleasedForTest()); + c2.releaseConn(); + + EXPECT_FALSE(state_destroyed); + + // Cause the connection to go away. + EXPECT_CALL(conn_pool_, onConnDestroyedForTest()); + conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); + + EXPECT_TRUE(state_destroyed); +} + /** * Test when we overflow max pending requests. */ @@ -555,6 +615,9 @@ TEST_F(TcpConnPoolImplTest, DisconnectWhileBound) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test upstream disconnection of one request while another is pending. + */ TEST_F(TcpConnPoolImplTest, DisconnectWhilePending) { InSequence s; @@ -664,6 +727,9 @@ TEST_F(TcpConnPoolImplTest, MaxRequestsPerConnection) { EXPECT_EQ(1U, cluster_->stats_.upstream_cx_max_requests_.value()); } +/* + * Test that multiple connections can be assigned at once. + */ TEST_F(TcpConnPoolImplTest, ConcurrentConnections) { InSequence s; @@ -691,6 +757,61 @@ TEST_F(TcpConnPoolImplTest, ConcurrentConnections) { dispatcher_.clearDeferredDeleteList(); } +/** + * Tests ConnectionState lifecycle with multiple concurrent connections. + */ +TEST_F(TcpConnPoolImplTest, ConnectionStateWithConcurrentConnections) { + InSequence s; + + int state_destroyed = 0; + auto* s1 = new TestConnectionState(1, [&]() -> void { state_destroyed |= 1; }); + auto* s2 = new TestConnectionState(2, [&]() -> void { state_destroyed |= 2; }); + auto* s3 = new TestConnectionState(2, [&]() -> void { state_destroyed |= 4; }); + + cluster_->resource_manager_.reset( + new Upstream::ResourceManagerImpl(runtime_, "fake_key", 2, 1024, 1024, 1)); + ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection); + c1.callbacks_.conn_data_->setConnectionState(std::unique_ptr(s1)); + ActiveTestConn c2(*this, 1, ActiveTestConn::Type::CreateConnection); + c2.callbacks_.conn_data_->setConnectionState(std::unique_ptr(s2)); + ActiveTestConn c3(*this, 0, ActiveTestConn::Type::Pending); + + EXPECT_EQ(0, state_destroyed); + + // Finish c1, which gets c3 going. + EXPECT_CALL(conn_pool_, onConnReleasedForTest()); + conn_pool_.expectEnableUpstreamReady(); + c3.expectNewConn(); + c1.releaseConn(); + + conn_pool_.expectAndRunUpstreamReady(); + + // c3 now has the state set by c1. + EXPECT_EQ(s1, c3.callbacks_.conn_data_->connectionStateTyped()); + EXPECT_EQ(s2, c2.callbacks_.conn_data_->connectionStateTyped()); + + // replace c3's state + c3.callbacks_.conn_data_->setConnectionState(std::unique_ptr(s3)); + EXPECT_EQ(1, state_destroyed); + + EXPECT_CALL(conn_pool_, onConnReleasedForTest()).Times(2); + c2.releaseConn(); + c3.releaseConn(); + + EXPECT_EQ(1, state_destroyed); + + // Disconnect both connections. + EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2); + conn_pool_.test_conns_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose); + dispatcher_.clearDeferredDeleteList(); + + EXPECT_EQ(7, state_destroyed); +} + +/** + * Tests that the DrainCallback is invoked when the number of connections goes to zero. + */ TEST_F(TcpConnPoolImplTest, DrainCallback) { InSequence s; ReadyWatcher drained; @@ -711,7 +832,9 @@ TEST_F(TcpConnPoolImplTest, DrainCallback) { dispatcher_.clearDeferredDeleteList(); } -// Test draining a connection pool that has a pending connection. +/** + * Test draining a connection pool that has a pending connection. + */ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) { InSequence s; ReadyWatcher drained; @@ -731,6 +854,9 @@ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that the DrainCallback is invoked when a connection is closed. + */ TEST_F(TcpConnPoolImplTest, DrainOnClose) { ReadyWatcher drained; EXPECT_CALL(drained, ready()); @@ -754,6 +880,9 @@ TEST_F(TcpConnPoolImplTest, DrainOnClose) { dispatcher_.clearDeferredDeleteList(); } +/** + * Test that busy connections are closed when the connection pool is destroyed. + */ TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) { prepareConn(); @@ -762,6 +891,9 @@ TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) { conn_pool_.reset(); } +/** + * Test that ready connections are closed when the connection pool is destroyed. + */ TEST_F(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) { prepareConn(); diff --git a/test/mocks/tcp/mocks.h b/test/mocks/tcp/mocks.h index 3fb969f088b2..8abd46bf6188 100644 --- a/test/mocks/tcp/mocks.h +++ b/test/mocks/tcp/mocks.h @@ -44,6 +44,10 @@ class MockConnectionData : public ConnectionData { // Tcp::ConnectionPool::ConnectionData MOCK_METHOD0(connection, Network::ClientConnection&()); MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&)); + void setConnectionState(ConnectionStatePtr&& state) override { setConnectionState_(state); } + MOCK_METHOD0(connectionState, ConnectionPool::ConnectionState*()); + + MOCK_METHOD1(setConnectionState_, void(ConnectionPool::ConnectionStatePtr& state)); // If set, invoked in ~MockConnectionData, which indicates that the connection pool // caller has relased a connection.