Skip to content

Commit

Permalink
http fault: add response rate limit injection
Browse files Browse the repository at this point in the history
Part of #5942

Signed-off-by: Matt Klein <mklein@lyft.com>
  • Loading branch information
mattklein123 committed Mar 12, 2019
1 parent 56b0309 commit 50062ed
Show file tree
Hide file tree
Showing 12 changed files with 640 additions and 96 deletions.
23 changes: 21 additions & 2 deletions api/envoy/config/filter/fault/v2/fault.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,25 @@ message FaultDelay {
[(validate.rules).duration.gt = {}, (gogoproto.stdduration) = true];
}

// The percentage of operations/connection requests on which the delay will be injected.
envoy.type.FractionalPercent percentage = 4;
// The percentage of operations/connections/requests on which the delay will be injected.
type.FractionalPercent percentage = 4;
}

// Describes a rate limit to be applied, along with the percentage of
// operations/connections/requests on which it will be injected.
message FaultRateLimit {
// Describes a fixed/constant rate limit.
message FixedLimit {
// The limit supplied in KiB/s. The validation rule on this field requires a value of at least 1.
uint64 limit_kbps = 1 [(validate.rules).uint64.gte = 1];
}

// The kind of limit to apply. The validation option below marks this oneof as required.
oneof limit_type {
option (validate.required) = true;

// A fixed rate limit.
FixedLimit fixed_limit = 1;
}

// The percentage of operations/connections/requests on which the rate limit will be injected.
type.FractionalPercent percentage = 2;
}
9 changes: 6 additions & 3 deletions api/envoy/config/filter/http/fault/v2/fault.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ message FaultAbort {

// The percentage of requests/operations/connections that will be aborted with the error code
// provided.
envoy.type.FractionalPercent percentage = 3;
type.FractionalPercent percentage = 3;
}

message HTTPFault {
// If specified, the filter will inject delays based on the values in the
// object. At least *abort* or *delay* must be specified.
envoy.config.filter.fault.v2.FaultDelay delay = 1;
// object.
filter.fault.v2.FaultDelay delay = 1;

// If specified, the filter will abort requests based on the values in
// the object. At least *abort* or *delay* must be specified.
Expand Down Expand Up @@ -79,4 +79,7 @@ message HTTPFault {
// limit. It's possible for the number of active faults to rise slightly above the configured
// amount due to the implementation details.
google.protobuf.UInt32Value max_active_faults = 6;

// The response rate limit to be applied to the stream.
filter.fault.v2.FaultRateLimit response_rate_limit = 7;
}
7 changes: 7 additions & 0 deletions docs/root/configuration/http_filters/fault_filter.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ fault.http.max_active_faults
cause resource constraint issues. If not specified, the :ref:`max_active_faults
<envoy_api_field_config.filter.http.fault.v2.HTTPFault.max_active_faults>` setting will be used.

fault.http.rate_limit.response
% of requests which will have a response rate limit fault injected, if the filter is
:ref:`configured <envoy_api_field_config.filter.http.fault.v2.HTTPFault.response_rate_limit>` to
do so. Defaults to the value set in the :ref:`percentage
<envoy_api_field_config.filter.fault.v2.FaultRateLimit.percentage>` field.

*Note*, fault filter runtime settings for the specific downstream cluster
override the default ones if present. The following are downstream specific
runtime keys:
Expand Down Expand Up @@ -100,6 +106,7 @@ owning HTTP connection manager.

delays_injected, Counter, Total requests that were delayed
aborts_injected, Counter, Total requests that were aborted
response_rl_injected, Counter, Total requests that had a response rate limit injected
faults_overflow, Counter, Total number of faults that were not injected due to overflowing the :ref:`max_active_faults <envoy_api_field_config.filter.http.fault.v2.HTTPFault.max_active_faults>` setting
active_faults, Gauge, Total number of faults active at the current time
<downstream-cluster>.delays_injected, Counter, Total delayed requests for the given downstream cluster
Expand Down
2 changes: 2 additions & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ Version history
<envoy_api_field_config.filter.http.fault.v2.HTTPFault.max_active_faults>` setting, as well as
:ref:`statistics <config_http_filters_fault_injection_stats>` for the number of active faults
and the number of faults that overflowed.
* fault: add :ref:`response rate limit
<envoy_api_field_config.filter.http.fault.v2.HTTPFault.response_rate_limit>` fault injection.
* governance: extending Envoy deprecation policy from 1 release (0-3 months) to 2 releases (3-6 months).
* health check: expected response codes in http health checks are now :ref:`configurable <envoy_api_msg_core.HealthCheck.HttpHealthCheck>`.
* http: added new grpc_http1_reverse_bridge filter for converting gRPC requests into HTTP/1.1 requests.
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/fault/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ envoy_cc_library(
"//include/envoy/stats:stats_macros",
"//source/common/common:assert_lib",
"//source/common/common:empty_string",
"//source/common/common:token_bucket_impl_lib",
"//source/common/http:codes_lib",
"//source/common/http:header_map_lib",
"//source/common/http:header_utility_lib",
Expand Down
6 changes: 3 additions & 3 deletions source/extensions/filters/http/fault/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ namespace Fault {
Http::FilterFactoryCb FaultFilterFactory::createFilterFactoryFromProtoTyped(
const envoy::config::filter::http::fault::v2::HTTPFault& config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {
FaultFilterConfigSharedPtr filter_config(
new FaultFilterConfig(config, context.runtime(), stats_prefix, context.scope()));
FaultFilterConfigSharedPtr filter_config(new FaultFilterConfig(
config, context.runtime(), stats_prefix, context.scope(), context.timeSource()));
return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamDecoderFilter(std::make_shared<FaultFilter>(filter_config));
callbacks.addStreamFilter(std::make_shared<FaultFilter>(filter_config));
};
}

Expand Down
178 changes: 164 additions & 14 deletions source/extensions/filters/http/fault/fault_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ namespace HttpFilters {
namespace Fault {

FaultSettings::FaultSettings(const envoy::config::filter::http::fault::v2::HTTPFault& fault) {

if (fault.has_abort()) {
const auto& abort = fault.abort();
abort_percentage_ = abort.percentage();
Expand All @@ -53,13 +52,21 @@ FaultSettings::FaultSettings(const envoy::config::filter::http::fault::v2::HTTPF
if (fault.has_max_active_faults()) {
max_active_faults_ = fault.max_active_faults().value();
}

if (fault.has_response_rate_limit()) {
RateLimit rate_limit;
ASSERT(fault.response_rate_limit().has_fixed_limit());
rate_limit.fixed_rate_kbps_ = fault.response_rate_limit().fixed_limit().limit_kbps();
rate_limit.percentage_ = fault.response_rate_limit().percentage();
response_rate_limit_ = rate_limit;
}
}

FaultFilterConfig::FaultFilterConfig(const envoy::config::filter::http::fault::v2::HTTPFault& fault,
Runtime::Loader& runtime, const std::string& stats_prefix,
Stats::Scope& scope)
Stats::Scope& scope, TimeSource& time_source)
: settings_(fault), runtime_(runtime), stats_(generateStats(stats_prefix, scope)),
stats_prefix_(stats_prefix), scope_(scope) {}
stats_prefix_(stats_prefix), scope_(scope), time_source_(time_source) {}

// Stores the filter configuration handle supplied by the caller for later use by this filter.
FaultFilter::FaultFilter(FaultFilterConfigSharedPtr config) : config_(config) {}

Expand All @@ -75,9 +82,9 @@ Http::FilterHeadersStatus FaultFilter::decodeHeaders(Http::HeaderMap& headers, b
// faults. In other words, runtime is supported only when faults are
// configured at the filter level.
fault_settings_ = config_->settings();
if (callbacks_->route() && callbacks_->route()->routeEntry()) {
if (decoder_callbacks_->route() && decoder_callbacks_->route()->routeEntry()) {
const std::string& name = Extensions::HttpFilters::HttpFilterNames::get().Fault;
const auto* route_entry = callbacks_->route()->routeEntry();
const auto* route_entry = decoder_callbacks_->route()->routeEntry();

const FaultSettings* tmp = route_entry->perFilterConfigTyped<FaultSettings>(name);
const FaultSettings* per_route_settings =
Expand Down Expand Up @@ -115,12 +122,15 @@ Http::FilterHeadersStatus FaultFilter::decodeHeaders(Http::HeaderMap& headers, b
fmt::format("fault.http.{}.abort.http_status", downstream_cluster_);
}

maybeSetupResponseRateLimit();

absl::optional<uint64_t> duration_ms = delayDuration();
if (duration_ms) {
delay_timer_ = callbacks_->dispatcher().createTimer([this]() -> void { postDelayInjection(); });
delay_timer_ =
decoder_callbacks_->dispatcher().createTimer([this]() -> void { postDelayInjection(); });
delay_timer_->enableTimer(std::chrono::milliseconds(duration_ms.value()));
recordDelaysInjectedStats();
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DelayInjected);
decoder_callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DelayInjected);
return Http::FilterHeadersStatus::StopIteration;
}

Expand All @@ -132,6 +142,38 @@ Http::FilterHeadersStatus FaultFilter::decodeHeaders(Http::HeaderMap& headers, b
return Http::FilterHeadersStatus::Continue;
}

void FaultFilter::maybeSetupResponseRateLimit() {
if (!fault_settings_->responseRateLimit().has_value()) {
return;
}

// TODO(mattklein123): Allow runtime override via downstream cluster similar to the other keys.
if (!config_->runtime().snapshot().featureEnabled(
RuntimeKeys::get().ResponseRateLimitKey,
fault_settings_->responseRateLimit().value().percentage_)) {
return;
}

incActiveFaults();
config_->stats().response_rl_injected_.inc();

response_limiter_.reset(new StreamRateLimiter(
fault_settings_->responseRateLimit().value().fixed_rate_kbps_,
encoder_callbacks_->encoderBufferLimit(),
[this] { encoder_callbacks_->onEncoderFilterAboveWriteBufferHighWatermark(); },
[this] {
// TODO(mattklein123): Do we want an actual high/low watermark within
// this filter? Probably? Can we share the code/logic with the primary
// high/low watermark logic?
encoder_callbacks_->onEncoderFilterBelowWriteBufferLowWatermark();
},
[this](Buffer::Instance& data, bool end_stream) {
encoder_callbacks_->injectEncodedDataToFilterChain(data, end_stream);
},
[this] { encoder_callbacks_->continueEncoding(); }, config_->timeSource(),
decoder_callbacks_->dispatcher()));
}

bool FaultFilter::faultOverflow() {
const uint64_t max_faults = config_->runtime().snapshot().getInteger(
RuntimeKeys::get().MaxActiveFaultsKey, fault_settings_->maxActiveFaults().has_value()
Expand Down Expand Up @@ -274,22 +316,22 @@ void FaultFilter::postDelayInjection() {
abortWithHTTPStatus();
} else {
// Continue request processing.
callbacks_->continueDecoding();
decoder_callbacks_->continueDecoding();
}
}

void FaultFilter::abortWithHTTPStatus() {
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::FaultInjected);
callbacks_->sendLocalReply(static_cast<Http::Code>(abortHttpStatus()), "fault filter abort",
nullptr, absl::nullopt);
decoder_callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::FaultInjected);
decoder_callbacks_->sendLocalReply(static_cast<Http::Code>(abortHttpStatus()),
"fault filter abort", nullptr, absl::nullopt);
recordAbortsInjectedStats();
}

bool FaultFilter::matchesTargetUpstreamCluster() {
bool matches = true;

if (!fault_settings_->upstreamCluster().empty()) {
Router::RouteConstSharedPtr route = callbacks_->route();
Router::RouteConstSharedPtr route = decoder_callbacks_->route();
matches = route && route->routeEntry() &&
(route->routeEntry()->clusterName() == fault_settings_->upstreamCluster());
}
Expand Down Expand Up @@ -318,8 +360,116 @@ void FaultFilter::resetTimerState() {
}
}

void FaultFilter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
callbacks_ = &callbacks;
// Response body hook. Without a response limiter the data passes straight through; with one, the
// data is handed to the limiter and iteration stops without buffering in the filter chain.
Http::FilterDataStatus FaultFilter::encodeData(Buffer::Instance& data, bool end_stream) {
  if (response_limiter_ == nullptr) {
    return Http::FilterDataStatus::Continue;
  }

  response_limiter_->writeData(data, end_stream);
  return Http::FilterDataStatus::StopIterationNoBuffer;
}

// Response trailers hook. Defers to the response limiter when one is installed; otherwise the
// trailers continue through the filter chain untouched.
Http::FilterTrailersStatus FaultFilter::encodeTrailers(Http::HeaderMap&) {
  return response_limiter_ == nullptr ? Http::FilterTrailersStatus::Continue
                                      : response_limiter_->onTrailers();
}

// Builds a rate limiter for a single stream.
// @param max_kbps the rate limit in KiB/s; the derived per-slice byte budget must be non-zero
//        (asserted below).
// @param max_buffered_data buffered byte count above which writeData() invokes pause_data_cb;
//        must be non-zero (asserted below).
// @param pause_data_cb invoked by writeData() when the buffer first grows past max_buffered_data.
// @param resume_data_cb invoked by onTokenTimer() once the buffer has dropped back below
//        max_buffered_data after an overflow.
// @param write_data_cb invoked by onTokenTimer() with the data released on that wakeup and the
//        end-stream indication.
// @param continue_cb invoked by onTokenTimer() when all buffered data has been written and
//        trailers were seen.
// @param time_source handed to the token bucket.
// @param dispatcher used to create the token timer that drives onTokenTimer().
StreamRateLimiter::StreamRateLimiter(uint64_t max_kbps, uint64_t max_buffered_data,
std::function<void()> pause_data_cb,
std::function<void()> resume_data_cb,
std::function<void(Buffer::Instance&, bool)> write_data_cb,
std::function<void()> continue_cb, TimeSource& time_source,
Event::Dispatcher& dispatcher)
: // bytes_per_time_slice is KiB converted to bytes divided by the number of ticks per second.
bytes_per_time_slice_((max_kbps * 1024) / SecondDivisor),
max_buffered_data_(max_buffered_data), pause_data_cb_(pause_data_cb),
resume_data_cb_(resume_data_cb), write_data_cb_(write_data_cb), continue_cb_(continue_cb),
// The token bucket is configured with a max token count of the number of ticks per second,
// and refills at the same rate, so that we have a per second limit which refills gradually in
// ~63ms intervals.
token_bucket_(SecondDivisor, time_source, SecondDivisor),
token_timer_(dispatcher.createTimer([this] { onTokenTimer(); })) {
// Both values are used as divisors/thresholds later on, so zero would be a configuration error.
ASSERT(bytes_per_time_slice_ > 0);
ASSERT(max_buffered_data_ > 0);
}

// Token timer callback: releases as much buffered data as the token bucket currently allows.
//
// Each wakeup (1) converts the buffered byte count into a number of tokens, (2) consumes whatever
// portion of those tokens is available, (3) moves the corresponding bytes out of the internal
// buffer, (4) re-arms the timer if data is still pending, (5) signals the data source to resume if
// the buffer dropped back below its limit, and (6) hands the released bytes to the write callback,
// followed by the continue callback when trailers are waiting and nothing is left to send.
void StreamRateLimiter::onTokenTimer() {
  ASSERT(waiting_for_token_);
  waiting_for_token_ = false;

  ENVOY_LOG(trace, "limiter: timer wakeup: buffered={}", buffer_.length());
  Buffer::OwnedImpl data_to_write;

  // Compute the number of tokens needed (rounded up), try to obtain that many tokens, and then
  // figure out how many bytes to write given the number of tokens we actually got.
  const uint64_t tokens_needed =
      (buffer_.length() + bytes_per_time_slice_ - 1) / bytes_per_time_slice_;
  const uint64_t tokens_obtained = token_bucket_.consume(tokens_needed, true);
  const uint64_t bytes_to_write =
      std::min(tokens_obtained * bytes_per_time_slice_, buffer_.length());
  ENVOY_LOG(trace, "limiter: tokens_needed={} tokens_obtained={} to_write={}", tokens_needed,
            tokens_obtained, bytes_to_write);

  // Move the data to write into the output buffer with as little copying as possible.
  data_to_write.move(buffer_, bytes_to_write);

  // If the buffer still contains data in it, we couldn't get enough tokens, so schedule the next
  // token available time. The timer is always re-armed here, even if the bucket reports that a
  // token is available immediately (0ms): skipping the wakeup in that case would leave the
  // remaining data buffered with no timer pending, and if end of stream or trailers were already
  // seen nothing else would ever drain it.
  if (buffer_.length() > 0) {
    const std::chrono::milliseconds ms = token_bucket_.nextTokenAvailable();
    ENVOY_LOG(trace, "limiter: scheduling wakeup for {}ms", ms.count());
    token_timer_->enableTimer(ms);
    waiting_for_token_ = true;
  }

  // If we need to resume receiving data, do that now.
  if (buffer_overflow_ && buffer_.length() < max_buffered_data_) {
    buffer_overflow_ = false;
    resume_data_cb_();
  }

  // Write the data out, indicating end stream if we saw end stream, there is no further data to
  // send, and there are no trailers.
  write_data_cb_(data_to_write, saw_end_stream_ && buffer_.length() == 0 && !saw_trailers_);

  // If there is no more data to send and we saw trailers, we need to continue iteration to release
  // the trailers to further filters.
  if (buffer_.length() == 0 && saw_trailers_) {
    continue_cb_();
  }
}

// Accepts new data for rate limited delivery. The incoming bytes are drained into the internal
// buffer; the pause callback fires the first time the buffer exceeds its configured maximum; and a
// timer wakeup is requested unless one is already pending.
void StreamRateLimiter::writeData(Buffer::Instance& incoming_buffer, bool end_stream) {
  ENVOY_LOG(trace, "limiter: incoming data length={} buffered={}", incoming_buffer.length(),
            buffer_.length());
  buffer_.move(incoming_buffer);

  // Only signal the overflow once; onTokenTimer() clears the flag when the buffer drains.
  const bool over_limit = buffer_.length() > max_buffered_data_;
  if (over_limit && !buffer_overflow_) {
    buffer_overflow_ = true;
    pause_data_cb_();
  }

  saw_end_stream_ = end_stream;
  if (waiting_for_token_) {
    // A wakeup is already scheduled and will pick up the newly buffered data.
    return;
  }

  // TODO(mattklein123): In an optimal world we would be able to continue iteration with the data
  // we want in the buffer, but have a way to clear end_stream in case we can't send it all.
  // The filter API does not currently support that and it will not be a trivial change to add.
  // Instead we cheat here by scheduling the token timer to run immediately after the stack is
  // unwound, at which point we can directly call encode/decodeData.
  token_timer_->enableTimer(std::chrono::milliseconds(0));
  waiting_for_token_ = true;
}

// Records that trailers (and therefore end of stream) were seen. Iteration is stopped while data is
// still buffered so the trailers are held back; with an empty buffer they continue immediately.
Http::FilterTrailersStatus StreamRateLimiter::onTrailers() {
  saw_end_stream_ = true;
  saw_trailers_ = true;

  if (buffer_.length() > 0) {
    return Http::FilterTrailersStatus::StopIteration;
  }
  return Http::FilterTrailersStatus::Continue;
}

} // namespace Fault
Expand Down
Loading

0 comments on commit 50062ed

Please sign in to comment.