Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch implementation with timer #6452

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,27 @@ message RedisProxy {
// need to be known to the cluster manager. If the command cannot be redirected, then the
// original error is passed downstream unchanged. By default, this support is not enabled.
bool enable_redirection = 3;

// Maximum size of encoded request buffer before flush is triggered and encoded requests
// are sent upstream. If this is unset, the buffer flushes whenever it receives data
// and performs no batching.
// This feature makes it possible for multiple clients to send requests to Envoy and have
// them batched- for example if one is running several worker processes, each with its own
// Redis connection. There is no benefit to using this with a single downstream process.
// Recommended size (if enabled) is 1024 bytes.
uint32 max_buffer_size_before_flush = 4;

// The encoded request buffer is flushed N milliseconds after the first request has been
// encoded, unless the buffer size has already exceeded `max_buffer_size_before_flush`.
// If `max_buffer_size_before_flush` is not set, this flush timer is not used. Otherwise,
// the timer should be set according to the number of clients, overall request rate and
// desired maximum latency for a single command. For example, if there are many requests
// being batched together at a high rate, the buffer will likely be filled before the timer
// fires. Alternatively, if the request rate is lower the buffer will not be filled as often
// before the timer fires.
// If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout` is not, the latter
// defaults to 3ms.
google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true];
}

// Network settings for the connection pool to the upstream clusters.
Expand Down
3 changes: 3 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ Version history
* http: mitigated a race condition with the :ref:`delayed_close_timeout<envoy_api_field_config.filter.network.http_connection_manager.v2.HttpConnectionManager.delayed_close_timeout>` where it could trigger while actively flushing a pending write buffer for a downstream connection.
* redis: added :ref:`prefix routing <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.prefix_routes>` to enable routing commands based on their key's prefix to different upstream.
* redis: add support for zpopmax and zpopmin commands.
* redis: added
:ref:`max_buffer_size_before_flush <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.max_buffer_size_before_flush>` to batch commands together until the encoder buffer hits a certain size, and
:ref:`buffer_flush_timeout <envoy_api_field_config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings.buffer_flush_timeout>` to control how quickly the buffer is flushed if it is not full.
* router: added ability to control retry back-off intervals via :ref:`retry policy <envoy_api_msg_route.RetryPolicy.RetryBackOff>`.
* upstream: added :ref:`upstream_cx_pool_overflow <config_cluster_manager_cluster_stats>` for the connection pool circuit breaker.

Expand Down
12 changes: 12 additions & 0 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <cstdint>

#include "envoy/upstream/cluster_manager.h"

#include "extensions/filters/network/common/redis/codec_impl.h"
Expand Down Expand Up @@ -110,6 +112,16 @@ class Config {
* processed.
*/
virtual bool enableRedirection() const PURE;

/**
* @return buffer size for batching commands for a single upstream host.
*/
virtual uint32_t maxBufferSizeBeforeFlush() const PURE;

/**
* @return timeout for batching commands for a single upstream host.
*/
virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE;
};

/**
Expand Down
29 changes: 26 additions & 3 deletions source/extensions/filters/network/common/redis/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@ ConfigImpl::ConfigImpl(
const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config)
: op_timeout_(PROTOBUF_GET_MS_REQUIRED(config, op_timeout)),
enable_hashtagging_(config.enable_hashtagging()),
enable_redirection_(config.enable_redirection()) {}
enable_redirection_(config.enable_redirection()),
max_buffer_size_before_flush_(
config.max_buffer_size_before_flush()), // This is a scalar, so default is zero.
buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(
config, buffer_flush_timeout,
3)) // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used
// as the buffer is flushed on each request immediately.
{}

ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher,
EncoderPtr&& encoder, DecoderFactory& decoder_factory,
Expand All @@ -31,7 +38,8 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis
EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config)
: host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)),
config_(config),
connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })) {
connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })),
flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) {
host->cluster().stats().upstream_cx_total_.inc();
host->stats().cx_total_.inc();
host->cluster().stats().upstream_cx_active_.inc();
Expand All @@ -48,12 +56,27 @@ ClientImpl::~ClientImpl() {

void ClientImpl::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }

void ClientImpl::flushBufferAndResetTimer() {
if (flush_timer_->enabled()) {
flush_timer_->disableTimer();
}
connection_->write(encoder_buffer_, false);
}

PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& callbacks) {
ASSERT(connection_->state() == Network::Connection::State::Open);

const bool empty_buffer = encoder_buffer_.length() == 0;

pending_requests_.emplace_back(*this, callbacks);
encoder_->encode(request, encoder_buffer_);
connection_->write(encoder_buffer_, false);

// If buffer is full, flush. If the the buffer was empty before the request, start the timer.
if (encoder_buffer_.length() >= config_.maxBufferSizeBeforeFlush()) {
flushBufferAndResetTimer();
} else if (empty_buffer) {
flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs()));
}

// Only boost the op timeout if:
// - We are not already connected. Otherwise, we are governed by the connect timeout and the timer
Expand Down
8 changes: 8 additions & 0 deletions source/extensions/filters/network/common/redis/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,17 @@ class ConfigImpl : public Config {
std::chrono::milliseconds opTimeout() const override { return op_timeout_; }
bool enableHashtagging() const override { return enable_hashtagging_; }
bool enableRedirection() const override { return enable_redirection_; }
uint32_t maxBufferSizeBeforeFlush() const override { return max_buffer_size_before_flush_; }
std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
return buffer_flush_timeout_;
}

private:
const std::chrono::milliseconds op_timeout_;
const bool enable_hashtagging_;
const bool enable_redirection_;
const uint32_t max_buffer_size_before_flush_;
const std::chrono::milliseconds buffer_flush_timeout_;
};

class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks {
Expand All @@ -62,6 +68,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
}
void close() override;
PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override;
void flushBufferAndResetTimer();

private:
struct UpstreamReadFilter : public Network::ReadFilterBaseImpl {
Expand Down Expand Up @@ -111,6 +118,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne
std::list<PendingRequest> pending_requests_;
Event::TimerPtr connect_or_op_timer_;
bool connected_{};
Event::TimerPtr flush_timer_;
};

class ClientFactoryImpl : public ClientFactory {
Expand Down
10 changes: 10 additions & 0 deletions source/extensions/health_checkers/redis/redis.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <chrono>

#include "envoy/config/health_checker/redis/v2/redis.pb.validate.h"

#include "common/upstream/health_checker_base_impl.h"
Expand Down Expand Up @@ -63,6 +65,14 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
return true;
} // Redirection errors are treated as check successes.

// Batching
unsigned int maxBufferSizeBeforeFlush() const override {
return 0;
} // Forces an immediate flush
std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
return std::chrono::milliseconds(1);
}

// Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks
void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
void onFailure() override;
Expand Down
Loading