Skip to content

Commit

Permalink
extract metadata changes from envoyproxy#29069
Browse files Browse the repository at this point in the history
Signed-off-by: Jacob Bohanon <jacob.bohanon@solo.io>
  • Loading branch information
jbohanon authored and nfuden committed Nov 30, 2023
1 parent 18fac45 commit 369bd16
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 28 deletions.
39 changes: 36 additions & 3 deletions api/envoy/extensions/filters/http/ext_proc/v3/ext_proc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// All options and processing modes are implemented except for the following:
//
// * Request and response attributes are not sent and not processed.
// * Dynamic metadata in responses from the external processor is ignored.
// * "async mode" is not implemented.

// The filter communicates with an external gRPC service called an "external processor"
Expand Down Expand Up @@ -99,7 +98,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// <arch_overview_advanced_filter_state_sharing>` object in a namespace matching the filter
// name.
//
// [#next-free-field: 15]
// [#next-free-field: 17]
message ExternalProcessor {
// Configuration for the gRPC service that the filter will communicate with.
// The filter supports both the "Envoy" and "Google" gRPC clients.
Expand Down Expand Up @@ -198,6 +197,37 @@ message ExternalProcessor {
// :ref:`mode_override <envoy_v3_api_field_service.ext_proc.v3.ProcessingResponse.mode_override>`.
// If not set, ``mode_override`` API in the response message will be ignored.
bool allow_mode_override = 14;

reserved 15;

// Options related to the sending and receiving of dynamic metadata
MetadataOptions metadata_options = 16;
}

// The MetadataOptions structure defines options for the sending and receiving of
// dynamic metadata. Specifically, which namespaces to send to the server, whether
// metadata returned by the server may be written, and how that metadata may be written.
message MetadataOptions {
message MetadataNamespaces {
// Specifies a list of metadata namespaces whose values, if present,
// will be passed to the ext_proc service as an opaque *protobuf::Struct*.
repeated string untyped = 1;

// Specifies a list of metadata namespaces whose values, if present,
// will be passed to the ext_proc service as a *protobuf::Any*. This allows
// envoy and the external processing server to share the protobuf message
// definition for safe parsing.
repeated string typed = 2;
}

// Describes which typed or untyped dynamic metadata namespaces to forward to
// the external processing server.
MetadataNamespaces forwarding_namespaces = 1;

// Describes which typed or untyped dynamic metadata namespaces to accept from
// the external processing server. Set to empty or leave unset to disallow writing
// any received dynamic metadata. Receiving of typed metadata is not supported.
MetadataNamespaces receiving_namespaces = 2;
}

// The HeaderForwardingRules structure specifies what headers are
Expand Down Expand Up @@ -240,7 +270,7 @@ message ExtProcPerRoute {
}

// Overrides that may be set on a per-route basis
// [#next-free-field: 6]
// [#next-free-field: 7]
message ExtProcOverrides {
// Set a different processing mode for this route than the default.
ProcessingMode processing_mode = 1;
Expand All @@ -259,4 +289,7 @@ message ExtProcOverrides {

// Set a different gRPC service for this route than the default.
config.core.v3.GrpcService grpc_service = 5;

// Options related to the sending and receiving of dynamic metadata
MetadataOptions metadata_options = 6;
}
11 changes: 7 additions & 4 deletions api/envoy/service/ext_proc/v3/external_processor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ service ExternalProcessor {

// This represents the different types of messages that Envoy can send
// to an external processing server.
// [#next-free-field: 8]
// [#next-free-field: 9]
message ProcessingRequest {
// Specify whether the filter that sent this request is running in synchronous
// or asynchronous mode. The choice of synchronous or asynchronous mode
Expand Down Expand Up @@ -115,6 +115,9 @@ message ProcessingRequest {
// in the filter configuration.
HttpTrailers response_trailers = 7;
}

// Dynamic metadata associated with the request.
config.core.v3.Metadata metadata_context = 8;
}

// For every ProcessingRequest received by the server with the ``async_mode`` field
Expand Down Expand Up @@ -158,9 +161,9 @@ message ProcessingResponse {
ImmediateResponse immediate_response = 7;
}

// [#not-implemented-hide:]
// Optional metadata that will be emitted as dynamic metadata to be consumed by the next
// filter. This metadata will be placed in the namespace ``envoy.filters.http.ext_proc``.
// Optional metadata that will be emitted as dynamic metadata to be consumed by
// following filters. This metadata will be placed in the namespace(s) specified by the top-level
// field name(s) of the struct.
google.protobuf.Struct dynamic_metadata = 8;

// Override how parts of the HTTP request and response are processed
Expand Down
173 changes: 162 additions & 11 deletions source/extensions/filters/http/ext_proc/ext_proc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,35 @@ FilterConfig::initExpressions(const Protobuf::RepeatedPtrField<std::string>& mat

FilterConfigPerRoute::FilterConfigPerRoute(const ExtProcPerRoute& config)
: disabled_(config.disabled()) {
if (config.has_overrides()) {
if (!config.has_overrides()) {
return;
}

const auto& overrides = config.overrides();
if (overrides.has_processing_mode()) {
processing_mode_ = config.overrides().processing_mode();
}
if (config.overrides().has_grpc_service()) {
grpc_service_ = config.overrides().grpc_service();
if (overrides.has_grpc_service()) {
grpc_service_ = overrides.grpc_service();
}

if (!overrides.has_metadata_options()) {
return;
}

const auto& md_opts = overrides.metadata_options();
if (md_opts.has_forwarding_namespaces()) {
untyped_forwarding_namespaces_ =
std::vector<std::string>(md_opts.forwarding_namespaces().untyped().begin(),
md_opts.forwarding_namespaces().untyped().end());
typed_forwarding_namespaces_ =
std::vector<std::string>(md_opts.forwarding_namespaces().typed().begin(),
md_opts.forwarding_namespaces().typed().end());
}
if (md_opts.has_receiving_namespaces()) {
untyped_receiving_namespaces_ =
std::vector<std::string>(md_opts.receiving_namespaces().untyped().begin(),
md_opts.receiving_namespaces().untyped().end());
}
}

Expand All @@ -154,6 +178,15 @@ void FilterConfigPerRoute::merge(const FilterConfigPerRoute& src) {
if (src.grpcService().has_value()) {
grpc_service_ = src.grpcService();
}
if (src.untypedForwardingMetadataNamespaces().has_value()) {
untyped_forwarding_namespaces_ = src.untypedForwardingMetadataNamespaces();
}
if (src.typedForwardingMetadataNamespaces().has_value()) {
typed_forwarding_namespaces_ = src.typedForwardingMetadataNamespaces();
}
if (src.untypedReceivingMetadataNamespaces().has_value()) {
untyped_receiving_namespaces_ = src.untypedReceivingMetadataNamespaces();
}
}

void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
Expand Down Expand Up @@ -246,6 +279,7 @@ FilterHeadersStatus Filter::onHeaders(ProcessorState& state,
state.setHeaders(&headers);
state.setHasNoBody(end_stream);
ProcessingRequest req;
addDynamicMetadata(state, req);
auto* headers_req = state.mutableHeaders(req);
MutationUtils::headersToProto(headers, config_->allowedHeaders(), config_->disallowedHeaders(),
*headers_req->mutable_headers());
Expand Down Expand Up @@ -660,6 +694,7 @@ void Filter::sendBodyChunk(ProcessorState& state, const Buffer::Instance& data,
state.onStartProcessorCall(std::bind(&Filter::onMessageTimeout, this), config_->messageTimeout(),
new_state);
ProcessingRequest req;
addDynamicMetadata(state, req);
auto* body_req = state.mutableBody(req);
body_req->set_end_of_stream(end_stream);
body_req->set_body(data.toString());
Expand All @@ -680,6 +715,7 @@ void Filter::sendBufferedData(ProcessorState& state, ProcessorState::CallbackSta

void Filter::sendTrailers(ProcessorState& state, const Http::HeaderMap& trailers) {
ProcessingRequest req;
addDynamicMetadata(state, req);
auto* trailers_req = state.mutableTrailers(req);
MutationUtils::headersToProto(trailers, config_->allowedHeaders(), config_->disallowedHeaders(),
*trailers_req->mutable_trailers());
Expand Down Expand Up @@ -719,6 +755,83 @@ void Filter::onNewTimeout(const ProtobufWkt::Duration& override_message_timeout)
stats_.override_message_timeout_received_.inc();
}

void Filter::addDynamicMetadata(ProcessorState& state, ProcessingRequest& req) {
// get the callbacks from the ProcessorState. This will be the appropriate
// callbacks for the current state of the filter
auto* cb = state.callbacks();
envoy::config::core::v3::Metadata forwarding_metadata;

// If metadata_context_namespaces is specified, pass matching filter metadata to the ext_proc
// service. If metadata key is set in both the connection and request metadata then the value
// will be the request metadata value. The metadata will only be searched for the callbacks
// corresponding to the traffic direction at the time of the external processing request.
const auto& request_metadata = cb->streamInfo().dynamicMetadata().filter_metadata();
for (const auto& context_key : state.untypedForwardingMetadataNamespaces()) {
if (const auto metadata_it = request_metadata.find(context_key);
metadata_it != request_metadata.end()) {
(*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second;
} else if (cb->connection().has_value()) {
const auto& connection_metadata =
cb->connection().value().get().streamInfo().dynamicMetadata().filter_metadata();
if (const auto metadata_it = connection_metadata.find(context_key);
metadata_it != connection_metadata.end()) {
(*forwarding_metadata.mutable_filter_metadata())[metadata_it->first] = metadata_it->second;
}
}
}

// If typed_metadata_context_namespaces is specified, pass matching typed filter metadata to the
// ext_proc service. If metadata key is set in both the connection and request metadata then
// the value will be the request metadata value. The metadata will only be searched for the
// callbacks corresponding to the traffic direction at the time of the external processing
// request.
const auto& request_typed_metadata = cb->streamInfo().dynamicMetadata().typed_filter_metadata();
for (const auto& context_key : state.typedForwardingMetadataNamespaces()) {
if (const auto metadata_it = request_typed_metadata.find(context_key);
metadata_it != request_typed_metadata.end()) {
(*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] =
metadata_it->second;
} else if (cb->connection().has_value()) {
const auto& connection_typed_metadata =
cb->connection().value().get().streamInfo().dynamicMetadata().typed_filter_metadata();
if (const auto metadata_it = connection_typed_metadata.find(context_key);
metadata_it != connection_typed_metadata.end()) {
(*forwarding_metadata.mutable_typed_filter_metadata())[metadata_it->first] =
metadata_it->second;
}
}
}

*req.mutable_metadata_context() = forwarding_metadata;
}

void Filter::setDynamicMetadata(Http::StreamFilterCallbacks* cb, const ProcessorState& state,
std::unique_ptr<ProcessingResponse>& response) {
bool has_receiving_namespaces = state.untypedReceivingMetadataNamespaces().size() > 0;
if (!(has_receiving_namespaces && response->has_dynamic_metadata())) {
return;
}

if (response->has_dynamic_metadata()) {
auto response_metadata = response->dynamic_metadata().fields();
auto receiving_namespaces = state.untypedReceivingMetadataNamespaces();
for (const auto& context_key : response_metadata) {
if (auto metadata_it = std::find(receiving_namespaces.begin(), receiving_namespaces.end(),
context_key.first);
metadata_it != receiving_namespaces.end())
cb->streamInfo().setDynamicMetadata(context_key.first,
response_metadata.at(context_key.first).struct_value());
}
}
}

void Filter::setEncoderDynamicMetadata(std::unique_ptr<ProcessingResponse>& response) {
setDynamicMetadata(encoder_callbacks_, encoding_state_, response);
}
void Filter::setDecoderDynamicMetadata(std::unique_ptr<ProcessingResponse>& response) {
setDynamicMetadata(decoder_callbacks_, decoding_state_, response);
}

void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
if (processing_complete_) {
ENVOY_LOG(debug, "Ignoring stream message received after processing complete");
Expand Down Expand Up @@ -748,32 +861,46 @@ void Filter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&& r) {
absl::Status processing_status;
switch (response->response_case()) {
case ProcessingResponse::ResponseCase::kRequestHeaders:
setDecoderDynamicMetadata(response);
processing_status = decoding_state_.handleHeadersResponse(response->request_headers());
break;
case ProcessingResponse::ResponseCase::kResponseHeaders:
setEncoderDynamicMetadata(response);
processing_status = encoding_state_.handleHeadersResponse(response->response_headers());
break;
case ProcessingResponse::ResponseCase::kRequestBody:
setDecoderDynamicMetadata(response);
processing_status = decoding_state_.handleBodyResponse(response->request_body());
break;
case ProcessingResponse::ResponseCase::kResponseBody:
setEncoderDynamicMetadata(response);
processing_status = encoding_state_.handleBodyResponse(response->response_body());
break;
case ProcessingResponse::ResponseCase::kRequestTrailers:
setDecoderDynamicMetadata(response);
processing_status = decoding_state_.handleTrailersResponse(response->request_trailers());
break;
case ProcessingResponse::ResponseCase::kResponseTrailers:
setEncoderDynamicMetadata(response);
processing_status = encoding_state_.handleTrailersResponse(response->response_trailers());
break;
case ProcessingResponse::ResponseCase::kImmediateResponse:
// We won't be sending anything more to the stream after we
// receive this message.
ENVOY_LOG(debug, "Sending immediate response");
processing_complete_ = true;
closeStream();
onFinishProcessorCalls(Grpc::Status::Ok);
sendImmediateResponse(response->immediate_response());
processing_status = absl::OkStatus();
if (config_->disableImmediateResponse()) {
ENVOY_LOG(debug, "Filter has disable_immediate_response configured. "
"Treat the immediate response message as spurious response.");
processing_status =
absl::FailedPreconditionError("unhandled immediate response due to config disabled it");
} else {
setDecoderDynamicMetadata(response);
// We won't be sending anything more to the stream after we
// receive this message.
ENVOY_LOG(debug, "Sending immediate response");
processing_complete_ = true;
onFinishProcessorCalls(Grpc::Status::Ok);
closeStream();
sendImmediateResponse(response->immediate_response());
processing_status = absl::OkStatus();
}
break;
default:
// Any other message is considered spurious
Expand Down Expand Up @@ -966,6 +1093,30 @@ void Filter::mergePerRouteConfig() {
ENVOY_LOG(trace, "Setting new GrpcService from per-route configuration");
grpc_service_ = *merged_config->grpcService();
}
if (merged_config->untypedForwardingMetadataNamespaces()) {
ENVOY_LOG(trace,
"Setting new untyped forwarding metadata namespaces from per-route configuration");
decoding_state_.setUntypedForwardingMetadataNamespaces(
*merged_config->untypedForwardingMetadataNamespaces());
encoding_state_.setUntypedForwardingMetadataNamespaces(
*merged_config->untypedForwardingMetadataNamespaces());
}
if (merged_config->typedForwardingMetadataNamespaces()) {
ENVOY_LOG(trace,
"Setting new typed forwarding metadata namespaces from per-route configuration");
decoding_state_.setTypedForwardingMetadataNamespaces(
*merged_config->typedForwardingMetadataNamespaces());
encoding_state_.setTypedForwardingMetadataNamespaces(
*merged_config->typedForwardingMetadataNamespaces());
}
if (merged_config->untypedReceivingMetadataNamespaces()) {
ENVOY_LOG(trace,
"Setting new untyped receiving metadata namespaces from per-route configuration");
decoding_state_.setUntypedReceivingMetadataNamespaces(
*merged_config->untypedReceivingMetadataNamespaces());
encoding_state_.setUntypedReceivingMetadataNamespaces(
*merged_config->untypedReceivingMetadataNamespaces());
}
}

std::string responseCaseToString(const ProcessingResponse::ResponseCase response_case) {
Expand Down
Loading

0 comments on commit 369bd16

Please sign in to comment.