Skip to content

Commit

Permalink
tcp: allow connection pool callers to store protocol state (envoyprox…
Browse files Browse the repository at this point in the history
…y#4131)

Allows protocol-specific state to be stored for the duration of a
connection's life. For example, a proxy filter may store negotiated
flags across connection usages.

*Risk Level*: low, existing functionality unchanged
*Testing*: unit tests
*Docs Changes*: n/a
*Release Notes*: n/a

Signed-off-by: Stephan Zuercher <stephan@turbinelabs.io>
  • Loading branch information
zuercher authored Aug 16, 2018
1 parent 3e1d643 commit 564d256
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 1 deletion.
34 changes: 34 additions & 0 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ class UpstreamCallbacks : public Network::ConnectionCallbacks {
virtual void onUpstreamData(Buffer::Instance& data, bool end_stream) PURE;
};

/**
* ConnectionState is a base class for connection state maintained across requests. For example, a
* protocol may maintain a connection-specific request sequence number or negotiate options that
* affect the behavior of requests for the duration of the connection. A ConnectionState subclass
* is assigned to the ConnectionData to track this state when the connection is returned to the
* pool so that the state is available when the connection is re-used for a subsequent request.
* The ConnectionState assigned to a connection is automatically destroyed when the connection is
* closed.
*/
class ConnectionState {
public:
virtual ~ConnectionState() {}
};

typedef std::unique_ptr<ConnectionState> ConnectionStatePtr;

/*
* ConnectionData wraps a ClientConnection allocated to a caller. Open ClientConnections are
* released back to the pool for re-use when their containing ConnectionData is destroyed.
Expand All @@ -70,13 +86,31 @@ class ConnectionData {
*/
virtual Network::ClientConnection& connection() PURE;

/**
* Sets the ConnectionState for this connection. Any existing ConnectionState is destroyed.
* @param ConnectionStatePtr&& new ConnectionState for this connection.
*/
virtual void setConnectionState(ConnectionStatePtr&& state) PURE;

/**
* @return T* the current ConnectionState or nullptr if no state is set or if the state's type
* is not T.
*/
template <class T> T* connectionStateTyped() { return dynamic_cast<T*>(connectionState()); }

/**
* Sets the ConnectionPool::UpstreamCallbacks for the connection. If no callback is attached,
* data from the upstream will cause the connection to be closed. Callbacks cease when the
* connection is released.
* @param callback the UpstreamCallbacks to invoke for upstream data
*/
virtual void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callback) PURE;

protected:
/**
* @return ConnectionState* pointer to the current ConnectionState or nullptr if not set
*/
virtual ConnectionState* connectionState() PURE;
};

typedef std::unique_ptr<ConnectionData> ConnectionDataPtr;
Expand Down
17 changes: 17 additions & 0 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::

Network::ClientConnection& connection();
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks);
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) {
parent_.setConnectionState(std::move(state));
};
ConnectionPool::ConnectionState* connectionState() { return parent_.connectionState(); }

void release(bool closed);

void invalidate() { conn_valid_ = false; }
Expand All @@ -60,6 +65,12 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
wrapper_->addUpstreamCallbacks(callbacks);
};
void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
wrapper_->setConnectionState(std::move(state));
}
ConnectionPool::ConnectionState* connectionState() override {
return wrapper_->connectionState();
}

ConnectionWrapperSharedPtr wrapper_;
};
Expand Down Expand Up @@ -90,10 +101,16 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) {
conn_state_ = std::move(state);
}
ConnectionPool::ConnectionState* connectionState() { return conn_state_.get(); }

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
ConnectionWrapperSharedPtr wrapper_;
Network::ClientConnectionPtr conn_;
ConnectionPool::ConnectionStatePtr conn_state_;
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
Expand Down
134 changes: 133 additions & 1 deletion test/common/tcp/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ using testing::_;

namespace Envoy {
namespace Tcp {
namespace {

struct TestConnectionState : public ConnectionPool::ConnectionState {
TestConnectionState(int id, std::function<void()> on_destructor)
: id_(id), on_destructor_(on_destructor) {}
~TestConnectionState() { on_destructor_(); }

int id_;
std::function<void()> on_destructor_;
};

} // namespace

/**
* Mock callbacks used for conn pool testing.
Expand Down Expand Up @@ -309,6 +321,9 @@ TEST_F(TcpConnPoolImplTest, VerifyBufferLimits) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that upstream callback fire for assigned connections.
*/
TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
Buffer::OwnedImpl buffer;

Expand Down Expand Up @@ -343,6 +358,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that upstream callback close event fires for assigned connections.
*/
TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
Buffer::OwnedImpl buffer;

Expand All @@ -360,6 +378,9 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that a connection pool functions without upstream callbacks.
*/
TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) {
Buffer::OwnedImpl buffer;

Expand Down Expand Up @@ -400,6 +421,45 @@ TEST_F(TcpConnPoolImplTest, MultipleRequestAndResponse) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests ConnectionState assignment, lookup and destruction.
*/
TEST_F(TcpConnPoolImplTest, ConnectionStateLifecycle) {
InSequence s;

bool state_destroyed = false;

// Request 1 should kick off a new connection.
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);

auto* state = new TestConnectionState(1, [&]() -> void { state_destroyed = true; });
c1.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(state));

EXPECT_EQ(state, c1.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());

EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c1.releaseConn();

EXPECT_FALSE(state_destroyed);

// Request 2 should not.
ActiveTestConn c2(*this, 0, ActiveTestConn::Type::Immediate);

EXPECT_EQ(state, c2.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());

EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c2.releaseConn();

EXPECT_FALSE(state_destroyed);

// Cause the connection to go away.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_TRUE(state_destroyed);
}

/**
* Test when we overflow max pending requests.
*/
Expand Down Expand Up @@ -555,6 +615,9 @@ TEST_F(TcpConnPoolImplTest, DisconnectWhileBound) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test upstream disconnection of one request while another is pending.
*/
TEST_F(TcpConnPoolImplTest, DisconnectWhilePending) {
InSequence s;

Expand Down Expand Up @@ -664,6 +727,9 @@ TEST_F(TcpConnPoolImplTest, MaxRequestsPerConnection) {
EXPECT_EQ(1U, cluster_->stats_.upstream_cx_max_requests_.value());
}

/*
* Test that multiple connections can be assigned at once.
*/
TEST_F(TcpConnPoolImplTest, ConcurrentConnections) {
InSequence s;

Expand Down Expand Up @@ -691,6 +757,61 @@ TEST_F(TcpConnPoolImplTest, ConcurrentConnections) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Tests ConnectionState lifecycle with multiple concurrent connections.
*/
TEST_F(TcpConnPoolImplTest, ConnectionStateWithConcurrentConnections) {
InSequence s;

int state_destroyed = 0;
auto* s1 = new TestConnectionState(1, [&]() -> void { state_destroyed |= 1; });
auto* s2 = new TestConnectionState(2, [&]() -> void { state_destroyed |= 2; });
auto* s3 = new TestConnectionState(2, [&]() -> void { state_destroyed |= 4; });

cluster_->resource_manager_.reset(
new Upstream::ResourceManagerImpl(runtime_, "fake_key", 2, 1024, 1024, 1));
ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
c1.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(s1));
ActiveTestConn c2(*this, 1, ActiveTestConn::Type::CreateConnection);
c2.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(s2));
ActiveTestConn c3(*this, 0, ActiveTestConn::Type::Pending);

EXPECT_EQ(0, state_destroyed);

// Finish c1, which gets c3 going.
EXPECT_CALL(conn_pool_, onConnReleasedForTest());
conn_pool_.expectEnableUpstreamReady();
c3.expectNewConn();
c1.releaseConn();

conn_pool_.expectAndRunUpstreamReady();

// c3 now has the state set by c1.
EXPECT_EQ(s1, c3.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());
EXPECT_EQ(s2, c2.callbacks_.conn_data_->connectionStateTyped<TestConnectionState>());

// replace c3's state
c3.callbacks_.conn_data_->setConnectionState(std::unique_ptr<TestConnectionState>(s3));
EXPECT_EQ(1, state_destroyed);

EXPECT_CALL(conn_pool_, onConnReleasedForTest()).Times(2);
c2.releaseConn();
c3.releaseConn();

EXPECT_EQ(1, state_destroyed);

// Disconnect both connections.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2);
conn_pool_.test_conns_[1].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(7, state_destroyed);
}

/**
* Tests that the DrainCallback is invoked when the number of connections goes to zero.
*/
TEST_F(TcpConnPoolImplTest, DrainCallback) {
InSequence s;
ReadyWatcher drained;
Expand All @@ -711,7 +832,9 @@ TEST_F(TcpConnPoolImplTest, DrainCallback) {
dispatcher_.clearDeferredDeleteList();
}

// Test draining a connection pool that has a pending connection.
/**
* Test draining a connection pool that has a pending connection.
*/
TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
InSequence s;
ReadyWatcher drained;
Expand All @@ -731,6 +854,9 @@ TEST_F(TcpConnPoolImplTest, DrainWhileConnecting) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that the DrainCallback is invoked when a connection is closed.
*/
TEST_F(TcpConnPoolImplTest, DrainOnClose) {
ReadyWatcher drained;
EXPECT_CALL(drained, ready());
Expand All @@ -754,6 +880,9 @@ TEST_F(TcpConnPoolImplTest, DrainOnClose) {
dispatcher_.clearDeferredDeleteList();
}

/**
* Test that busy connections are closed when the connection pool is destroyed.
*/
TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
prepareConn();

Expand All @@ -762,6 +891,9 @@ TEST_F(TcpConnPoolImplDestructorTest, TestBusyConnectionsAreClosed) {
conn_pool_.reset();
}

/**
* Test that ready connections are closed when the connection pool is destroyed.
*/
TEST_F(TcpConnPoolImplDestructorTest, TestReadyConnectionsAreClosed) {
prepareConn();

Expand Down
4 changes: 4 additions & 0 deletions test/mocks/tcp/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ class MockConnectionData : public ConnectionData {
// Tcp::ConnectionPool::ConnectionData
MOCK_METHOD0(connection, Network::ClientConnection&());
MOCK_METHOD1(addUpstreamCallbacks, void(ConnectionPool::UpstreamCallbacks&));
void setConnectionState(ConnectionStatePtr&& state) override { setConnectionState_(state); }
MOCK_METHOD0(connectionState, ConnectionPool::ConnectionState*());

MOCK_METHOD1(setConnectionState_, void(ConnectionPool::ConnectionStatePtr& state));

// If set, invoked in ~MockConnectionData, which indicates that the connection pool
// caller has relased a connection.
Expand Down

0 comments on commit 564d256

Please sign in to comment.