Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch implementation with timer #6452

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ message RedisProxy {
// need to be known to the cluster manager. If the command cannot be redirected, then the
// original error is passed downstream unchanged. By default, this support is not enabled.
bool enable_redirection = 3;

// Maximum size of buffer before flush is triggered. If this is unset, the buffer flushes
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
// whenever it receives data and performs no batching.
uint32 max_buffer_size_before_flush = 4;

// Buffer is flushed every N milliseconds (unless the buffer is filled before the timer fires)
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true];
}

// Network settings for the connection pool to the upstream cluster.
Expand Down
3 changes: 3 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Version history
* dubbo_proxy: support the :ref:`Dubbo proxy filter <config_network_filters_dubbo_proxy>`.
* http: mitigated a race condition with the :ref:`delayed_close_timeout<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.delayed_close_timeout>` where it could trigger while actively flushing a pending write buffer for a downstream connection.
* redis: add support for zpopmax and zpopmin commands.
* redis: added
:ref:`max_buffer_size_before_flush <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.max_buffer_size_before_flush>` to batch commands together until the encoder buffer hits a certain size, and
:ref:`buffer_flush_timeout <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.buffer_flush_timeout>` to control how quickly the buffer is flushed if it is not full.
* upstream: added :ref:`upstream_cx_pool_overflow <config_cluster_manager_cluster_stats>` for the connection pool circuit breaker.

1.10.0 (Apr 5, 2019)
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,16 @@ class Config {
* processed.
*/
virtual bool enableRedirection() const PURE;

/**
* @return buffer size for batching commands for a single upstream host.
*/
virtual unsigned int maxBufferSizeBeforeFlush() const PURE;
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved

/**
* @return timeout for batching commands for a single upstream host.
*/
virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE;
};

/**
Expand Down
24 changes: 21 additions & 3 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ ConfigImpl::ConfigImpl(
const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config)
: op_timeout_(PROTOBUF_GET_MS_REQUIRED(config, op_timeout)),
enable_hashtagging_(config.enable_hashtagging()),
enable_redirection_(config.enable_redirection()) {}
enable_redirection_(config.enable_redirection()),
max_buffer_size_before_flush_(
config.max_buffer_size_before_flush()), // This is a scalar, so default is zero.
buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_timeout, 3)) {}
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
Expand All @@ -31,7 +34,8 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config)
: host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)),
config_(config),
connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })) {
connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })),
flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) {
host->cluster().stats().upstream_cx_total_.inc();
host->stats().cx_total_.inc();
host->cluster().stats().upstream_cx_active_.inc();
Expand All @@ -48,12 +52,26 @@ ClientImpl::~ClientImpl() {

void ClientImpl::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }

void ClientImpl::flushBufferAndResetTimer() {
if (flush_timer_->enabled()) {
flush_timer_->disableTimer();
}
connection_->write(encoder_buffer_, false);
}

PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& callbacks) {
ASSERT(connection_->state() == Network::Connection::State::Open);

bool empty_buffer = encoder_buffer_.length() == 0;
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved

pending_requests_.emplace_back(*this, callbacks);
encoder_->encode(request, encoder_buffer_);
connection_->write(encoder_buffer_, false);

if (encoder_buffer_.length() > config_.maxBufferSizeBeforeFlush()) {
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
flushBufferAndResetTimer();
} else if (empty_buffer) {
flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs()));
} // else keep adding to buffer
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved

// Only boost the op timeout if:
// - We are not already connected. Otherwise, we are governed by the connect timeout and the timer
Expand Down
8 changes: 8 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,17 @@ class ConfigImpl : public Config {
std::chrono::milliseconds opTimeout() const override { return op_timeout_; }
bool enableHashtagging() const override { return enable_hashtagging_; }
bool enableRedirection() const override { return enable_redirection_; }
unsigned int maxBufferSizeBeforeFlush() const override { return max_buffer_size_before_flush_; }
FAYiEKcbD0XFqF2QK2E4viAHg8rMm2VbjYKdjTg marked this conversation as resolved.
Show resolved Hide resolved
std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
return buffer_flush_timeout_;
}

private:
const std::chrono::milliseconds op_timeout_;
const bool enable_hashtagging_;
const bool enable_redirection_;
const unsigned int max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
Expand All @@ -62,6 +68,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
}
void close() override;
PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override;
void flushBufferAndResetTimer();

private:
struct UpstreamReadFilter : public Network::ReadFilterBaseImpl {
Expand Down Expand Up @@ -111,6 +118,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
std::list<PendingRequest> pending_requests_;
Event::TimerPtr connect_or_op_timer_;
bool connected_{};
Event::TimerPtr flush_timer_;
};

class ClientFactoryImpl : public ClientFactory {
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/health_checkers/redis/redis.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <chrono>

#include "envoy/config/health_checker/redis/v2/redis.pb.validate.h"

#include "common/upstream/health_checker_base_impl.h"
Expand Down Expand Up @@ -63,6 +65,14 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
return true;
} // Redirection errors are treated as check successes.

// Batching
unsigned int maxBufferSizeBeforeFlush() const override {
return 0;
} // Forces an immediate flush
std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
return std::chrono::milliseconds(1);
}

// Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks
void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
void onFailure() override;
Expand Down
Loading