Skip to content

Commit

Permalink
perf: simplify connection buffer stats (#244)
Browse files Browse the repository at this point in the history
We spend a considerable amount of time computing instantaneous
connection buffer stats and firing callbacks. Currently we only use
this information for stats purposes. This commit makes the buffer
stats eventually consistent and much more streamlined. Eventually,
we can provide direct accessors vs. callbacks if we need to inspect
buffer information in order to apply back pressure.
  • Loading branch information
mattklein123 authored Nov 28, 2016
1 parent b986a65 commit c0a5701
Show file tree
Hide file tree
Showing 31 changed files with 250 additions and 231 deletions.
14 changes: 0 additions & 14 deletions include/envoy/buffer/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ struct RawSlice {
uint64_t len_;
};

/**
* Buffer change callback.
* @param old_size supplies the size of the buffer prior to the change.
* @param data supplies how much data was added or removed.
*/
typedef std::function<void(uint64_t old_size, int64_t delta)> Callback;

/**
* A basic buffer abstraction.
*/
Expand Down Expand Up @@ -118,13 +111,6 @@ class Instance {
*/
virtual ssize_t search(const void* data, uint64_t size, size_t start) const PURE;

/**
* Set a buffer change callback. Only a single callback can be set at a time. The callback
* is invoked inline with buffer changes.
* @param callback supplies the callback to set. Pass nullptr to clear the callback.
*/
virtual void setCallback(Callback callback) PURE;

/**
* Write the buffer out to a file descriptor.
* @param fd supplies the descriptor to write to.
Expand Down
22 changes: 14 additions & 8 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,6 @@ class ConnectionCallbacks {
public:
virtual ~ConnectionCallbacks() {}

/**
* Callback for connection buffer changes.
* @param type supplies which buffer has changed.
* @param old_size supplies the original size of the buffer.
* @param delta supplies how much data was added or removed from the buffer.
*/
virtual void onBufferChange(ConnectionBufferType type, uint64_t old_size, int64_t delta) PURE;

/**
* Callback for connection events.
* @param events supplies the ConnectionEvent events that occurred as a bitmask.
Expand All @@ -64,6 +56,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
public:
enum class State { Open, Closing, Closed };

struct BufferStats {
Stats::Counter& read_total_;
Stats::Gauge& read_current_;
Stats::Counter& write_total_;
Stats::Gauge& write_current_;
};

virtual ~Connection() {}

/**
Expand Down Expand Up @@ -116,6 +115,13 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
*/
virtual const std::string& remoteAddress() PURE;

/**
* Set the buffer stats to update when the connection's read/write buffers change. Note that
* for performance reasons these stats are eventually consistent and may not always accurately
* represent the buffer contents at any given point in time.
*/
virtual void setBufferStats(const BufferStats& stats) PURE;

/**
* @return the SSL connection data if this is an SSL connection, or nullptr if it is not.
*/
Expand Down
19 changes: 0 additions & 19 deletions source/common/buffer/buffer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@

namespace Buffer {

const evbuffer_cb_func OwnedImpl::buffer_cb_ =
[](evbuffer*, const evbuffer_cb_info* info, void* arg)
-> void { static_cast<OwnedImpl*>(arg)->onBufferChange(*info); };

void OwnedImpl::add(const void* data, uint64_t size) { evbuffer_add(buffer_.get(), data, size); }

void OwnedImpl::add(const std::string& data) {
Expand Down Expand Up @@ -78,10 +74,6 @@ void OwnedImpl::move(Instance& rhs, uint64_t length) {
UNREFERENCED_PARAMETER(rc);
}

void OwnedImpl::onBufferChange(const evbuffer_cb_info& info) {
cb_(info.orig_size, info.n_added - info.n_deleted);
}

int OwnedImpl::read(int fd, uint64_t max_length) {
return evbuffer_read(buffer_.get(), fd, max_length);
}
Expand All @@ -108,17 +100,6 @@ ssize_t OwnedImpl::search(const void* data, uint64_t size, size_t start) const {
return result_ptr.pos;
}

void OwnedImpl::setCallback(Callback callback) {
ASSERT(!callback || !cb_);
if (callback) {
evbuffer_add_cb(buffer_.get(), buffer_cb_, this);
cb_ = callback;
} else {
evbuffer_remove_cb(buffer_.get(), buffer_cb_, this);
cb_ = nullptr;
}
}

int OwnedImpl::write(int fd) { return evbuffer_write(buffer_.get(), fd); }

OwnedImpl::OwnedImpl() : buffer_(evbuffer_new()) {}
Expand Down
11 changes: 0 additions & 11 deletions source/common/buffer/buffer_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@

#include "common/event/libevent.h"

// Forward decls to avoid leaking libevent headers to rest of program.
struct evbuffer_cb_info;
typedef void (*evbuffer_cb_func)(evbuffer* buffer, const evbuffer_cb_info* info, void* arg);

namespace Buffer {

/**
Expand All @@ -34,17 +30,10 @@ class OwnedImpl : public Instance {
int read(int fd, uint64_t max_length) override;
uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
ssize_t search(const void* data, uint64_t size, size_t start) const override;
void setCallback(Callback callback) override;
int write(int fd) override;

private:
void onBufferChange(const evbuffer_cb_info& info);

static const evbuffer_cb_func buffer_cb_; // Static callback used for all evbuffer callbacks.
// This allows us to add/remove by value.

Event::Libevent::BufferPtr buffer_;
Callback cb_; // The per buffer callback. Invoked via the buffer_cb_ static thunk.
};

} // Buffer
1 change: 0 additions & 1 deletion source/common/filter/auth/client_ssl.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ class Instance : public Network::ReadFilter, public Network::ConnectionCallbacks
}

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {}
void onEvent(uint32_t events) override;

private:
Expand Down
1 change: 0 additions & 1 deletion source/common/filter/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class Instance : public Network::ReadFilter,
}

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {}
void onEvent(uint32_t events) override;

// RateLimit::RequestCallbacks
Expand Down
39 changes: 15 additions & 24 deletions source/common/filter/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ TcpProxyStats TcpProxyConfig::generateStats(const std::string& name, Stats::Stor
POOL_GAUGE_PREFIX(store, final_prefix))};
}

void TcpProxy::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
read_callbacks_ = &callbacks;
conn_log_info("new tcp proxy session", read_callbacks_->connection());
read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
read_callbacks_->connection().setBufferStats({config_->stats().downstream_cx_rx_bytes_total_,
config_->stats().downstream_cx_rx_bytes_buffered_,
config_->stats().downstream_cx_tx_bytes_total_,
config_->stats().downstream_cx_tx_bytes_buffered_});
}

Network::FilterStatus TcpProxy::initializeUpstreamConnection() {
Upstream::ResourceManager& upstream_cluster_resource_manager =
cluster_manager_.get(config_->clusterName())
Expand All @@ -68,6 +78,11 @@ Network::FilterStatus TcpProxy::initializeUpstreamConnection() {

upstream_connection_->addReadFilter(upstream_callbacks_);
upstream_connection_->addConnectionCallbacks(*upstream_callbacks_);
upstream_connection_->setBufferStats(
{read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_total_,
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_rx_bytes_buffered_,
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_,
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_});
upstream_connection_->connect();
upstream_connection_->noDelay(true);

Expand Down Expand Up @@ -103,18 +118,6 @@ Network::FilterStatus TcpProxy::onData(Buffer::Instance& data) {
return Network::FilterStatus::StopIteration;
}

void TcpProxy::onDownstreamBufferChange(Network::ConnectionBufferType type, uint64_t,
int64_t delta) {
if (type == Network::ConnectionBufferType::Write) {
if (delta > 0) {
config_->stats().downstream_cx_tx_bytes_total_.add(delta);
config_->stats().downstream_cx_tx_bytes_buffered_.add(delta);
} else {
config_->stats().downstream_cx_tx_bytes_buffered_.sub(std::abs(delta));
}
}
}

void TcpProxy::onDownstreamEvent(uint32_t event) {
if ((event & Network::ConnectionEvent::RemoteClose ||
event & Network::ConnectionEvent::LocalClose) &&
Expand All @@ -127,18 +130,6 @@ void TcpProxy::onDownstreamEvent(uint32_t event) {
}
}

void TcpProxy::onUpstreamBufferChange(Network::ConnectionBufferType type, uint64_t, int64_t delta) {
if (type == Network::ConnectionBufferType::Write) {
if (delta > 0) {
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_total_.add(delta);
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_.add(delta);
} else {
read_callbacks_->upstreamHost()->cluster().stats().upstream_cx_tx_bytes_buffered_.sub(
std::abs(delta));
}
}
}

void TcpProxy::onUpstreamData(Buffer::Instance& data) {
read_callbacks_->connection().write(data);
ASSERT(0 == data.length());
Expand Down
21 changes: 3 additions & 18 deletions source/common/filter/tcp_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace Filter {
*/
// clang-format off
#define ALL_TCP_PROXY_STATS(COUNTER, GAUGE) \
COUNTER(downstream_cx_rx_bytes_total) \
GAUGE (downstream_cx_rx_bytes_buffered) \
COUNTER(downstream_cx_tx_bytes_total) \
GAUGE (downstream_cx_tx_bytes_buffered)
// clang-format on
Expand Down Expand Up @@ -61,22 +63,13 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter
// Network::ReadFilter
Network::FilterStatus onData(Buffer::Instance& data) override;
Network::FilterStatus onNewConnection() override { return initializeUpstreamConnection(); }
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override {
read_callbacks_ = &callbacks;
conn_log_info("new tcp proxy session", read_callbacks_->connection());
read_callbacks_->connection().addConnectionCallbacks(downstream_callbacks_);
}
void initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) override;

private:
struct DownstreamCallbacks : public Network::ConnectionCallbacks {
DownstreamCallbacks(TcpProxy& parent) : parent_(parent) {}

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size,
int64_t delta) override {
parent_.onDownstreamBufferChange(type, old_size, delta);
}

void onEvent(uint32_t event) override { parent_.onDownstreamEvent(event); }

TcpProxy& parent_;
Expand All @@ -87,11 +80,6 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter
UpstreamCallbacks(TcpProxy& parent) : parent_(parent) {}

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size,
int64_t delta) override {
parent_.onUpstreamBufferChange(type, old_size, delta);
}

void onEvent(uint32_t event) override { parent_.onUpstreamEvent(event); }

// Network::ReadFilter
Expand All @@ -105,10 +93,7 @@ class TcpProxy : public Network::ReadFilter, Logger::Loggable<Logger::Id::filter

Network::FilterStatus initializeUpstreamConnection();
void onConnectTimeout();
void onDownstreamBufferChange(Network::ConnectionBufferType type, uint64_t old_size,
int64_t delta);
void onDownstreamEvent(uint32_t event);
void onUpstreamBufferChange(Network::ConnectionBufferType type, uint64_t old_size, int64_t delta);
void onUpstreamData(Buffer::Instance& data);
void onUpstreamEvent(uint32_t event);

Expand Down
5 changes: 4 additions & 1 deletion source/common/http/codec_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
*/
StreamEncoder& newStream(StreamDecoder& response_decoder);

void setBufferStats(const Network::Connection::BufferStats& stats) {
connection_->setBufferStats(stats);
}

void setCodecClientCallbacks(CodecClientCallbacks& callbacks) {
codec_client_callbacks_ = &callbacks;
}
Expand Down Expand Up @@ -180,7 +184,6 @@ class CodecClient : Logger::Loggable<Logger::Id::client>,
void onData(Buffer::Instance& data);

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType, uint64_t, int64_t) override {}
void onEvent(uint32_t events) override;

std::list<ActiveRequestPtr> active_requests_;
Expand Down
13 changes: 5 additions & 8 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCal
[this]() -> void { onIdleTimeout(); });
idle_timer_->enableTimer(config_.idleTimeout().value());
}

read_callbacks_->connection().setBufferStats({stats_.named_.downstream_cx_rx_bytes_total_,
stats_.named_.downstream_cx_rx_bytes_buffered_,
stats_.named_.downstream_cx_tx_bytes_total_,
stats_.named_.downstream_cx_tx_bytes_buffered_});
}

ConnectionManagerImpl::~ConnectionManagerImpl() {
Expand Down Expand Up @@ -189,14 +194,6 @@ Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data) {
return Network::FilterStatus::StopIteration;
}

void ConnectionManagerImpl::onBufferChange(Network::ConnectionBufferType type, uint64_t,
int64_t delta) {
Network::Utility::updateBufferStats(type, delta, stats_.named_.downstream_cx_rx_bytes_total_,
stats_.named_.downstream_cx_rx_bytes_buffered_,
stats_.named_.downstream_cx_tx_bytes_total_,
stats_.named_.downstream_cx_tx_bytes_buffered_);
}

void ConnectionManagerImpl::resetAllStreams() {
while (!streams_.empty()) {
// Mimic a downstream reset in this case.
Expand Down
2 changes: 0 additions & 2 deletions source/common/http/conn_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ class ConnectionManagerImpl : Logger::Loggable<Logger::Id::http>,
StreamDecoder& newStream(StreamEncoder& response_encoder) override;

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size,
int64_t delta) override;
void onEvent(uint32_t events) override;

private:
Expand Down
14 changes: 5 additions & 9 deletions source/common/http/http1/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan();
connect_timer_->enableTimer(parent_.host_->cluster().connectTimeout());
parent_.host_->cluster().resourceManager(parent_.priority_).connections().inc();

codec_client_->setBufferStats({parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_});
}

ConnPoolImpl::ActiveClient::~ActiveClient() {
Expand All @@ -285,15 +290,6 @@ ConnPoolImpl::ActiveClient::~ActiveClient() {
parent_.host_->cluster().resourceManager(parent_.priority_).connections().dec();
}

void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t,
int64_t delta) {
Network::Utility::updateBufferStats(
type, delta, parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_);
}

void ConnPoolImpl::ActiveClient::onConnectTimeout() {
// We just close the client at this point. This will result in both a timeout and a connect
// failure and will fold into all the normal connect failure logic.
Expand Down
2 changes: 0 additions & 2 deletions source/common/http/http1/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ class ConnPoolImpl : Logger::Loggable<Logger::Id::pool>, public ConnectionPool::
void onConnectTimeout();

// Network::ConnectionCallbacks
void onBufferChange(Network::ConnectionBufferType type, uint64_t old_size,
int64_t delta) override;
void onEvent(uint32_t events) override { parent_.onConnectionEvent(*this, events); }

ConnPoolImpl& parent_;
Expand Down
14 changes: 5 additions & 9 deletions source/common/http/http2/conn_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,11 @@ ConnPoolImpl::ActiveClient::ActiveClient(ConnPoolImpl& parent)
parent_.host_->cluster().stats().upstream_cx_active_.inc();
parent_.host_->cluster().stats().upstream_cx_http2_total_.inc();
conn_length_ = parent_.host_->cluster().stats().upstream_cx_length_ms_.allocateSpan();

client_->setBufferStats({parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_});
}

ConnPoolImpl::ActiveClient::~ActiveClient() {
Expand All @@ -234,15 +239,6 @@ ConnPoolImpl::ActiveClient::~ActiveClient() {
conn_length_->complete();
}

void ConnPoolImpl::ActiveClient::onBufferChange(Network::ConnectionBufferType type, uint64_t,
int64_t delta) {
Network::Utility::updateBufferStats(
type, delta, parent_.host_->cluster().stats().upstream_cx_rx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_rx_bytes_buffered_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_total_,
parent_.host_->cluster().stats().upstream_cx_tx_bytes_buffered_);
}

CodecClientPtr ProdConnPoolImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
CodecClientStats stats{host_->cluster().stats().upstream_cx_protocol_error_};
CodecClientPtr codec{new CodecClientProd(CodecClient::Type::HTTP2, std::move(data.connection_),
Expand Down
Loading

0 comments on commit c0a5701

Please sign in to comment.