Skip to content

Commit

Permalink
admission control: Implement thread-local controller (#11628)
Browse files Browse the repository at this point in the history
Signed-off-by: Tony Allen <tony@allen.gg>
  • Loading branch information
tonya11en authored Jun 24, 2020
1 parent f87e68f commit 6fecf0b
Show file tree
Hide file tree
Showing 15 changed files with 594 additions and 41 deletions.
4 changes: 1 addition & 3 deletions source/extensions/extensions_build_config.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ EXTENSIONS = {
#

"envoy.filters.http.adaptive_concurrency": "//source/extensions/filters/http/adaptive_concurrency:config",
# NOTE: The admission control filter does not have a proper filter
# implemented right now. We are just referencing the filter lib here.
"envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:admission_control_filter_lib",
"envoy.filters.http.admission_control": "//source/extensions/filters/http/admission_control:config",
"envoy.filters.http.aws_lambda": "//source/extensions/filters/http/aws_lambda:config",
"envoy.filters.http.aws_request_signing": "//source/extensions/filters/http/aws_request_signing:config",
"envoy.filters.http.buffer": "//source/extensions/filters/http/buffer:config",
Expand Down
18 changes: 18 additions & 0 deletions source/extensions/filters/http/admission_control/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_cc_extension(
name = "admission_control_filter_lib",
srcs = [
"admission_control.cc",
"thread_local_controller.cc",
],
hdrs = [
"admission_control.h",
Expand All @@ -33,3 +34,20 @@ envoy_cc_extension(
"@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto",
],
)

envoy_cc_extension(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
security_posture = "unknown",
status = "alpha",
deps = [
"//include/envoy/registry",
"//source/common/common:enum_to_int",
"//source/extensions/filters/http:well_known_names",
"//source/extensions/filters/http/admission_control:admission_control_filter_lib",
"//source/extensions/filters/http/admission_control/evaluators:response_evaluator_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto",
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using GrpcStatus = Grpc::Status::GrpcStatus;
static constexpr double defaultAggression = 2.0;

AdmissionControlFilterConfig::AdmissionControlFilterConfig(
const AdmissionControlProto& proto_config, Runtime::Loader& runtime, TimeSource&,
const AdmissionControlProto& proto_config, Runtime::Loader& runtime,
Runtime::RandomGenerator& random, Stats::Scope& scope, ThreadLocal::SlotPtr&& tls,
std::shared_ptr<ResponseEvaluator> response_evaluator)
: random_(random), scope_(scope), tls_(std::move(tls)),
Expand Down Expand Up @@ -122,8 +122,9 @@ AdmissionControlFilter::encodeTrailers(Http::ResponseTrailerMap& trailers) {
}

bool AdmissionControlFilter::shouldRejectRequest() const {
const double total = config_->getController().requestTotalCount();
const double success = config_->getController().requestSuccessCount();
const auto request_counts = config_->getController().requestCounts();
const double total = request_counts.requests;
const double success = request_counts.successes;
const double probability = (total - config_->aggression() * success) / (total + 1);

// Choosing an accuracy of 4 significant figures for the probability.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ using AdmissionControlProto =
class AdmissionControlFilterConfig {
public:
AdmissionControlFilterConfig(const AdmissionControlProto& proto_config, Runtime::Loader& runtime,
TimeSource&, Runtime::RandomGenerator& random, Stats::Scope& scope,
Runtime::RandomGenerator& random, Stats::Scope& scope,
ThreadLocal::SlotPtr&& tls,
std::shared_ptr<ResponseEvaluator> response_evaluator);
virtual ~AdmissionControlFilterConfig() = default;

virtual ThreadLocalController& getController() const { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
virtual ThreadLocalController& getController() const {
return tls_->getTyped<ThreadLocalControllerImpl>();
}

Runtime::RandomGenerator& random() const { return random_; }
bool filterEnabled() const { return admission_control_feature_.enabled(); }
Expand Down
64 changes: 64 additions & 0 deletions source/extensions/filters/http/admission_control/config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "extensions/filters/http/admission_control/config.h"

#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.h"
#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.validate.h"
#include "envoy/registry/registry.h"

#include "common/common/enum_to_int.h"

#include "extensions/filters/http/admission_control/admission_control.h"
#include "extensions/filters/http/admission_control/evaluators/response_evaluator.h"
#include "extensions/filters/http/admission_control/evaluators/success_criteria_evaluator.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdmissionControl {

static constexpr std::chrono::seconds defaultSamplingWindow{120};

Http::FilterFactoryCb AdmissionControlFilterFactory::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::admission_control::v3alpha::AdmissionControl& config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) {

const std::string prefix = stats_prefix + "admission_control.";

// Create the thread-local controller.
auto tls = context.threadLocal().allocateSlot();
auto sampling_window = std::chrono::seconds(
PROTOBUF_GET_MS_OR_DEFAULT(config, sampling_window, 1000 * defaultSamplingWindow.count()) /
1000);
tls->set(
[sampling_window, &context](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr {
return std::make_shared<ThreadLocalControllerImpl>(context.timeSource(), sampling_window);
});

std::unique_ptr<ResponseEvaluator> response_evaluator;
switch (config.evaluation_criteria_case()) {
case AdmissionControlProto::EvaluationCriteriaCase::kSuccessCriteria:
response_evaluator = std::make_unique<SuccessCriteriaEvaluator>(config.success_criteria());
break;
case AdmissionControlProto::EvaluationCriteriaCase::EVALUATION_CRITERIA_NOT_SET:
NOT_REACHED_GCOVR_EXCL_LINE;
}

AdmissionControlFilterConfigSharedPtr filter_config =
std::make_shared<AdmissionControlFilterConfig>(config, context.runtime(), context.random(),
context.scope(), std::move(tls),
std::move(response_evaluator));

return [filter_config, prefix](Http::FilterChainFactoryCallbacks& callbacks) -> void {
callbacks.addStreamFilter(std::make_shared<AdmissionControlFilter>(filter_config, prefix));
};
}

/**
* Static registration for the admission_control filter. @see RegisterFactory.
*/
REGISTER_FACTORY(AdmissionControlFilterFactory,
Server::Configuration::NamedHttpFilterConfigFactory);

} // namespace AdmissionControl
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
32 changes: 32 additions & 0 deletions source/extensions/filters/http/admission_control/config.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.h"
#include "envoy/extensions/filters/http/admission_control/v3alpha/admission_control.pb.validate.h"

#include "extensions/filters/http/common/factory_base.h"
#include "extensions/filters/http/well_known_names.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdmissionControl {

/**
* Config registration for the adaptive concurrency limit filter. @see NamedHttpFilterConfigFactory.
*/
class AdmissionControlFilterFactory
: public Common::FactoryBase<
envoy::extensions::filters::http::admission_control::v3alpha::AdmissionControl> {
public:
AdmissionControlFilterFactory() : FactoryBase(HttpFilterNames::get().AdmissionControl) {}

Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::admission_control::v3alpha::AdmissionControl&
proto_config,
const std::string& stats_prefix, Server::Configuration::FactoryContext& context) override;
};

} // namespace AdmissionControl
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#include "extensions/filters/http/admission_control/thread_local_controller.h"

#include <cstdint>

#include "envoy/common/pure.h"
#include "envoy/common/time.h"
#include "envoy/http/codes.h"

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
namespace AdmissionControl {

static constexpr std::chrono::seconds defaultHistoryGranularity{1};

ThreadLocalControllerImpl::ThreadLocalControllerImpl(TimeSource& time_source,
std::chrono::seconds sampling_window)
: time_source_(time_source), sampling_window_(sampling_window) {}

void ThreadLocalControllerImpl::maybeUpdateHistoricalData() {
// Purge stale samples.
while (!historical_data_.empty() && ageOfOldestSample() >= sampling_window_) {
removeOldestSample();
}

// It's possible we purged stale samples from the history and are left with nothing, so it's
// necessary to add an empty entry. We will also need to roll over into a new entry in the
// historical data if we've exceeded the time specified by the granularity.
if (historical_data_.empty() || ageOfNewestSample() >= defaultHistoryGranularity) {
historical_data_.emplace_back(time_source_.monotonicTime(), RequestData());
}
}

void ThreadLocalControllerImpl::recordRequest(bool success) {
maybeUpdateHistoricalData();

// The back of the deque will be the most recent samples.
++historical_data_.back().second.requests;
++global_data_.requests;
if (success) {
++historical_data_.back().second.successes;
++global_data_.successes;
}
}

} // namespace AdmissionControl
} // namespace HttpFilters
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,96 @@ namespace AdmissionControl {
*/
class ThreadLocalController {
public:
struct RequestData {
RequestData(uint32_t request_count, uint32_t success_count)
: requests(request_count), successes(success_count) {}
RequestData() = default;

inline bool operator==(const RequestData& rhs) const {
return (requests == rhs.requests) && (successes == rhs.successes);
}

uint32_t requests{0};
uint32_t successes{0};
};

virtual ~ThreadLocalController() = default;

// Record success/failure of a request and update the internal state of the controller to reflect
// this.
virtual void recordSuccess() PURE;
virtual void recordFailure() PURE;

// Returns the current number of recorded requests.
virtual uint32_t requestTotalCount() PURE;
// Returns the current number of requests and how many of them are successful.
virtual RequestData requestCounts() PURE;
};

/**
* Thread-local object to track request counts and successes over a rolling time window. Request
* data for the time window is kept recent via a circular buffer that phases out old request/success
* counts when recording new samples.
*
* This controller is thread-local so that we do not need to take any locks on the sample histories
* to update them, at the cost of decreasing the number of samples.
*
* The look-back window for request samples is accurate up to a hard-coded 1-second granularity.
* TODO (tonya11en): Allow the granularity to be configurable.
*/
class ThreadLocalControllerImpl : public ThreadLocalController,
public ThreadLocal::ThreadLocalObject {
public:
ThreadLocalControllerImpl(TimeSource& time_source, std::chrono::seconds sampling_window);
~ThreadLocalControllerImpl() override = default;
void recordSuccess() override { recordRequest(true); }
void recordFailure() override { recordRequest(false); }

RequestData requestCounts() override {
maybeUpdateHistoricalData();
return global_data_;
}

private:
void recordRequest(bool success);

// Potentially remove any stale samples and record sample aggregates to the historical data.
void maybeUpdateHistoricalData();

// Returns the age of the oldest sample in the historical data.
std::chrono::microseconds ageOfOldestSample() const {
ASSERT(!historical_data_.empty());
using namespace std::chrono;
return duration_cast<microseconds>(time_source_.monotonicTime() -
historical_data_.front().first);
}

// Returns the age of the newest sample in the historical data.
std::chrono::microseconds ageOfNewestSample() const {
ASSERT(!historical_data_.empty());
using namespace std::chrono;
return duration_cast<microseconds>(time_source_.monotonicTime() -
historical_data_.back().first);
}

// Removes the oldest sample in the historical data and reconciles the global data.
void removeOldestSample() {
ASSERT(!historical_data_.empty());
global_data_.successes -= historical_data_.front().second.successes;
global_data_.requests -= historical_data_.front().second.requests;
historical_data_.pop_front();
}

TimeSource& time_source_;

// Stores samples from oldest (front) to newest (back). Since there is no need to read/modify
// entries that are not the oldest or newest (front/back), we can get away with using a deque
// which allocates memory in chunks and keeps most elements contiguous and cache-friendly.
std::deque<std::pair<MonotonicTime, RequestData>> historical_data_;

// Request data aggregated for the whole look-back window.
RequestData global_data_;

// Returns the current number of recorded request successes.
virtual uint32_t requestSuccessCount() PURE;
// The rolling time window size.
const std::chrono::seconds sampling_window_;
};

} // namespace AdmissionControl
Expand Down
24 changes: 24 additions & 0 deletions test/extensions/filters/http/admission_control/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,27 @@ envoy_extension_cc_test(
"@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto",
],
)

envoy_extension_cc_test(
name = "admission_control_integration_test",
srcs = ["admission_control_integration_test.cc"],
extension_name = "envoy.filters.http.admission_control",
deps = [
"//source/extensions/filters/http/admission_control:config",
"//test/integration:http_integration_lib",
"//test/test_common:utility_lib",
],
)

envoy_extension_cc_test(
name = "admission_controller_test",
srcs = ["controller_test.cc"],
extension_name = "envoy.filters.http.admission_control",
deps = [
"//source/common/http:headers_lib",
"//source/extensions/filters/http/admission_control:admission_control_filter_lib",
"//test/test_common:simulated_time_system_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/extensions/filters/http/admission_control/v3alpha:pkg_cc_proto",
],
)
Loading

0 comments on commit 6fecf0b

Please sign in to comment.