From 446d3b605632cf32792d33879fcd47864effd521 Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Thu, 11 Apr 2019 17:50:26 -0700 Subject: [PATCH 1/9] timer batch implementation, tested, merge conflicts fixed, etc Signed-off-by: Nicolas Flacco --- .../network/redis_proxy/v2/redis_proxy.proto | 6 + docs/root/intro/version_history.rst | 3 + .../filters/network/common/redis/client.h | 10 ++ .../network/common/redis/client_impl.cc | 24 ++- .../network/common/redis/client_impl.h | 8 + .../extensions/health_checkers/redis/redis.h | 10 ++ .../network/common/redis/client_impl_test.cc | 162 +++++++++++++++++- 7 files changed, 219 insertions(+), 4 deletions(-) diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 23448eff903f..4d278aaac30e 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -53,6 +53,12 @@ message RedisProxy { // need to be known to the cluster manager. If the command cannot be redirected, then the // original error is passed downstream unchanged. By default, this support is not enabled. bool enable_redirection = 3; + + // Maximum size of buffer before flush is triggered + uint32 max_buffer_size_before_flush = 4; + + // Buffer is flushed every N milliseconds (unless the buffer is filled before the timer fires) + google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true]; } // Network settings for the connection pool to the upstream cluster. diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 5551b9582341..40d612af5356 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -63,6 +63,9 @@ Version history * redis: added :ref:`success and error stats ` for commands. * redis: migrate hash function for host selection to `MurmurHash2 `_ from std::hash. MurmurHash2 is compatible with std::hash in GNU libstdc++ 3.4.20 or above. This is typically the case when compiled on Linux and not macOS. * redis: added :ref:`latency_in_micros ` to specify the redis commands stats time unit in microseconds. +* redis: added + :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and + :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. * router: added ability to configure a :ref:`retry policy ` at the virtual host level. * router: added reset reason to response body when upstream reset happens. After this change, the response body will be of the form `upstream connect error or disconnect/reset before headers. reset reason:` diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index 59c5c88080c9..8cfcfe76c053 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -110,6 +110,16 @@ class Config { * processed. */ virtual bool enableRedirection() const PURE; + + /** + * @return buffer size for batching commands for a single upstream host. + */ + virtual unsigned int maxBufferSizeBeforeFlush() const PURE; + + /** + * @return timeout for batching commands for a single upstream host. + */ + virtual std::chrono::milliseconds bufferFlushTimeoutInMs() const PURE; }; /** diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 104003656048..34e610b1b605 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -11,7 +11,10 @@ ConfigImpl::ConfigImpl( const envoy::config::filter::network::redis_proxy::v2::RedisProxy::ConnPoolSettings& config) : op_timeout_(PROTOBUF_GET_MS_REQUIRED(config, op_timeout)), enable_hashtagging_(config.enable_hashtagging()), - enable_redirection_(config.enable_redirection()) {} + enable_redirection_(config.enable_redirection()), + max_buffer_size_before_flush_( + config.max_buffer_size_before_flush()), // This is a scalar, so default is zero. + buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_timeout, 3)) {} ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, @@ -31,7 +34,8 @@ ClientImpl::ClientImpl(Upstream::HostConstSharedPtr host, Event::Dispatcher& dis EncoderPtr&& encoder, DecoderFactory& decoder_factory, const Config& config) : host_(host), encoder_(std::move(encoder)), decoder_(decoder_factory.create(*this)), config_(config), - connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })) { + connect_or_op_timer_(dispatcher.createTimer([this]() -> void { onConnectOrOpTimeout(); })), + flush_timer_(dispatcher.createTimer([this]() -> void { flushBufferAndResetTimer(); })) { host->cluster().stats().upstream_cx_total_.inc(); host->stats().cx_total_.inc(); host->cluster().stats().upstream_cx_active_.inc(); @@ -48,12 +52,26 @@ ClientImpl::~ClientImpl() { void ClientImpl::close() { connection_->close(Network::ConnectionCloseType::NoFlush); } +void ClientImpl::flushBufferAndResetTimer() { + if (flush_timer_->enabled()) { + flush_timer_->disableTimer(); + } + connection_->write(encoder_buffer_, false); +} + PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& callbacks) { ASSERT(connection_->state() == Network::Connection::State::Open); + bool empty_buffer = encoder_buffer_.length() == 0; + pending_requests_.emplace_back(*this, callbacks); encoder_->encode(request, encoder_buffer_); - connection_->write(encoder_buffer_, false); + + if (encoder_buffer_.length() > config_.maxBufferSizeBeforeFlush()) { + flushBufferAndResetTimer(); + } else if (empty_buffer) { + flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs())); + } // else keep adding to buffer // Only boost the op timeout if: // - We are not already connected. Otherwise, we are governed by the connect timeout and the timer diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index 5a44d39e8268..5bb75f782f10 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -41,11 +41,17 @@ class ConfigImpl : public Config { std::chrono::milliseconds opTimeout() const override { return op_timeout_; } bool enableHashtagging() const override { return enable_hashtagging_; } bool enableRedirection() const override { return enable_redirection_; } + unsigned int maxBufferSizeBeforeFlush() const override { return max_buffer_size_before_flush_; } + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return buffer_flush_timeout_; + } private: const std::chrono::milliseconds op_timeout_; const bool enable_hashtagging_; const bool enable_redirection_; + const unsigned int max_buffer_size_before_flush_; + const std::chrono::milliseconds buffer_flush_timeout_; }; class ClientImpl : public Client, public DecoderCallbacks, public Network::ConnectionCallbacks { @@ -62,6 +68,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne } void close() override; PoolRequest* makeRequest(const RespValue& request, PoolCallbacks& callbacks) override; + void flushBufferAndResetTimer(); private: struct UpstreamReadFilter : public Network::ReadFilterBaseImpl { @@ -111,6 +118,7 @@ class ClientImpl : public Client, public DecoderCallbacks, public Network::Conne std::list pending_requests_; Event::TimerPtr connect_or_op_timer_; bool connected_{}; + Event::TimerPtr flush_timer_; }; class ClientFactoryImpl : public ClientFactory { diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h index 7c93b017b5a9..791171d2d83e 100644 --- a/source/extensions/health_checkers/redis/redis.h +++ b/source/extensions/health_checkers/redis/redis.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "envoy/config/health_checker/redis/v2/redis.pb.validate.h" #include "common/upstream/health_checker_base_impl.h" @@ -63,6 +65,14 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase { return true; } // Redirection errors are treated as check successes. + // Batching + unsigned int maxBufferSizeBeforeFlush() const override { + return 0; + } // Forces an immediate flush + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return std::chrono::milliseconds(1); + } + // Extensions::NetworkFilters::Common::Redis::Client::PoolCallbacks void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; void onFailure() override; diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index d5c0e2ac7fae..1354e85a9187 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -64,6 +64,11 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF upstream_connection_ = new NiceMock(); Upstream::MockHost::MockCreateConnectionData conn_info; conn_info.connection_ = upstream_connection_; + + // Create timers in order they are created in client_impl.cc + connect_or_op_timer_ = {new Event::MockTimer(&dispatcher_)}; + flush_timer_ = {new Event::MockTimer(&dispatcher_)}; + EXPECT_CALL(*connect_or_op_timer_, enableTimer(_)); EXPECT_CALL(*host_, createConnection_(_, _)).WillOnce(Return(conn_info)); EXPECT_CALL(*upstream_connection_, addReadFilter(_)) @@ -89,7 +94,8 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF const std::string cluster_name_{"foo"}; std::shared_ptr host_{new NiceMock()}; Event::MockDispatcher dispatcher_; - Event::MockTimer* connect_or_op_timer_{new Event::MockTimer(&dispatcher_)}; + Event::MockTimer* flush_timer_{}; + Event::MockTimer* connect_or_op_timer_{}; MockEncoder* encoder_{new MockEncoder()}; MockDecoder* decoder_{new MockDecoder()}; Common::Redis::DecoderCallbacks* callbacks_{}; @@ -99,6 +105,134 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF ClientPtr client_; }; +TEST_F(RedisClientImplTest, BatchWithZeroBufferAndTimeout) { + // Basic test with a single request, default buffer size (0) and timeout (0). + // This means we do not batch requests, and thus the flush timer is never enabled. + InSequence s; + + setup(); + + // Make the dummy request + Common::Redis::RespValue request1; + MockPoolCallbacks callbacks1; + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + + // Process the dummy request + Buffer::OwnedImpl fake_data; + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + InSequence s; + Common::Redis::RespValuePtr response1(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks1, onResponse_(Ref(response1))); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response1)); + })); + upstream_read_filter_->onData(fake_data, false); + + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + client_->close(); +} + +class ConfigBufferSizeGTSingleRequest : public Config { + bool disableOutlierEvents() const override { return false; } + std::chrono::milliseconds opTimeout() const override { return std::chrono::milliseconds(25); } + bool enableHashtagging() const override { return false; } + unsigned int maxBufferSizeBeforeFlush() const override { return 8; } + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return std::chrono::milliseconds(1); + } +}; + +TEST_F(RedisClientImplTest, BatchWithTimerFiring) { + // With a flush buffer > single request length, the flush timer comes into play. + // In this test, we make a single request that doesn't fill the buffer, so we + // have to wait for the flush timer to fire. + InSequence s; + + setup(std::make_unique()); + + // Make the dummy request + Common::Redis::RespValue request1; + MockPoolCallbacks callbacks1; + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enableTimer(_)); + PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + + // Pretend the the flush timer fires + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; // timer is disabled, as it has already fired + flush_timer_->invokeCallback(); + + // Process the dummy request + Buffer::OwnedImpl fake_data; + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + InSequence s; + Common::Redis::RespValuePtr response1(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks1, onResponse_(Ref(response1))); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response1)); + })); + upstream_read_filter_->onData(fake_data, false); + + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + client_->close(); +} + +TEST_F(RedisClientImplTest, BatchWithTimerCancelledByBufferFlush) { + // Expanding on the previous test, let's the flush buffer is filled by two requests. + // In this test, we make a single request that doesn't fill the buffer, and the timer + // starts. However, a second request comes in, which should cancel the timer, such + // that it is never invoked. + InSequence s; + + setup(std::make_unique()); + + // Make the dummy request (doesn't fill buffer, starts timer) + Common::Redis::RespValue request1; + MockPoolCallbacks callbacks1; + EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enableTimer(_)); + PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); + EXPECT_NE(nullptr, handle1); + + // Make a second dummy request (fills buffer, cancels timer) + Common::Redis::RespValue request2; + MockPoolCallbacks callbacks2; + EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(true)); + ; + EXPECT_CALL(*flush_timer_, disableTimer()); + PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); + EXPECT_NE(nullptr, handle2); + + // Process the dummy requests + Buffer::OwnedImpl fake_data; + EXPECT_CALL(*decoder_, decode(Ref(fake_data))).WillOnce(Invoke([&](Buffer::Instance&) -> void { + InSequence s; + Common::Redis::RespValuePtr response1(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks1, onResponse_(Ref(response1))); + EXPECT_CALL(*connect_or_op_timer_, enableTimer(_)); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response1)); + + Common::Redis::RespValuePtr response2(new Common::Redis::RespValue()); + EXPECT_CALL(callbacks2, onResponse_(Ref(response2))); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + EXPECT_CALL(host_->outlier_detector_, putResult(Upstream::Outlier::Result::SUCCESS)); + callbacks_->onRespValue(std::move(response2)); + })); + upstream_read_filter_->onData(fake_data, false); + + EXPECT_CALL(*upstream_connection_, close(Network::ConnectionCloseType::NoFlush)); + EXPECT_CALL(*connect_or_op_timer_, disableTimer()); + client_->close(); +} + TEST_F(RedisClientImplTest, Basic) { InSequence s; @@ -107,6 +241,8 @@ TEST_F(RedisClientImplTest, Basic) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -115,6 +251,8 @@ TEST_F(RedisClientImplTest, Basic) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -153,6 +291,8 @@ TEST_F(RedisClientImplTest, Cancel) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -161,6 +301,8 @@ TEST_F(RedisClientImplTest, Cancel) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -202,6 +344,8 @@ TEST_F(RedisClientImplTest, FailAll) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -228,6 +372,8 @@ TEST_F(RedisClientImplTest, FailAllWithCancel) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -252,6 +398,8 @@ TEST_F(RedisClientImplTest, ProtocolError) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -279,6 +427,8 @@ TEST_F(RedisClientImplTest, ConnectFail) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -296,6 +446,10 @@ class ConfigOutlierDisabled : public Config { std::chrono::milliseconds opTimeout() const override { return std::chrono::milliseconds(25); } bool enableHashtagging() const override { return false; } bool enableRedirection() const override { return false; } + unsigned int maxBufferSizeBeforeFlush() const override { return 0; } + std::chrono::milliseconds bufferFlushTimeoutInMs() const override { + return std::chrono::milliseconds(0); + } }; TEST_F(RedisClientImplTest, OutlierDisabled) { @@ -306,6 +460,8 @@ TEST_F(RedisClientImplTest, OutlierDisabled) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -326,6 +482,8 @@ TEST_F(RedisClientImplTest, ConnectTimeout) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -347,6 +505,8 @@ TEST_F(RedisClientImplTest, OpTimeout) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); + ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); From bcee909725b612ac28d6d2739c7223a7272d2895 Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Fri, 12 Apr 2019 10:42:15 -0700 Subject: [PATCH 2/9] updated per commits and conflict from branch (redirection woooo) Signed-off-by: Nicolas Flacco --- .../config/filter/network/redis_proxy/v2/redis_proxy.proto | 3 ++- docs/root/intro/version_history.rst | 6 +++--- .../filters/network/common/redis/client_impl_test.cc | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 4d278aaac30e..aa69be97a2c8 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -54,7 +54,8 @@ message RedisProxy { // original error is passed downstream unchanged. By default, this support is not enabled. bool enable_redirection = 3; - // Maximum size of buffer before flush is triggered + // Maximum size of buffer before flush is triggered. If this is unset, the buffer flushes + // whenever it receives data and performs no batching. uint32 max_buffer_size_before_flush = 4; // Buffer is flushed every N milliseconds (unless the buffer is filled before the timer fires) diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 40d612af5356..cee84b5c2229 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -3,6 +3,9 @@ Version history 1.11.0 (Pending) ================ +* redis: added + :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and + :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. 1.10.0 (Apr 5, 2019) ==================== @@ -63,9 +66,6 @@ Version history * redis: added :ref:`success and error stats ` for commands. * redis: migrate hash function for host selection to `MurmurHash2 `_ from std::hash. MurmurHash2 is compatible with std::hash in GNU libstdc++ 3.4.20 or above. This is typically the case when compiled on Linux and not macOS. * redis: added :ref:`latency_in_micros ` to specify the redis commands stats time unit in microseconds. -* redis: added - :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and - :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. * router: added ability to configure a :ref:`retry policy ` at the virtual host level. * router: added reset reason to response body when upstream reset happens. After this change, the response body will be of the form `upstream connect error or disconnect/reset before headers. reset reason:` diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index 1354e85a9187..0dae9ec3384f 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -140,6 +140,7 @@ class ConfigBufferSizeGTSingleRequest : public Config { bool disableOutlierEvents() const override { return false; } std::chrono::milliseconds opTimeout() const override { return std::chrono::milliseconds(25); } bool enableHashtagging() const override { return false; } + bool enableRedirection() const override { return false; } unsigned int maxBufferSizeBeforeFlush() const override { return 8; } std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return std::chrono::milliseconds(1); From 77d5652bf826e2f2911227b792117234a31551df Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Fri, 12 Apr 2019 12:23:45 -0700 Subject: [PATCH 3/9] fix test failures resulting from merge of ask move redirect code Signed-off-by: Nicolas Flacco --- .../filters/network/common/redis/client_impl_test.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index 0dae9ec3384f..3598365abfc2 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -531,6 +531,7 @@ TEST_F(RedisClientImplTest, AskRedirection) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -539,6 +540,7 @@ TEST_F(RedisClientImplTest, AskRedirection) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -589,6 +591,7 @@ TEST_F(RedisClientImplTest, MovedRedirection) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -597,6 +600,7 @@ TEST_F(RedisClientImplTest, MovedRedirection) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -647,6 +651,7 @@ TEST_F(RedisClientImplTest, AskRedirectionNotEnabled) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -655,6 +660,7 @@ TEST_F(RedisClientImplTest, AskRedirectionNotEnabled) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -706,6 +712,7 @@ TEST_F(RedisClientImplTest, MovedRedirectionNotEnabled) { Common::Redis::RespValue request1; MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -714,6 +721,7 @@ TEST_F(RedisClientImplTest, MovedRedirectionNotEnabled) { Common::Redis::RespValue request2; MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); + EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); From 668c29873fd246533b7b9ea02579359a02fdb69e Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Tue, 16 Apr 2019 10:15:31 -0700 Subject: [PATCH 4/9] update history Signed-off-by: Nicolas Flacco --- docs/root/intro/version_history.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 3739cb710060..524591e33396 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -3,12 +3,12 @@ Version history 1.11.0 (Pending) ================ -* redis: added - :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and - :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. * dubbo_proxy: support the :ref:`Dubbo proxy filter `. * http: mitigated a race condition with the :ref:`delayed_close_timeout` where it could trigger while actively flushing a pending write buffer for a downstream connection. * redis: add support for zpopmax and zpopmin commands. +* redis: added + :ref:`max_buffer_size_before_flush ` to batch commands together until the encoder buffer hits a certain size, and + :ref:`buffer_flush_timeout ` to control how quickly the buffer is flushed if it is not full. * upstream: added :ref:`upstream_cx_pool_overflow ` for the connection pool circuit breaker. 1.10.0 (Apr 5, 2019) From 2fd41beacab4e7025eb4fd532d76e596a1b2b8bc Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Tue, 16 Apr 2019 13:29:26 -0700 Subject: [PATCH 5/9] Address matt comments Signed-off-by: Nicolas Flacco --- .../network/redis_proxy/v2/redis_proxy.proto | 20 ++++++++++++++--- .../filters/network/common/redis/client.h | 4 +++- .../network/common/redis/client_impl.cc | 11 +++++++--- .../network/common/redis/client_impl.h | 4 ++-- .../network/common/redis/client_impl_test.cc | 22 ++++++------------- 5 files changed, 37 insertions(+), 24 deletions(-) diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index aa69be97a2c8..2db5926eab3f 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -54,11 +54,25 @@ message RedisProxy { // original error is passed downstream unchanged. By default, this support is not enabled. bool enable_redirection = 3; - // Maximum size of buffer before flush is triggered. If this is unset, the buffer flushes - // whenever it receives data and performs no batching. + // Maximum size of encoded request buffer before flush is triggered and encoded requests + // are sent upstream. If this is unset, the buffer flushes whenever it receives data + // and performs no batching. + // This feature makes it possible for multiple clients to send requests to Envoy and have + // them batched- for example if one is running several worker processes, each with its own + // Redis connection. There is no benefit to using this with a single downstream process. + // Recommended size (if enabled) is 1024 bytes. uint32 max_buffer_size_before_flush = 4; - // Buffer is flushed every N milliseconds (unless the buffer is filled before the timer fires) + // The encoded request buffer is flushed N milliseconds after the first request has been + // encoded, unless the buffer size has already exceeded `max_buffer_size_before_flush`. + // If `max_buffer_size_before_flush` is not set, this flush timer is not used. Otherwise, + // the timer should be set according to the number of clients, overall request rate and + // desired maximum latency for a single command. For example, if there are many requests + // being batched together at a high rate, the buffer will likely be filled before the timer + // fires. Alternatively, if the request rate is lower the buffer will not be filled as often + // before the timer fires. + // If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout`, the latter + // defaults to 3ms. google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true]; } diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h index 8cfcfe76c053..4a7c53912afc 100644 --- a/source/extensions/filters/network/common/redis/client.h +++ b/source/extensions/filters/network/common/redis/client.h @@ -1,5 +1,7 @@ #pragma once +#include + #include "envoy/upstream/cluster_manager.h" #include "extensions/filters/network/common/redis/codec_impl.h" @@ -114,7 +116,7 @@ class Config { /** * @return buffer size for batching commands for a single upstream host. */ - virtual unsigned int maxBufferSizeBeforeFlush() const PURE; + virtual uint32_t maxBufferSizeBeforeFlush() const PURE; /** * @return timeout for batching commands for a single upstream host. diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 34e610b1b605..2121a2ddd7e8 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -14,7 +14,11 @@ ConfigImpl::ConfigImpl( enable_redirection_(config.enable_redirection()), max_buffer_size_before_flush_( config.max_buffer_size_before_flush()), // This is a scalar, so default is zero. - buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_timeout, 3)) {} + buffer_flush_timeout_(PROTOBUF_GET_MS_OR_DEFAULT( + config, buffer_flush_timeout, + 3)) // Default timeout is 3ms. If max_buffer_size_before_flush is zero, this is not used + // as the buffer is flushed on each request immediately. +{} ClientPtr ClientImpl::create(Upstream::HostConstSharedPtr host, Event::Dispatcher& dispatcher, EncoderPtr&& encoder, DecoderFactory& decoder_factory, @@ -62,16 +66,17 @@ void ClientImpl::flushBufferAndResetTimer() { PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& callbacks) { ASSERT(connection_->state() == Network::Connection::State::Open); - bool empty_buffer = encoder_buffer_.length() == 0; + const bool empty_buffer = encoder_buffer_.length() == 0; pending_requests_.emplace_back(*this, callbacks); encoder_->encode(request, encoder_buffer_); + // If buffer is full, flush. If the the buffer is empty, start the timer. if (encoder_buffer_.length() > config_.maxBufferSizeBeforeFlush()) { flushBufferAndResetTimer(); } else if (empty_buffer) { flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs())); - } // else keep adding to buffer + } // Only boost the op timeout if: // - We are not already connected. Otherwise, we are governed by the connect timeout and the timer diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h index 5bb75f782f10..fd9b7b7af7b8 100644 --- a/source/extensions/filters/network/common/redis/client_impl.h +++ b/source/extensions/filters/network/common/redis/client_impl.h @@ -41,7 +41,7 @@ class ConfigImpl : public Config { std::chrono::milliseconds opTimeout() const override { return op_timeout_; } bool enableHashtagging() const override { return enable_hashtagging_; } bool enableRedirection() const override { return enable_redirection_; } - unsigned int maxBufferSizeBeforeFlush() const override { return max_buffer_size_before_flush_; } + uint32_t maxBufferSizeBeforeFlush() const override { return max_buffer_size_before_flush_; } std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_flush_timeout_; } @@ -50,7 +50,7 @@ class ConfigImpl : public Config { const std::chrono::milliseconds op_timeout_; const bool enable_hashtagging_; const bool enable_redirection_; - const unsigned int max_buffer_size_before_flush_; + const uint32_t max_buffer_size_before_flush_; const std::chrono::milliseconds buffer_flush_timeout_; }; diff --git a/test/extensions/filters/network/common/redis/client_impl_test.cc b/test/extensions/filters/network/common/redis/client_impl_test.cc index 3598365abfc2..c1d8269f5b02 100644 --- a/test/extensions/filters/network/common/redis/client_impl_test.cc +++ b/test/extensions/filters/network/common/redis/client_impl_test.cc @@ -66,8 +66,8 @@ class RedisClientImplTest : public testing::Test, public Common::Redis::DecoderF conn_info.connection_ = upstream_connection_; // Create timers in order they are created in client_impl.cc - connect_or_op_timer_ = {new Event::MockTimer(&dispatcher_)}; - flush_timer_ = {new Event::MockTimer(&dispatcher_)}; + connect_or_op_timer_ = new Event::MockTimer(&dispatcher_); + flush_timer_ = new Event::MockTimer(&dispatcher_); EXPECT_CALL(*connect_or_op_timer_, enableTimer(_)); EXPECT_CALL(*host_, createConnection_(_, _)).WillOnce(Return(conn_info)); @@ -163,9 +163,12 @@ TEST_F(RedisClientImplTest, BatchWithTimerFiring) { PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); - // Pretend the the flush timer fires + // Pretend the the flush timer fires. + // The timer callback is the general-purpose flush function, also used when + // the buffer is filled. If the buffer fills before the timer fires, we need + // to check if the timer is active and cancel it. However, if the timer fires + // the callback, this internal check returns false as the timer is finished. EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; // timer is disabled, as it has already fired flush_timer_->invokeCallback(); // Process the dummy request @@ -243,7 +246,6 @@ TEST_F(RedisClientImplTest, Basic) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -253,7 +255,6 @@ TEST_F(RedisClientImplTest, Basic) { MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -293,7 +294,6 @@ TEST_F(RedisClientImplTest, Cancel) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -303,7 +303,6 @@ TEST_F(RedisClientImplTest, Cancel) { MockPoolCallbacks callbacks2; EXPECT_CALL(*encoder_, encode(Ref(request2), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle2 = client_->makeRequest(request2, callbacks2); EXPECT_NE(nullptr, handle2); @@ -346,7 +345,6 @@ TEST_F(RedisClientImplTest, FailAll) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -374,7 +372,6 @@ TEST_F(RedisClientImplTest, FailAllWithCancel) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -400,7 +397,6 @@ TEST_F(RedisClientImplTest, ProtocolError) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -429,7 +425,6 @@ TEST_F(RedisClientImplTest, ConnectFail) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -462,7 +457,6 @@ TEST_F(RedisClientImplTest, OutlierDisabled) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -484,7 +478,6 @@ TEST_F(RedisClientImplTest, ConnectTimeout) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); @@ -507,7 +500,6 @@ TEST_F(RedisClientImplTest, OpTimeout) { MockPoolCallbacks callbacks1; EXPECT_CALL(*encoder_, encode(Ref(request1), _)); EXPECT_CALL(*flush_timer_, enabled()).WillOnce(Return(false)); - ; PoolRequest* handle1 = client_->makeRequest(request1, callbacks1); EXPECT_NE(nullptr, handle1); From 318c3e634fa743581aab57017088817c2fa46a2b Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Wed, 17 Apr 2019 14:08:59 -0700 Subject: [PATCH 6/9] add integration test Signed-off-by: Nicolas Flacco --- .../redis_proxy_integration_test.cc | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index d23c2b264951..46898c34cdc9 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -65,6 +65,12 @@ const std::string CONFIG_WITH_REDIRECTION = CONFIG + R"EOF( enable_redirection: true )EOF"; +// This is a configuration with batching enabled. +const std::string CONFIG_WITH_BATCHING = CONFIG + R"EOF( + max_buffer_size_before_flush: 1024 + buffer_flush_timeout: 0.003s +)EOF"; + // This function encodes commands as an array of bulkstrings as transmitted by Redis clients to // Redis servers, according to the Redis protocol. std::string makeBulkStringArray(std::vector&& command_strings) { @@ -148,6 +154,11 @@ class RedisProxyWithRedirectionIntegrationTest : public RedisProxyIntegrationTes const std::string& asking_response = "+OK\r\n"); }; +class RedisProxyWithBatchingIntegrationTest : public RedisProxyIntegrationTest { +public: + RedisProxyWithBatchingIntegrationTest() : RedisProxyIntegrationTest(CONFIG_WITH_BATCHING, 2) {} +}; + INSTANTIATE_TEST_SUITE_P(IpVersions, RedisProxyIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); @@ -156,6 +167,10 @@ INSTANTIATE_TEST_SUITE_P(IpVersions, RedisProxyWithRedirectionIntegrationTest, testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), TestUtility::ipTestParamsToString); +INSTANTIATE_TEST_SUITE_P(IpVersions, RedisProxyWithBatchingIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), + TestUtility::ipTestParamsToString); + void RedisProxyIntegrationTest::initialize() { setUpstreamCount(num_upstreams_); setDeterministic(); @@ -445,5 +460,41 @@ TEST_P(RedisProxyWithRedirectionIntegrationTest, IgnoreRedirectionForAsking) { asking_response.str()); } +// This test verifies that batching works properly. If batching is enabled, when multiple +// clients make a request to a Redis server within a certain time window, they will be batched +// together. The below example, two clients send "GET foo", and Redis receives those two as +// a single, concatendated request. + +TEST_P(RedisProxyWithBatchingIntegrationTest, SimpleBatching) { + initialize(); + + const std::string& request = makeBulkStringArray({"get", "foo"}); + const std::string& response = "$3\r\nbar\r\n"; + + std::string proxy_to_server; + IntegrationTcpClientPtr redis_client_1 = makeTcpConnection(lookupPort("redis_proxy")); + IntegrationTcpClientPtr redis_client_2 = makeTcpConnection(lookupPort("redis_proxy")); + redis_client_1->write(request); + redis_client_2->write(request); + + FakeRawConnectionPtr fake_upstream_connection; + EXPECT_TRUE(fake_upstreams_[0]->waitForRawConnection(fake_upstream_connection)); + EXPECT_TRUE(fake_upstream_connection->waitForData(request.size() * 2, &proxy_to_server)); + // The original request should be the same as the data received by the server. + EXPECT_EQ(request + request, proxy_to_server); + + EXPECT_TRUE(fake_upstream_connection->write(response + response)); + redis_client_1->waitForData(response); + redis_client_2->waitForData(response); + // The original response should be received by the fake Redis client. + EXPECT_EQ(response, redis_client_1->data()); + EXPECT_EQ(response, redis_client_2->data()); + + redis_client_1->close(); + EXPECT_TRUE(fake_upstream_connection->close()); + redis_client_2->close(); + EXPECT_TRUE(fake_upstream_connection->close()); +} + } // namespace } // namespace Envoy From 74f5658c5d16d3edb70730ac804a517f5bc5d713 Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Wed, 17 Apr 2019 14:17:33 -0700 Subject: [PATCH 7/9] fix spelling Signed-off-by: Nicolas Flacco --- .../filters/network/redis_proxy/redis_proxy_integration_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index 46898c34cdc9..4d009fd330bd 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -463,7 +463,7 @@ TEST_P(RedisProxyWithRedirectionIntegrationTest, IgnoreRedirectionForAsking) { // This test verifies that batching works properly. If batching is enabled, when multiple // clients make a request to a Redis server within a certain time window, they will be batched // together. The below example, two clients send "GET foo", and Redis receives those two as -// a single, concatendated request. +// a single, concatenated request. TEST_P(RedisProxyWithBatchingIntegrationTest, SimpleBatching) { initialize(); From 441537a4661a2510bf8068f66247bf109a23f14b Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Wed, 17 Apr 2019 15:46:42 -0700 Subject: [PATCH 8/9] one char change to trigger new tests as I suspect flaky unrelated failure Signed-off-by: Nicolas Flacco --- .../filters/network/redis_proxy/redis_proxy_integration_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc index 4194a570d3ed..a51671035e80 100644 --- a/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc +++ b/test/extensions/filters/network/redis_proxy/redis_proxy_integration_test.cc @@ -567,7 +567,7 @@ TEST_P(RedisProxyWithRedirectionIntegrationTest, IgnoreRedirectionForAsking) { // This test verifies that batching works properly. If batching is enabled, when multiple // clients make a request to a Redis server within a certain time window, they will be batched // together. The below example, two clients send "GET foo", and Redis receives those two as -// a single, concatenated request. +// a single concatenated request. TEST_P(RedisProxyWithBatchingIntegrationTest, SimpleBatching) { initialize(); From 602f0c16219befdceab1a109b587f1e1ce14a8af Mon Sep 17 00:00:00 2001 From: Nicolas Flacco Date: Thu, 18 Apr 2019 09:30:42 -0700 Subject: [PATCH 9/9] update per matt comments Signed-off-by: Nicolas Flacco --- .../config/filter/network/redis_proxy/v2/redis_proxy.proto | 2 +- source/extensions/filters/network/common/redis/client_impl.cc | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto index 4fd1331a2503..eec8c3f40954 100644 --- a/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto +++ b/api/envoy/config/filter/network/redis_proxy/v2/redis_proxy.proto @@ -77,7 +77,7 @@ message RedisProxy { // being batched together at a high rate, the buffer will likely be filled before the timer // fires. Alternatively, if the request rate is lower the buffer will not be filled as often // before the timer fires. - // If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout`, the latter + // If `max_buffer_size_before_flush` is set, but `buffer_flush_timeout` is not, the latter // defaults to 3ms. google.protobuf.Duration buffer_flush_timeout = 5 [(gogoproto.stdduration) = true]; } diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc index 2121a2ddd7e8..fa4bb4bb5c76 100644 --- a/source/extensions/filters/network/common/redis/client_impl.cc +++ b/source/extensions/filters/network/common/redis/client_impl.cc @@ -71,8 +71,8 @@ PoolRequest* ClientImpl::makeRequest(const RespValue& request, PoolCallbacks& ca pending_requests_.emplace_back(*this, callbacks); encoder_->encode(request, encoder_buffer_); - // If buffer is full, flush. If the the buffer is empty, start the timer. - if (encoder_buffer_.length() > config_.maxBufferSizeBeforeFlush()) { + // If buffer is full, flush. If the the buffer was empty before the request, start the timer. + if (encoder_buffer_.length() >= config_.maxBufferSizeBeforeFlush()) { flushBufferAndResetTimer(); } else if (empty_buffer) { flush_timer_->enableTimer(std::chrono::milliseconds(config_.bufferFlushTimeoutInMs()));