tcp: allow conn pool cancel to request connection close (envoyproxy#4684)

Allows cancellation of a pending TCP connection pool request to also ask that the
pending upstream connection be closed if it cannot be used immediately.

*Risk Level*: medium
*Testing*: unit tests
*Docs Changes*: n/a
*Release Notes*: n/a
*Fixes*: envoyproxy#4409

Signed-off-by: Stephan Zuercher <stephan@turbinelabs.io>

Signed-off-by: Yang Song <yasong@yasong00.cam.corp.google.com>
zuercher authored and Yang Song committed Oct 19, 2018
1 parent 8290697 commit 0528a09
Showing 9 changed files with 167 additions and 41 deletions.
21 changes: 19 additions & 2 deletions include/envoy/tcp/conn_pool.h
@@ -12,6 +12,21 @@ namespace Envoy {
namespace Tcp {
namespace ConnectionPool {

/**
* Controls the behavior of a canceled connection request.
*/
enum class CancelPolicy {
// By default, canceled connection requests allow a pending connection to complete and become
// available for a future connection request.
Default,
// When a connection request is canceled, closes a pending connection if there are more pending
// connections than pending connection requests. CloseExcess is useful for callers that never
// re-use connections (e.g. by closing rather than releasing connections). Using CloseExcess in
// this situation guarantees that no idle connections will be held open by the conn pool awaiting
// a connection request.
CloseExcess,
};

/**
* Handle that allows a pending connection request to be canceled before it is completed.
*/
@@ -20,9 +35,11 @@ class Cancellable {
virtual ~Cancellable() {}

/**
* Cancel the pending request.
* Cancel the pending connection request.
* @param cancel_policy a CancelPolicy that controls the behavior of this connection request
* cancellation.
*/
virtual void cancel() PURE;
virtual void cancel(CancelPolicy cancel_policy) PURE;
};

/**
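For orientation, here is a minimal sketch of how a caller might use the new interface. It assumes a handle previously returned by the pool's newConnection(); the surrounding class and method names are hypothetical, and only the Cancellable/CancelPolicy usage reflects this change.

#include "envoy/tcp/conn_pool.h"

// Hypothetical caller; nothing here is part of the commit except the cancel() calls.
class ExampleCaller {
public:
  // The downstream went away and the upstream connection will never be used:
  // ask the pool to close a pending connection that no longer has a request
  // waiting for it.
  void abortAndCloseExcess() {
    if (handle_ != nullptr) {
      handle_->cancel(Envoy::Tcp::ConnectionPool::CancelPolicy::CloseExcess);
      handle_ = nullptr;
    }
  }

  // The request was abandoned, but this caller re-uses connections: let the
  // pending connection finish connecting and serve a later request (the
  // pre-existing behavior).
  void abortAndKeepConnection() {
    if (handle_ != nullptr) {
      handle_->cancel(Envoy::Tcp::ConnectionPool::CancelPolicy::Default);
      handle_ = nullptr;
    }
  }

private:
  // Returned by the pool's newConnection(); owned by the pool, not by this class.
  Envoy::Tcp::ConnectionPool::Cancellable* handle_{nullptr};
};

The tcp_proxy change further down takes the CloseExcess path because it never re-uses upstream connections, while the router change at the end of the diff keeps Default.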
70 changes: 56 additions & 14 deletions source/common/tcp/conn_pool.cc
@@ -22,6 +22,10 @@ ConnPoolImpl::~ConnPoolImpl() {
busy_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush);
}

while (!pending_conns_.empty()) {
pending_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush);
}

// Make sure all connections are destroyed before we are destroyed.
dispatcher_.clearDeferredDeleteList();
}
@@ -31,11 +35,15 @@ void ConnPoolImpl::drainConnections() {
ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush);
}

// We drain busy connections by manually setting remaining requests to 1. Thus, when the next
// response completes the connection will be destroyed.
// We drain busy and pending connections by manually setting remaining requests to 1. Thus, when
// the next response completes the connection will be destroyed.
for (const auto& conn : busy_conns_) {
conn->remaining_requests_ = 1;
}

for (const auto& conn : pending_conns_) {
conn->remaining_requests_ = 1;
}
}

void ConnPoolImpl::addDrainedCallback(DrainedCb cb) {
@@ -52,7 +60,8 @@ void ConnPoolImpl::assignConnection(ActiveConn& conn, ConnectionPool::Callbacks&
}

void ConnPoolImpl::checkForDrained() {
if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_conns_.empty()) {
if (!drained_callbacks_.empty() && pending_requests_.empty() && busy_conns_.empty() &&
pending_conns_.empty()) {
while (!ready_conns_.empty()) {
ready_conns_.front()->conn_->close(Network::ConnectionCloseType::NoFlush);
}
@@ -66,7 +75,7 @@ void ConnPoolImpl::checkForDrained() {
void ConnPoolImpl::createNewConnection() {
ENVOY_LOG(debug, "creating a new connection");
ActiveConnPtr conn(new ActiveConn(*this));
conn->moveIntoList(std::move(conn), busy_conns_);
conn->moveIntoList(std::move(conn), pending_conns_);
}

ConnectionPool::Cancellable* ConnPoolImpl::newConnection(ConnectionPool::Callbacks& callbacks) {
@@ -85,7 +94,8 @@ ConnectionPool::Cancellable* ConnPoolImpl::newConnection(ConnectionPool::Callbac
}

// If we have no connections at all, make one no matter what so we don't starve.
if ((ready_conns_.size() == 0 && busy_conns_.size() == 0) || can_create_connection) {
if ((ready_conns_.empty() && busy_conns_.empty() && pending_conns_.empty()) ||
can_create_connection) {
createNewConnection();
}

@@ -139,7 +149,7 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
// The only time this happens is if we actually saw a connect failure.
host_->cluster().stats().upstream_cx_connect_fail_.inc();
host_->stats().cx_connect_fail_.inc();
removed = conn.removeFromList(busy_conns_);
removed = conn.removeFromList(pending_conns_);

// Raw connect failures should never happen under normal circumstances. If we have an upstream
// that is behaving badly, requests can get stuck here in the pending state. If we see a
@@ -168,7 +178,8 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
dispatcher_.deferredDelete(std::move(removed));

// If we have pending requests and we just lost a connection we should make a new one.
if (pending_requests_.size() > (ready_conns_.size() + busy_conns_.size())) {
if (pending_requests_.size() >
(ready_conns_.size() + busy_conns_.size() + pending_conns_.size())) {
createNewConnection();
}

@@ -185,17 +196,28 @@ void ConnPoolImpl::onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent
// Note that the order in this function is important. Concretely, we must destroy the connect
// timer before we process an idle connection, because if this results in an immediate
// drain/destruction event, we key off of the existence of the connect timer above to determine
// whether the connection is in the ready list (connected) or the busy list (failed to connect).
// whether the connection is in the ready list (connected) or the pending list (failed to
// connect).
if (event == Network::ConnectionEvent::Connected) {
conn_connect_ms_->complete();
processIdleConnection(conn, false);
processIdleConnection(conn, true, false);
}
}

void ConnPoolImpl::onPendingRequestCancel(PendingRequest& request) {
void ConnPoolImpl::onPendingRequestCancel(PendingRequest& request,
ConnectionPool::CancelPolicy cancel_policy) {
ENVOY_LOG(debug, "canceling pending request");
request.removeFromList(pending_requests_);
host_->cluster().stats().upstream_rq_cancelled_.inc();

// If the cancellation requests closure of excess connections and there are more pending
// connections than pending requests, close the most recently created pending connection.
if (cancel_policy == ConnectionPool::CancelPolicy::CloseExcess &&
pending_requests_.size() < pending_conns_.size()) {
ENVOY_LOG(debug, "canceling pending connection");
pending_conns_.back()->conn_->close(Network::ConnectionCloseType::NoFlush);
}

checkForDrained();
}

@@ -211,7 +233,7 @@ void ConnPoolImpl::onConnReleased(ActiveConn& conn) {
// Upstream connection might be closed right after response is complete. Setting delay=true
// here to assign pending requests in next dispatcher loop to handle that case.
// https://github.com/envoyproxy/envoy/issues/2715
processIdleConnection(conn, true);
processIdleConnection(conn, false, true);
}
}

@@ -226,27 +248,47 @@ void ConnPoolImpl::onUpstreamReady() {
ENVOY_CONN_LOG(debug, "assigning connection", *conn.conn_);
// There is work to do so bind a connection to the caller and move it to the busy list. Pending
// requests are pushed onto the front, so pull from the back.
conn.moveBetweenLists(ready_conns_, busy_conns_);
assignConnection(conn, pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
conn.moveBetweenLists(ready_conns_, busy_conns_);
}
}

void ConnPoolImpl::processIdleConnection(ActiveConn& conn, bool delay) {
void ConnPoolImpl::processIdleConnection(ActiveConn& conn, bool new_connection, bool delay) {
if (conn.wrapper_) {
conn.wrapper_->invalidate();
conn.wrapper_.reset();
}

// TODO(zuercher): As a future improvement, we may wish to close extra connections when there are
// no pending requests rather than moving them to ready_conns_. For conn pool callers that re-use
// connections it is possible that a busy connection may be re-assigned to a pending request
// while a new connection is pending. The current behavior is to move the pending connection to
// the ready list to await a future request. For some protocols, e.g. mysql which has the server
// transmit handshake data on connect, it may be desirable to close the connection if no pending
// request is available. The CloseExcess flag for cancel is related: if we close pending
// connections without requests here it becomes superfluous (instead of closing connections at
// cancel time we'd wait until they completed and close them here). Finally, we want to avoid
// requiring operators to correctly configure clusters to get the necessary pending connection
// behavior (e.g. we want to find a way to enable the new behavior without having to configure
// it on a cluster).

if (pending_requests_.empty() || delay) {
// There is nothing to service or delayed processing is requested, so just move the connection
// into the ready list.
ENVOY_CONN_LOG(debug, "moving to ready", *conn.conn_);
conn.moveBetweenLists(busy_conns_, ready_conns_);
if (new_connection) {
conn.moveBetweenLists(pending_conns_, ready_conns_);
} else {
conn.moveBetweenLists(busy_conns_, ready_conns_);
}
} else {
// There is work to do immediately so bind a request to the caller and move it to the busy list.
// Pending requests are pushed onto the front, so pull from the back.
ENVOY_CONN_LOG(debug, "assigning connection", *conn.conn_);
if (new_connection) {
conn.moveBetweenLists(pending_conns_, busy_conns_);
}
assignConnection(conn, pending_requests_.back()->callbacks_);
pending_requests_.pop_back();
}
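To make the CloseExcess accounting in onPendingRequestCancel() concrete, here is a small self-contained sketch of the decision it makes. The helper and the main() driver are illustrative only; in the pool the check runs after the canceled request has already been removed from pending_requests_.

#include <cassert>
#include <cstddef>

// Mirrors the check above: close a pending connection only when more
// connections are still being established than there are requests left to
// consume them.
bool shouldClosePendingConnection(bool close_excess, std::size_t pending_requests,
                                  std::size_t pending_conns) {
  return close_excess && pending_requests < pending_conns;
}

int main() {
  // Two queued requests caused two connections to be created; canceling one
  // request with CloseExcess leaves 1 request and 2 pending connections, so the
  // most recently created pending connection is closed.
  assert(shouldClosePendingConnection(true, 1, 2));
  // With equal counts every pending connection still has a waiting request, so
  // nothing is closed even under CloseExcess.
  assert(!shouldClosePendingConnection(true, 2, 2));
  // CancelPolicy::Default never closes a pending connection on cancellation.
  assert(!shouldClosePendingConnection(false, 1, 2));
  return 0;
}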
13 changes: 8 additions & 5 deletions source/common/tcp/conn_pool.h
@@ -124,7 +124,9 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
~PendingRequest();

// ConnectionPool::Cancellable
void cancel() override { parent_.onPendingRequestCancel(*this); }
void cancel(ConnectionPool::CancelPolicy cancel_policy) override {
parent_.onPendingRequestCancel(*this, cancel_policy);
}

ConnPoolImpl& parent_;
ConnectionPool::Callbacks& callbacks_;
@@ -135,20 +137,21 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void assignConnection(ActiveConn& conn, ConnectionPool::Callbacks& callbacks);
void createNewConnection();
void onConnectionEvent(ActiveConn& conn, Network::ConnectionEvent event);
void onPendingRequestCancel(PendingRequest& request);
void onPendingRequestCancel(PendingRequest& request, ConnectionPool::CancelPolicy cancel_policy);
virtual void onConnReleased(ActiveConn& conn);
virtual void onConnDestroyed(ActiveConn& conn);
void onUpstreamReady();
void processIdleConnection(ActiveConn& conn, bool delay);
void processIdleConnection(ActiveConn& conn, bool new_connection, bool delay);
void checkForDrained();

Event::Dispatcher& dispatcher_;
Upstream::HostConstSharedPtr host_;
Upstream::ResourcePriority priority_;
const Network::ConnectionSocket::OptionsSharedPtr socket_options_;

std::list<ActiveConnPtr> ready_conns_;
std::list<ActiveConnPtr> busy_conns_;
std::list<ActiveConnPtr> pending_conns_; // conns awaiting connected event
std::list<ActiveConnPtr> ready_conns_; // conns ready for assignment
std::list<ActiveConnPtr> busy_conns_; // conns assigned
std::list<PendingRequestPtr> pending_requests_;
std::list<DrainedCb> drained_callbacks_;
Stats::TimespanPtr conn_connect_ms_;
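With the new pending_conns_ list, an ActiveConn has three possible homes over its lifetime. The transitions implied by the diff can be summarized in a rough sketch; the enum and functions below are illustrative simplifications, not the pool's actual list management.

// Simplified model of where a connection lives after each pool event.
enum class ConnList { Pending, Ready, Busy, Destroyed };

// createNewConnection() places a new conn in pending_conns_ until it connects.
ConnList onCreated() { return ConnList::Pending; }

// On connect, processIdleConnection(conn, /*new_connection=*/true, /*delay=*/false)
// assigns the conn to a waiting request (busy) or parks it in ready_conns_.
ConnList onConnected(bool has_pending_request) {
  return has_pending_request ? ConnList::Busy : ConnList::Ready;
}

// A released conn that is kept alive goes through processIdleConnection() with
// delay=true, so it returns to ready_conns_ and is re-assigned on the next
// dispatcher iteration via onUpstreamReady().
ConnList onReleased() { return ConnList::Ready; }

// Connect failure, drain, remote close, or a CloseExcess cancellation remove the
// conn from whichever list holds it and defer-delete it.
ConnList onClosed() { return ConnList::Destroyed; }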
3 changes: 2 additions & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
@@ -466,7 +466,8 @@ void Filter::onDownstreamEvent(Network::ConnectionEvent event) {
} else if (upstream_handle_) {
if (event == Network::ConnectionEvent::LocalClose ||
event == Network::ConnectionEvent::RemoteClose) {
upstream_handle_->cancel();
// Cancel the conn pool request and close any excess pending connection.
upstream_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::CloseExcess);
upstream_handle_ = nullptr;
}
}
@@ -383,7 +383,7 @@ FilterStatus Router::UpstreamRequest::start() {

void Router::UpstreamRequest::resetStream() {
if (conn_pool_handle_) {
conn_pool_handle_->cancel();
conn_pool_handle_->cancel(Tcp::ConnectionPool::CancelPolicy::Default);
}

if (conn_data_ != nullptr) {