Skip to content

Commit

Permalink
tcp/conn_pool: improve interface for callers (#3903)
Browse files Browse the repository at this point in the history
tcp/conn_pool: improve interface for callers

Provides additional pool failure reasons to allow Tcp::ConnectionPool
callers to distinguish between timeouts and connection failures.
Passes through connection and buffer watermark events.

A subsequent PR will switch the TCP proxy to use the TCP connection
pool (and make use of these features). Relates to #3818.

*Risk Level*: low
*Testing*: unit tests
*Docs Changes*: n/a
*Release Notes*: n/a

Signed-off-by: Stephan Zuercher <stephan@turbinelabs.io>
  • Loading branch information
zuercher authored Jul 23, 2018
1 parent e021e4d commit 7c11e92
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 23 deletions.
22 changes: 14 additions & 8 deletions include/envoy/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@ class Cancellable {
};

/**
 * Reason that a pool connection could not be obtained.
 */
enum class PoolFailureReason {
  // A resource overflowed and policy prevented a new connection from being created.
  Overflow,
  // A local connection failure took place while creating a new connection.
  LocalConnectionFailure,
  // A remote connection failure took place while creating a new connection.
  RemoteConnectionFailure,
  // A timeout occurred while creating a new connection.
  Timeout,
};

/**
 * UpstreamCallbacks for connection pool upstream connection callbacks and data. Note that
 * onEvent(Connected) is never triggered since the event always occurs before a ConnectionPool
 * caller is assigned a connection.
 */
class UpstreamCallbacks {
class UpstreamCallbacks : public Network::ConnectionCallbacks {
public:
virtual ~UpstreamCallbacks() {}

Expand Down Expand Up @@ -122,14 +128,14 @@ class Instance : public Event::DeferredDeletable {
/**
* Register a callback that gets called when the connection pool is fully drained. No actual
* draining is done. The owner of the connection pool is responsible for not creating any
* new streams.
* new connections.
*/
virtual void addDrainedCallback(DrainedCb cb) PURE;

/**
* Actively drain all existing connection pool connections. This method can be used in cases
* where the connection pool is not being destroyed, but the caller wishes to make sure that
* all new streams take place on a new connection. For example, when a health check failure
* all new requests take place on a new connection. For example, when a health check failure
* occurs.
*/
virtual void drainConnections() PURE;
Expand Down
45 changes: 41 additions & 4 deletions source/common/tcp/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
if (event == Network::ConnectionEvent::RemoteClose ||
event == Network::ConnectionEvent::LocalClose) {
ENVOY_CONN_LOG(debug, "client disconnected", *conn.conn_);

if (event == Network::ConnectionEvent::LocalClose) {
host_->cluster().stats().upstream_cx_destroy_local_.inc();
}
if (event == Network::ConnectionEvent::RemoteClose) {
host_->cluster().stats().upstream_cx_destroy_remote_.inc();
}
host_->cluster().stats().upstream_cx_destroy_.inc();

ActiveConnPtr removed;
bool check_for_drained = true;
if (conn.wrapper_ != nullptr) {
Expand Down Expand Up @@ -135,13 +144,21 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
// do with the request.
// NOTE: We move the existing pending requests to a temporary list. This is done so that
// if retry logic submits a new request to the pool, we don't fail it inline.
ConnectionPool::PoolFailureReason reason;
if (conn.timed_out_) {
reason = ConnectionPool::PoolFailureReason::Timeout;
} else if (event == Network::ConnectionEvent::RemoteClose) {
reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
} else {
reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
}

std::list<PendingRequestPtr> pending_requests_to_purge(std::move(pending_requests_));
while (!pending_requests_to_purge.empty()) {
PendingRequestPtr request =
pending_requests_to_purge.front()->removeFromList(pending_requests_to_purge);
host_->cluster().stats().upstream_rq_pending_failure_eject_.inc();
request->callbacks_.onPoolFailure(ConnectionPool::PoolFailureReason::ConnectionFailure,
conn.real_host_description_);
request->callbacks_.onPoolFailure(reason, conn.real_host_description_);
}
}

Expand Down Expand Up @@ -277,7 +294,7 @@ ConnPoolImpl::PendingRequest::~PendingRequest() {
ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)
: parent_(parent),
connect_timer_(parent_.dispatcher_.createTimer([this]() -> void { onConnectTimeout(); })),
remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()) {
remaining_requests_(parent_.host_->cluster().maxRequestsPerConnection()), timed_out_(false) {

parent_.conn_connect_ms_.reset(
new Stats::Timespan(parent_.host_->cluster().stats().upstream_cx_connect_ms_));
Expand All @@ -297,7 +314,6 @@ ConnPoolImpl::ActiveConn::ActiveConn(ConnPoolImpl& parent)

parent_.host_->cluster().stats().upstream_cx_total_.inc();
parent_.host_->cluster().stats().upstream_cx_active_.inc();
parent_.host_->cluster().stats().upstream_cx_http1_total_.inc();
parent_.host_->stats().cx_total_.inc();
parent_.host_->stats().cx_active_.inc();
conn_length_.reset(new Stats::Timespan(parent_.host_->cluster().stats().upstream_cx_length_ms_));
Expand Down Expand Up @@ -328,6 +344,7 @@ void ConnPoolImpl::ActiveConn::onConnectTimeout() {
// We just close the connection at this point. This will result in both a timeout and a connect
// failure and will fold into all the normal connect failure logic.
ENVOY_CONN_LOG(debug, "connect timeout", *conn_);
timed_out_ = true;
parent_.host_->cluster().stats().upstream_cx_connect_timeout_.inc();
conn_->close(Network::ConnectionCloseType::NoFlush);
}
Expand All @@ -343,5 +360,25 @@ void ConnPoolImpl::ActiveConn::onUpstreamData(Buffer::Instance& data, bool end_s
}
}

void ConnPoolImpl::ActiveConn::onEvent(Network::ConnectionEvent event) {
  // Forward the event to the bound caller's upstream callbacks (if any) first,
  // then let the pool update its own state for this connection.
  const bool has_upstream_callbacks = wrapper_ != nullptr && wrapper_->callbacks_ != nullptr;
  if (has_upstream_callbacks) {
    wrapper_->callbacks_->onEvent(event);
  }

  parent_.onConnectionEvent(*this, event);
}

void ConnPoolImpl::ActiveConn::onAboveWriteBufferHighWatermark() {
  // Only a connection with bound upstream callbacks can observe watermark events.
  if (wrapper_ == nullptr || wrapper_->callbacks_ == nullptr) {
    return;
  }
  wrapper_->callbacks_->onAboveWriteBufferHighWatermark();
}

void ConnPoolImpl::ActiveConn::onBelowWriteBufferLowWatermark() {
  // Only a connection with bound upstream callbacks can observe watermark events.
  if (wrapper_ == nullptr || wrapper_->callbacks_ == nullptr) {
    return;
  }
  wrapper_->callbacks_->onBelowWriteBufferLowWatermark();
}

} // namespace Tcp
} // namespace Envoy
9 changes: 4 additions & 5 deletions source/common/tcp/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onUpstreamData(Buffer::Instance& data, bool end_stream);

// Network::ConnectionCallbacks
void onEvent(Network::ConnectionEvent event) override {
parent_.onConnectionEvent(*this, event);
}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}
void onEvent(Network::ConnectionEvent event) override;
void onAboveWriteBufferHighWatermark() override;
void onBelowWriteBufferLowWatermark() override;

ConnPoolImpl& parent_;
Upstream::HostDescriptionConstSharedPtr real_host_description_;
Expand All @@ -85,6 +83,7 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
Event::TimerPtr connect_timer_;
Stats::TimespanPtr conn_length_;
uint64_t remaining_requests_;
bool timed_out_;
};

typedef std::unique_ptr<ActiveConn> ActiveConnPtr;
Expand Down
68 changes: 65 additions & 3 deletions test/common/tcp/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ struct ConnPoolCallbacks : public Tcp::ConnectionPool::Callbacks {
pool_ready_.ready();
}

void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason reason,
Upstream::HostDescriptionConstSharedPtr host) override {
reason_ = reason;
host_ = host;
pool_failure_.ready();
}

ReadyWatcher pool_failure_;
ReadyWatcher pool_ready_;
ConnectionPool::ConnectionData* conn_data_;
ConnectionPool::ConnectionData* conn_data_{};
absl::optional<ConnectionPool::PoolFailureReason> reason_;
Upstream::HostDescriptionConstSharedPtr host_;
};

Expand Down Expand Up @@ -322,6 +324,16 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
EXPECT_EQ(Network::FilterStatus::StopIteration,
conn_pool_.test_conns_[0].filter_->onData(buffer, false));

EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark());
for (auto* cb : conn_pool_.test_conns_[0].connection_->callbacks_) {
cb->onAboveWriteBufferHighWatermark();
}

EXPECT_CALL(callbacks, onBelowWriteBufferLowWatermark());
for (auto* cb : conn_pool_.test_conns_[0].connection_->callbacks_) {
cb->onBelowWriteBufferLowWatermark();
}

// Shutdown normally.
EXPECT_CALL(conn_pool_, onConnReleasedForTest());
c1.releaseConn();
Expand All @@ -331,6 +343,23 @@ TEST_F(TcpConnPoolImplTest, UpstreamCallbacks) {
dispatcher_.clearDeferredDeleteList();
}

// Verifies that a close event on an assigned connection is forwarded to the
// registered UpstreamCallbacks before the pool destroys the connection.
TEST_F(TcpConnPoolImplTest, UpstreamCallbacksCloseEvent) {
  InSequence s;
  ConnectionPool::MockUpstreamCallbacks callbacks;

  // Create connection, set UpstreamCallbacks.
  ActiveTestConn c1(*this, 0, ActiveTestConn::Type::CreateConnection);
  c1.callbacks_.conn_data_->addUpstreamCallbacks(callbacks);

  // The remote close must be passed through to the registered callbacks.
  EXPECT_CALL(callbacks, onEvent(Network::ConnectionEvent::RemoteClose));

  EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
  conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
  dispatcher_.clearDeferredDeleteList();
}

TEST_F(TcpConnPoolImplTest, NoUpstreamCallbacks) {
Buffer::OwnedImpl buffer;

Expand Down Expand Up @@ -394,14 +423,16 @@ TEST_F(TcpConnPoolImplTest, MaxPendingRequests) {
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::Overflow, callbacks2.reason_);

EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_overflow_.value());
}

/**
* Tests a remote connection failure before a request is bound which should result in the pending
* getting purged.
*/
TEST_F(TcpConnPoolImplTest, ConnectFailure) {
TEST_F(TcpConnPoolImplTest, RemoteConnectFailure) {
InSequence s;

// Request 1 should kick off a new connection.
Expand All @@ -417,6 +448,34 @@ TEST_F(TcpConnPoolImplTest, ConnectFailure) {
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::RemoteClose);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::RemoteConnectionFailure, callbacks.reason_);

EXPECT_EQ(1U, cluster_->stats_.upstream_cx_connect_fail_.value());
EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_failure_eject_.value());
}

/**
 * Tests a local connection failure (LocalClose before the connection is assigned) which should
 * result in the pending request getting purged with reason LocalConnectionFailure.
 */
TEST_F(TcpConnPoolImplTest, LocalConnectFailure) {
InSequence s;

// Request 1 should kick off a new connection.
ConnPoolCallbacks callbacks;
conn_pool_.expectConnCreate();
Tcp::ConnectionPool::Cancellable* handle = conn_pool_.newConnection(callbacks);
EXPECT_NE(nullptr, handle);

// The connect timer must be disabled when the connection closes before it fires.
EXPECT_CALL(callbacks.pool_failure_, ready());
EXPECT_CALL(*conn_pool_.test_conns_[0].connect_timer_, disableTimer());

// Simulate the upstream connection failing locally while still connecting.
EXPECT_CALL(conn_pool_, onConnDestroyedForTest());
conn_pool_.test_conns_[0].connection_->raiseEvent(Network::ConnectionEvent::LocalClose);
dispatcher_.clearDeferredDeleteList();

// The purged request must see a local connection failure, and failure stats increment.
EXPECT_EQ(ConnectionPool::PoolFailureReason::LocalConnectionFailure, callbacks.reason_);

EXPECT_EQ(1U, cluster_->stats_.upstream_cx_connect_fail_.value());
EXPECT_EQ(1U, cluster_->stats_.upstream_rq_pending_failure_eject_.value());
}
Expand Down Expand Up @@ -446,6 +505,9 @@ TEST_F(TcpConnPoolImplTest, ConnectTimeout) {
EXPECT_CALL(conn_pool_, onConnDestroyedForTest()).Times(2);
dispatcher_.clearDeferredDeleteList();

EXPECT_EQ(ConnectionPool::PoolFailureReason::Timeout, callbacks1.reason_);
EXPECT_EQ(ConnectionPool::PoolFailureReason::Timeout, callbacks2.reason_);

EXPECT_EQ(2U, cluster_->stats_.upstream_cx_connect_fail_.value());
EXPECT_EQ(2U, cluster_->stats_.upstream_cx_connect_timeout_.value());
}
Expand Down
5 changes: 5 additions & 0 deletions test/integration/tcp_conn_pool_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class TestFilter : public Network::ReadFilter {
public:
Request(TestFilter& parent, Buffer::Instance& data) : parent_(parent) { data_.move(data); }

// Tcp::ConnectionPool::Callbacks
void onPoolFailure(Tcp::ConnectionPool::PoolFailureReason,
Upstream::HostDescriptionConstSharedPtr) override {
ASSERT(false);
Expand All @@ -59,6 +60,7 @@ class TestFilter : public Network::ReadFilter {
upstream_->connection().write(data_, false);
}

// Tcp::ConnectionPool::UpstreamCallbacks
void onUpstreamData(Buffer::Instance& data, bool end_stream) override {
UNREFERENCED_PARAMETER(end_stream);

Expand All @@ -67,6 +69,9 @@ class TestFilter : public Network::ReadFilter {

upstream_->release();
}
void onEvent(Network::ConnectionEvent) override {}
void onAboveWriteBufferHighWatermark() override {}
void onBelowWriteBufferLowWatermark() override {}

TestFilter& parent_;
Buffer::OwnedImpl data_;
Expand Down
1 change: 1 addition & 0 deletions test/mocks/tcp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_mock(
deps = [
"//include/envoy/buffer:buffer_interface",
"//include/envoy/tcp:conn_pool_interface",
"//test/mocks/network:network_mocks",
"//test/mocks/upstream:host_mocks",
],
)
37 changes: 36 additions & 1 deletion test/mocks/tcp/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#include "gmock/gmock.h"
#include "gtest/gtest.h"

using testing::Invoke;
using testing::ReturnRef;
using testing::_;

namespace Envoy {
namespace Tcp {
namespace ConnectionPool {
Expand All @@ -13,9 +17,40 @@ MockCancellable::~MockCancellable() {}
// Trivial mock lifecycle functions; gmock generates the mocked method bodies.
MockUpstreamCallbacks::MockUpstreamCallbacks() {}
MockUpstreamCallbacks::~MockUpstreamCallbacks() {}

MockConnectionData::MockConnectionData() {
  // By default, hand out the mock connection owned by this object.
  ON_CALL(*this, connection()).WillByDefault(ReturnRef(connection_));
}
MockConnectionData::~MockConnectionData() {}

MockInstance::MockInstance() {
  // By default, route newConnection() through newConnectionImpl() so the mock
  // records pending callbacks and returns a cancellation handle.
  ON_CALL(*this, newConnection(_)).WillByDefault(Invoke([&](Callbacks& cb) -> Cancellable* {
    return newConnectionImpl(cb);
  }));
}
MockInstance::~MockInstance() {}

MockCancellable* MockInstance::newConnectionImpl(Callbacks& cb) {
  // Record the caller so poolFailure()/poolReady() can complete it later, and
  // hand back a cancellation handle owned by this mock.
  callbacks_.push_back(&cb);
  handles_.emplace_back();
  return &handles_.back();
}

void MockInstance::poolFailure(PoolFailureReason reason) {
  // Dequeue the oldest pending request and fail it with the given reason.
  Callbacks* const pending = callbacks_.front();
  handles_.pop_front();
  callbacks_.pop_front();

  pending->onPoolFailure(reason, host_);
}

void MockInstance::poolReady() {
Callbacks* cb = callbacks_.front();
callbacks_.pop_front();
handles_.pop_front();

cb->onPoolReady(connection_data_, host_);
}

} // namespace ConnectionPool
} // namespace Tcp
} // namespace Envoy
Loading

0 comments on commit 7c11e92

Please sign in to comment.