Skip to content

Commit

Permalink
ratelimit: Add rate limit upstream headers (envoyproxy#8565)
Browse files Browse the repository at this point in the history
*Ability to add custom upstream headers from ratelimit service/filter.

*For LimitStatus::OK, custom upstream headers are added if RLS service sends upstream headers.

Risk Level: Low

Testing:
Unit and integration tests added.
Verified with modified github.com/lyft/ratelimit service.
Passes "bazel test //test/..." in Linux
Docs Changes: protobuf documentation updated

Release Notes: ratelimit: support for adding custom headers to upstream server
from ratelimit service

** Issues: envoyproxy#6141

Signed-off-by: Nandu Vinodan <nandu.vinodan@freshworks.com>
  • Loading branch information
nandu-vinodan committed Oct 17, 2019
1 parent dff005c commit 0072503
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 74 deletions.
5 changes: 4 additions & 1 deletion api/envoy/service/ratelimit/v2/rls.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ message RateLimitResponse {
// descriptors failed and/or what the currently configured limits are for all of them.
repeated DescriptorStatus statuses = 2;

// A list of headers to add to the response
// [#next-major-version: rename to response_headers_to_add]
repeated api.v2.core.HeaderValue headers = 3;

// A list of headers to add to the request when forwarded
repeated api.v2.core.HeaderValue request_headers_to_add = 4;
}
5 changes: 4 additions & 1 deletion api/envoy/service/ratelimit/v3alpha/rls.proto
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ message RateLimitResponse {
// descriptors failed and/or what the currently configured limits are for all of them.
repeated DescriptorStatus statuses = 2;

// A list of headers to add to the response
// [#next-major-version: rename to response_headers_to_add]
repeated api.v3alpha.core.HeaderValue headers = 3;

// A list of headers to add to the request when forwarded
repeated api.v3alpha.core.HeaderValue request_headers_to_add = 4;
}
7 changes: 4 additions & 3 deletions source/extensions/filters/common/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ class RequestCallbacks {
virtual ~RequestCallbacks() = default;

/**
* Called when a limit request is complete. The resulting status and
* response headers are supplied.
* Called when a limit request is complete. The resulting status,
* response headers and request headers to be forwarded to the upstream are supplied.
*/
virtual void complete(LimitStatus status, Http::HeaderMapPtr&& headers) PURE;
virtual void complete(LimitStatus status, Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) PURE;
};

/**
Expand Down
18 changes: 13 additions & 5 deletions source/extensions/filters/common/ratelimit/ratelimit_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,28 @@ void GrpcClientImpl::onSuccess(
span.setTag(Constants::get().TraceStatus, Constants::get().TraceOk);
}

Http::HeaderMapPtr headers = std::make_unique<Http::HeaderMapImpl>();
if (response->headers_size()) {
Http::HeaderMapPtr response_headers_to_add = std::make_unique<Http::HeaderMapImpl>();
if (!response->headers().empty()) {
for (const auto& h : response->headers()) {
headers->addCopy(Http::LowerCaseString(h.key()), h.value());
response_headers_to_add->addCopy(Http::LowerCaseString(h.key()), h.value());
}
}
callbacks_->complete(status, std::move(headers));

Http::HeaderMapPtr request_headers_to_add = std::make_unique<Http::HeaderMapImpl>();
if (!response->request_headers_to_add().empty()) {
for (const auto& h : response->request_headers_to_add()) {
request_headers_to_add->addCopy(Http::LowerCaseString(h.key()), h.value());
}
}
callbacks_->complete(status, std::move(response_headers_to_add),
std::move(request_headers_to_add));
callbacks_ = nullptr;
}

void GrpcClientImpl::onFailure(Grpc::Status::GrpcStatus status, const std::string&,
Tracing::Span&) {
ASSERT(status != Grpc::Status::GrpcStatus::Ok);
callbacks_->complete(LimitStatus::Error, nullptr);
callbacks_->complete(LimitStatus::Error, nullptr, nullptr);
callbacks_ = nullptr;
}

Expand Down
31 changes: 22 additions & 9 deletions source/extensions/filters/http/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool)
return Http::FilterHeadersStatus::Continue;
}

request_headers_ = &headers;
initiateCall(headers);
return (state_ == State::Calling || state_ == State::Responded)
? Http::FilterHeadersStatus::StopIteration
Expand Down Expand Up @@ -100,7 +101,7 @@ Http::FilterHeadersStatus Filter::encode100ContinueHeaders(Http::HeaderMap&) {
}

Http::FilterHeadersStatus Filter::encodeHeaders(Http::HeaderMap& headers, bool) {
addHeaders(headers);
populateResponseHeaders(headers);
return Http::FilterHeadersStatus::Continue;
}

Expand All @@ -126,9 +127,11 @@ void Filter::onDestroy() {
}

void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
Http::HeaderMapPtr&& headers) {
Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) {
state_ = State::Complete;
headers_to_add_ = std::move(headers);
response_headers_to_add_ = std::move(response_headers_to_add);
Http::HeaderMapPtr req_headers_to_add = std::move(request_headers_to_add);
Stats::StatName empty_stat_name;
Filters::Common::RateLimit::StatNames& stat_names = config_->statNames();

Expand All @@ -152,7 +155,7 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
empty_stat_name,
false};
httpContext().codeStats().chargeResponseStat(info);
headers_to_add_->insertEnvoyRateLimited().value(
response_headers_to_add_->insertEnvoyRateLimited().value(
Http::Headers::get().EnvoyRateLimitedValues.True);
break;
}
Expand All @@ -161,13 +164,15 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
config_->runtime().snapshot().featureEnabled("ratelimit.http_filter_enforcing", 100)) {
state_ = State::Responded;
callbacks_->sendLocalReply(
Http::Code::TooManyRequests, "", [this](Http::HeaderMap& headers) { addHeaders(headers); },
Http::Code::TooManyRequests, "",
[this](Http::HeaderMap& headers) { populateResponseHeaders(headers); },
config_->rateLimitedGrpcStatus(), RcDetails::get().RateLimited);
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::RateLimited);
} else if (status == Filters::Common::RateLimit::LimitStatus::Error) {
if (config_->failureModeAllow()) {
cluster_->statsScope().counterFromStatName(stat_names.failure_mode_allowed_).inc();
if (!initiating_call_) {
appendRequestHeaders(req_headers_to_add);
callbacks_->continueDecoding();
}
} else {
Expand All @@ -177,6 +182,7 @@ void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::RateLimitServiceError);
}
} else if (!initiating_call_) {
appendRequestHeaders(req_headers_to_add);
callbacks_->continueDecoding();
}
}
Expand All @@ -198,10 +204,17 @@ void Filter::populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_li
}
}

void Filter::addHeaders(Http::HeaderMap& headers) {
if (headers_to_add_) {
Http::HeaderUtility::addHeaders(headers, *headers_to_add_);
headers_to_add_ = nullptr;
void Filter::populateResponseHeaders(Http::HeaderMap& response_headers) {
if (response_headers_to_add_) {
Http::HeaderUtility::addHeaders(response_headers, *response_headers_to_add_);
response_headers_to_add_ = nullptr;
}
}

void Filter::appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add) {
if (request_headers_to_add && request_headers_) {
Http::HeaderUtility::addHeaders(*request_headers_, *request_headers_to_add);
request_headers_to_add = nullptr;
}
}

Expand Down
10 changes: 7 additions & 3 deletions source/extensions/filters/http/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,18 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus status,
Http::HeaderMapPtr&& headers) override;
Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) override;

private:
void initiateCall(const Http::HeaderMap& headers);
void populateRateLimitDescriptors(const Router::RateLimitPolicy& rate_limit_policy,
std::vector<Envoy::RateLimit::Descriptor>& descriptors,
const Router::RouteEntry* route_entry,
const Http::HeaderMap& headers) const;
void addHeaders(Http::HeaderMap& headers);
void populateResponseHeaders(Http::HeaderMap& response_headers);
void appendRequestHeaders(Http::HeaderMapPtr& request_headers_to_add);

Http::Context& httpContext() { return config_->httpContext(); }

enum class State { NotStarted, Calling, Complete, Responded };
Expand All @@ -134,7 +137,8 @@ class Filter : public Http::StreamFilter, public Filters::Common::RateLimit::Req
State state_{State::NotStarted};
Upstream::ClusterInfoConstSharedPtr cluster_;
bool initiating_call_{};
Http::HeaderMapPtr headers_to_add_;
Http::HeaderMapPtr response_headers_to_add_;
Http::HeaderMap* request_headers_;
};

} // namespace RateLimitFilter
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/network/ratelimit/ratelimit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ void Filter::onEvent(Network::ConnectionEvent event) {
}
}

void Filter::complete(Filters::Common::RateLimit::LimitStatus status, Http::HeaderMapPtr&&) {
void Filter::complete(Filters::Common::RateLimit::LimitStatus status, Http::HeaderMapPtr&&,
Http::HeaderMapPtr&&) {
status_ = Status::Complete;
config_->stats().active_.dec();

Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/network/ratelimit/ratelimit.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ class Filter : public Network::ReadFilter,

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus status,
Http::HeaderMapPtr&& headers) override;
Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) override;

private:
enum class Status { NotStarted, Calling, Complete };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ void Filter::onDestroy() {
}

void Filter::complete(Filters::Common::RateLimit::LimitStatus status,
Http::HeaderMapPtr&& headers) {
Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) {
// TODO(zuercher): Store headers to append to a response. Adding them to a local reply (over
// limit or error) is a matter of modifying the callbacks to allow it. Adding them to an upstream
// response requires either response (aka encoder) filters or some other mechanism.
UNREFERENCED_PARAMETER(headers);
UNREFERENCED_PARAMETER(response_headers_to_add);
UNREFERENCED_PARAMETER(request_headers_to_add);

state_ = State::Complete;
Filters::Common::RateLimit::StatNames& stat_names = config_->statNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ class Filter : public ThriftProxy::ThriftFilters::DecoderFilter,

// RateLimit::RequestCallbacks
void complete(Filters::Common::RateLimit::LimitStatus status,
Http::HeaderMapPtr&& headers) override;
Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) override;

private:
void initiateCall(const ThriftProxy::MessageMetadata& metadata);
Expand Down
3 changes: 2 additions & 1 deletion test/common/network/filter_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,8 @@ stat_prefix: name
EXPECT_CALL(factory_context.cluster_manager_, tcpConnPoolForCluster("fake_cluster", _, _))
.WillOnce(Return(&conn_pool));

request_callbacks->complete(Extensions::Filters::Common::RateLimit::LimitStatus::OK, nullptr);
request_callbacks->complete(Extensions::Filters::Common::RateLimit::LimitStatus::OK, nullptr,
nullptr);

conn_pool.poolReady(upstream_connection);

Expand Down
14 changes: 8 additions & 6 deletions test/extensions/filters/common/ratelimit/ratelimit_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ namespace {

class MockRequestCallbacks : public RequestCallbacks {
public:
void complete(LimitStatus status, Http::HeaderMapPtr&& headers) override {
complete_(status, headers.get());
void complete(LimitStatus status, Http::HeaderMapPtr&& response_headers_to_add,
Http::HeaderMapPtr&& request_headers_to_add) override {
complete_(status, response_headers_to_add.get(), request_headers_to_add.get());
}

MOCK_METHOD2(complete_, void(LimitStatus status, const Http::HeaderMap* headers));
MOCK_METHOD3(complete_, void(LimitStatus status, const Http::HeaderMap* response_headers_to_add,
const Http::HeaderMap* request_headers_to_add));
};

class RateLimitGrpcClientTest : public testing::Test {
Expand Down Expand Up @@ -81,7 +83,7 @@ TEST_F(RateLimitGrpcClientTest, Basic) {
response = std::make_unique<envoy::service::ratelimit::v2::RateLimitResponse>();
response->set_overall_code(envoy::service::ratelimit::v2::RateLimitResponse_Code_OVER_LIMIT);
EXPECT_CALL(span_, setTag(Eq("ratelimit_status"), Eq("over_limit")));
EXPECT_CALL(request_callbacks_, complete_(LimitStatus::OverLimit, _));
EXPECT_CALL(request_callbacks_, complete_(LimitStatus::OverLimit, _, _));
client_.onSuccess(std::move(response), span_);
}

Expand All @@ -100,7 +102,7 @@ TEST_F(RateLimitGrpcClientTest, Basic) {
response = std::make_unique<envoy::service::ratelimit::v2::RateLimitResponse>();
response->set_overall_code(envoy::service::ratelimit::v2::RateLimitResponse_Code_OK);
EXPECT_CALL(span_, setTag(Eq("ratelimit_status"), Eq("ok")));
EXPECT_CALL(request_callbacks_, complete_(LimitStatus::OK, _));
EXPECT_CALL(request_callbacks_, complete_(LimitStatus::OK, _, _));
client_.onSuccess(std::move(response), span_);
}

Expand All @@ -117,7 +119,7 @@ TEST_F(RateLimitGrpcClientTest, Basic) {
Tracing::NullSpan::instance());

response = std::make_unique<envoy::service::ratelimit::v2::RateLimitResponse>();
EXPECT_CALL(request_callbacks_, complete_(LimitStatus::Error, _));
EXPECT_CALL(request_callbacks_, complete_(LimitStatus::Error, _, _));
client_.onFailure(Grpc::Status::Unknown, "", span_);
}
}
Expand Down
Loading

0 comments on commit 0072503

Please sign in to comment.