Skip to content

Commit

Permalink
extract attributes changes from envoyproxy#29069
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Bohanon <jacob.bohanon@solo.io>
  • Loading branch information
jbohanon committed Nov 8, 2023
1 parent 1e8d60a commit 87cf9ba
Show file tree
Hide file tree
Showing 14 changed files with 270 additions and 27 deletions.
4 changes: 0 additions & 4 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,13 @@ message ExternalProcessor {
// for a reply.
bool async_mode = 4;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the request_headers message.
// See the :ref:`attribute documentation <arch_overview_request_attributes>`
// for the list of supported attributes and their types.
repeated string request_attributes = 5;

// [#not-implemented-hide:]
// Envoy provides a number of :ref:`attributes <arch_overview_attributes>`
// for expressive policies. Each attribute name provided in this field will be
// matched against that list and populated in the response_headers message.
Expand Down Expand Up @@ -257,12 +255,10 @@ message ExtProcOverrides {
// Set a different asynchronous processing option than the default.
bool async_mode = 2;

// [#not-implemented-hide:]
// Set different optional attributes than the default setting of the
// ``request_attributes`` field.
repeated string request_attributes = 3;

// [#not-implemented-hide:]
// Set different optional properties than the default setting of the
// ``response_attributes`` field.
repeated string response_attributes = 4;
Expand Down
7 changes: 7 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,12 @@ new_features:
Ratelimit supports optional additional prefix to use when emitting statistics with :ref:`stat_prefix
<envoy_v3_api_field_extensions.filters.http.ratelimit.v3.RateLimit.stat_prefix>`
configuration flag.
- area: ext_proc
change: |
implemented
:ref:`request_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.request_attributes>`
and
:ref:`response_attributes <envoy_v3_api_field_extensions.filters.http.ext_proc.v3.ExternalProcessor.response_attributes>`
config APIs to enable sending and receiving attributes from/to the external processing server.
deprecated:
25 changes: 24 additions & 1 deletion source/extensions/filters/http/ext_proc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ envoy_cc_library(
"ext_proc.h",
"processor_state.h",
],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_interface",
":mutation_utils_lib",
Expand All @@ -29,24 +35,41 @@ envoy_cc_library(
"//source/common/buffer:buffer_lib",
"//source/common/protobuf",
"//source/common/runtime:runtime_features_lib",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/common/mutation_rules:mutation_rules_lib",
"//source/extensions/filters/http/common:pass_through_filter_lib",
"@com_google_absl//absl/status",
"@com_google_absl//absl/strings:str_format",
"@com_google_cel_cpp//eval/public:builtin_func_registrar",
"@com_google_cel_cpp//eval/public:cel_expr_builder_factory",
"@envoy_api//envoy/config/common/mutation_rules/v3:pkg_cc_proto",
"@envoy_api//envoy/config/core/v3:pkg_cc_proto",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
"@envoy_api//envoy/service/ext_proc/v3:pkg_cc_proto",
],
] + select(
{
"//bazel:windows_x86_64": [],
"//conditions:default": [
"@com_google_cel_cpp//parser",
],
},
),
)

envoy_cc_extension(
name = "config",
srcs = ["config.cc"],
hdrs = ["config.h"],
copts = select({
"//bazel:windows_x86_64": [],
"//conditions:default": [
"-DUSE_CEL_PARSER",
],
}),
deps = [
":client_lib",
":ext_proc",
"//source/extensions/filters/common/expr:evaluator_lib",
"//source/extensions/filters/http/common:factory_base_lib",
"@envoy_api//envoy/extensions/filters/http/ext_proc/v3:pkg_cc_proto",
],
Expand Down
13 changes: 7 additions & 6 deletions source/extensions/filters/http/ext_proc/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ Http::FilterFactoryCb ExternalProcessingFilterConfig::createFilterFactoryFromPro
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, context.scope(), stats_prefix);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
context.scope(), stats_prefix, Envoy::Extensions::Filters::Common::Expr::getBuilder(context));

return [filter_config, grpc_service = proto_config.grpc_service(),
&context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand All @@ -44,9 +44,10 @@ ExternalProcessingFilterConfig::createFilterFactoryFromProtoWithServerContextTyp
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, message_timeout, DefaultMessageTimeoutMs);
const uint32_t max_message_timeout_ms =
PROTOBUF_GET_MS_OR_DEFAULT(proto_config, max_message_timeout, DefaultMaxMessageTimeoutMs);
const auto filter_config =
std::make_shared<FilterConfig>(proto_config, std::chrono::milliseconds(message_timeout_ms),
max_message_timeout_ms, server_context.scope(), stats_prefix);
const auto filter_config = std::make_shared<FilterConfig>(
proto_config, std::chrono::milliseconds(message_timeout_ms), max_message_timeout_ms,
server_context.scope(), stats_prefix,
Envoy::Extensions::Filters::Common::Expr::getBuilder(server_context));

return [filter_config, grpc_service = proto_config.grpc_service(),
&server_context](Http::FilterChainFactoryCallbacks& callbacks) {
Expand Down
3 changes: 2 additions & 1 deletion source/extensions/filters/http/ext_proc/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.h"
#include "envoy/extensions/filters/http/ext_proc/v3/ext_proc.pb.validate.h"

#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/http/common/factory_base.h"

namespace Envoy {
Expand All @@ -29,7 +30,7 @@ class ExternalProcessingFilterConfig

Router::RouteSpecificFilterConfigConstSharedPtr createRouteSpecificFilterConfigTyped(
const envoy::extensions::filters::http::ext_proc::v3::ExtProcPerRoute& proto_config,
Server::Configuration::ServerFactoryContext& context,
Server::Configuration::ServerFactoryContext&,
ProtobufMessage::ValidationVisitor& validator) override;

Http::FilterFactoryCb createFilterFactoryFromProtoWithServerContextTyped(
Expand Down
85 changes: 82 additions & 3 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@

#include "absl/strings/str_format.h"

#if defined(USE_CEL_PARSER)
#include "parser/parser.h"
#endif

namespace Envoy {
namespace Extensions {
namespace HttpFilters {
Expand Down Expand Up @@ -113,6 +117,27 @@ ExtProcLoggingInfo::grpcCalls(envoy::config::core::v3::TrafficDirection traffic_
: encoding_processor_grpc_calls_;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
FilterConfig::initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const {
absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr> expressions;
#if defined(USE_CEL_PARSER)
for (const auto& matcher : matchers) {
auto parse_status = google::api::expr::parser::Parse(matcher);
if (!parse_status.ok()) {
throw EnvoyException("Unable to parse descriptor expression: " +
parse_status.status().ToString());
}
expressions.emplace(matcher, Extensions::Filters::Common::Expr::createExpression(
builder_->builder(), parse_status.value().expr()));
}
#else
ENVOY_LOG(warn, "CEL expression parsing is not available for use in this environment."
" Attempted to parse " +
std::to_string(matchers.size()) + " expressions");
#endif
return expressions;
}

FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config)
: disabled_(config.disabled()) {
if (config.has_overrides()) {
Expand Down Expand Up @@ -201,7 +226,8 @@ void Filter::onDestroy() {
}

FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream) {
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto) {
switch (openStream()) {
case StreamOpenState::Error:
return FilterHeadersStatus::StopIteration;
Expand All @@ -219,6 +245,9 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
headers_req->set_end_of_stream(end_stream);
if (proto.has_value()) {
(*headers_req->mutable_attributes())[FilterName] = proto.value();
}
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
ProcessorState::CallbackState::HeadersCallback);
ENVOY_LOG(debug, "Sending headers message");
Expand All @@ -228,6 +257,48 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
return FilterHeadersStatus::StopIteration;
}

const absl::optional<ProtobufWkt::Struct> Filter::evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr) {
absl::optional<ProtobufWkt::Struct> proto;
if (expr.size() > 0) {
proto.emplace(ProtobufWkt::Struct{});
for (const auto& hash_entry : expr) {
ProtobufWkt::Arena arena;
const auto result = hash_entry.second.get()->Evaluate(*activation, &arena);
if (!result.ok()) {
// TODO: Stats?
continue;
}

if (result.value().IsError()) {
ENVOY_LOG(trace, "error parsing cel expression {}", hash_entry.first);
continue;
}

ProtobufWkt::Value value;
switch (result.value().type()) {
case google::api::expr::runtime::CelValue::Type::kBool:
value.set_bool_value(result.value().BoolOrDie());
break;
case google::api::expr::runtime::CelValue::Type::kNullType:
value.set_null_value(ProtobufWkt::NullValue{});
break;
case google::api::expr::runtime::CelValue::Type::kDouble:
value.set_number_value(result.value().DoubleOrDie());
break;
default:
value.set_string_value(Filters::Common::Expr::print(result.value()));
}

(*(proto.value()).mutable_fields())[hash_entry.first] = value;
}
}

return proto;
}

FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
ENVOY_LOG(trace, "decodeHeaders: end_stream = {}", end_stream);
mergePerRouteConfig();
Expand All @@ -237,7 +308,11 @@ FilterHeadersStatus Filter::decodeHeaders(RequestHeaderMap& headers, bool end_st

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (decoding_state_.sendHeaders()) {
status = onHeaders(decoding_state_, headers, end_stream);
auto activation_ptr = Filters::Common::Expr::createActivation(decoding_state_.streamInfo(),
&headers, nullptr, nullptr);
auto proto = evaluateAttributes(std::move(activation_ptr), config_->requestExpr());

status = onHeaders(decoding_state_, headers, end_stream, proto);
ENVOY_LOG(trace, "onHeaders returning {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "decodeHeaders: Skipped header processing");
Expand Down Expand Up @@ -515,7 +590,11 @@ FilterHeadersStatus Filter::encodeHeaders(ResponseHeaderMap& headers, bool end_s

FilterHeadersStatus status = FilterHeadersStatus::Continue;
if (!processing_complete_ && encoding_state_.sendHeaders()) {
status = onHeaders(encoding_state_, headers, end_stream);
auto activation_ptr = Filters::Common::Expr::createActivation(encoding_state_.streamInfo(),
nullptr, &headers, nullptr);
auto proto = evaluateAttributes(std::move(activation_ptr), config_->responseExpr());

status = onHeaders(encoding_state_, headers, end_stream, proto);
ENVOY_LOG(trace, "onHeaders returns {}", static_cast<int>(status));
} else {
ENVOY_LOG(trace, "encodeHeaders: Skipped header processing");
Expand Down
38 changes: 34 additions & 4 deletions source/extensions/filters/http/ext_proc/ext_proc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "source/common/common/logger.h"
#include "source/common/common/matchers.h"
#include "source/common/protobuf/protobuf.h"
#include "source/extensions/filters/common/expr/evaluator.h"
#include "source/extensions/filters/common/mutation_rules/mutation_rules.h"
#include "source/extensions/filters/http/common/pass_through_filter.h"
#include "source/extensions/filters/http/ext_proc/client.h"
Expand Down Expand Up @@ -121,12 +122,13 @@ class ExtProcLoggingInfo : public Envoy::StreamInfo::FilterState::Object {
Upstream::HostDescriptionConstSharedPtr upstream_host_;
};

class FilterConfig {
class FilterConfig : public Logger::Loggable<Logger::Id::ext_proc> {
public:
FilterConfig(const envoy::extensions::filters::http::ext_proc::v3::ExternalProcessor& config,
const std::chrono::milliseconds message_timeout,
const uint32_t max_message_timeout_ms, Stats::Scope& scope,
const std::string& stats_prefix)
const std::string& stats_prefix,
Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder)
: failure_mode_allow_(config.failure_mode_allow()),
disable_clear_route_cache_(config.disable_clear_route_cache()),
message_timeout_(message_timeout), max_message_timeout_ms_(max_message_timeout_ms),
Expand All @@ -136,7 +138,9 @@ class FilterConfig {
allow_mode_override_(config.allow_mode_override()),
disable_immediate_response_(config.disable_immediate_response()),
allowed_headers_(initHeaderMatchers(config.forward_rules().allowed_headers())),
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())) {}
disallowed_headers_(initHeaderMatchers(config.forward_rules().disallowed_headers())),
builder_(builder), request_expr_(initExpressions(config.request_attributes())),
response_expr_(initExpressions(config.response_attributes())) {}

bool failureModeAllow() const { return failure_mode_allow_; }

Expand Down Expand Up @@ -166,6 +170,16 @@ class FilterConfig {

const Envoy::ProtobufWkt::Struct& filterMetadata() const { return filter_metadata_; }

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
requestExpr() const {
return request_expr_;
}

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
responseExpr() const {
return response_expr_;
}

private:
ExtProcFilterStats generateStats(const std::string& prefix,
const std::string& filter_stats_prefix, Stats::Scope& scope) {
Expand All @@ -183,6 +197,9 @@ class FilterConfig {
return header_matchers;
}

absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
initExpressions(const Protobuf::RepeatedPtrField<std::string>& matchers) const;

const bool failure_mode_allow_;
const bool disable_clear_route_cache_;
const std::chrono::milliseconds message_timeout_;
Expand All @@ -201,6 +218,13 @@ class FilterConfig {
const std::vector<Matchers::StringMatcherPtr> allowed_headers_;
// Empty disallowed_header_ means disallow nothing, i.e, allow all.
const std::vector<Matchers::StringMatcherPtr> disallowed_headers_;

Extensions::Filters::Common::Expr::BuilderInstanceSharedPtr builder_;

const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
request_expr_;
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>
response_expr_;
};

using FilterConfigSharedPtr = std::shared_ptr<FilterConfig>;
Expand Down Expand Up @@ -300,7 +324,13 @@ class Filter : public Logger::Loggable<Logger::Id::ext_proc>,
void sendImmediateResponse(const envoy::service::ext_proc::v3::ImmediateResponse& response);

Http::FilterHeadersStatus onHeaders(ProcessorState& state,
Http::RequestOrResponseHeaderMap& headers, bool end_stream);
Http::RequestOrResponseHeaderMap& headers, bool end_stream,
absl::optional<ProtobufWkt::Struct> proto);

const absl::optional<ProtobufWkt::Struct> evaluateAttributes(
Filters::Common::Expr::ActivationPtr activation,
const absl::flat_hash_map<std::string, Extensions::Filters::Common::Expr::ExpressionPtr>&
expr);
// Return a pair of whether to terminate returning the current result.
std::pair<bool, Http::FilterDataStatus> sendStreamChunk(ProcessorState& state);
Http::FilterDataStatus onData(ProcessorState& state, Buffer::Instance& data, bool end_stream);
Expand Down
6 changes: 6 additions & 0 deletions source/extensions/filters/http/ext_proc/processor_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ class ProcessorState : public Logger::Loggable<Logger::Id::ext_proc> {
virtual envoy::service::ext_proc::v3::HttpTrailers*
mutableTrailers(envoy::service::ext_proc::v3::ProcessingRequest& request) const PURE;

virtual StreamInfo::StreamInfo& streamInfo() PURE;

protected:
void setBodyMode(
envoy::extensions::filters::http::ext_proc::v3::ProcessingMode_BodySendMode body_mode);
Expand Down Expand Up @@ -283,6 +285,8 @@ class DecodingProcessorState : public ProcessorState {
void requestWatermark() override;
void clearWatermark() override;

StreamInfo::StreamInfo& streamInfo() override { return decoder_callbacks_->streamInfo(); }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down Expand Up @@ -356,6 +360,8 @@ class EncodingProcessorState : public ProcessorState {
void requestWatermark() override;
void clearWatermark() override;

StreamInfo::StreamInfo& streamInfo() override { return encoder_callbacks_->streamInfo(); }

private:
void setProcessingModeInternal(
const envoy::extensions::filters::http::ext_proc::v3::ProcessingMode& mode);
Expand Down
Loading

0 comments on commit 87cf9ba

Please sign in to comment.