diff --git a/CODEOWNERS b/CODEOWNERS index 20097a403f08..c0918fe362df 100644 --- a/CODEOWNERS +++ b/CODEOWNERS @@ -92,4 +92,6 @@ extensions/filters/common/original_src @snowp @klarose /*/extensions/filters/network/echo @htuch @alyssawilk /*/extensions/filters/udp/udp_proxy @mattklein123 @danzh2010 /*/extensions/clusters/aggregate @yxue @snowp +# support for on-demand VHDS requests +/*/extensions/filters/http/on_demand @dmitri-d @htuch @lambdai /*/extensions/filters/network/local_ratelimit @mattklein123 @junr03 diff --git a/api/BUILD b/api/BUILD index caff7f0860a6..7fe8f9bb2a85 100644 --- a/api/BUILD +++ b/api/BUILD @@ -51,6 +51,7 @@ proto_library( "//envoy/config/filter/http/ip_tagging/v2:pkg", "//envoy/config/filter/http/jwt_authn/v2alpha:pkg", "//envoy/config/filter/http/lua/v2:pkg", + "//envoy/config/filter/http/on_demand/v2:pkg", "//envoy/config/filter/http/original_src/v2alpha1:pkg", "//envoy/config/filter/http/rate_limit/v2:pkg", "//envoy/config/filter/http/rbac/v2:pkg", @@ -143,6 +144,7 @@ proto_library( "//envoy/extensions/filters/http/ip_tagging/v3alpha:pkg", "//envoy/extensions/filters/http/jwt_authn/v3alpha:pkg", "//envoy/extensions/filters/http/lua/v3alpha:pkg", + "//envoy/extensions/filters/http/on_demand/v3alpha:pkg", "//envoy/extensions/filters/http/original_src/v3alpha:pkg", "//envoy/extensions/filters/http/ratelimit/v3alpha:pkg", "//envoy/extensions/filters/http/rbac/v3alpha:pkg", diff --git a/api/docs/BUILD b/api/docs/BUILD index 0a18ecaa79e0..9e20a69f1c8a 100644 --- a/api/docs/BUILD +++ b/api/docs/BUILD @@ -51,6 +51,7 @@ proto_library( "//envoy/config/filter/http/ip_tagging/v2:pkg", "//envoy/config/filter/http/jwt_authn/v2alpha:pkg", "//envoy/config/filter/http/lua/v2:pkg", + "//envoy/config/filter/http/on_demand/v2:pkg", "//envoy/config/filter/http/original_src/v2alpha1:pkg", "//envoy/config/filter/http/rate_limit/v2:pkg", "//envoy/config/filter/http/rbac/v2:pkg", diff --git a/api/envoy/config/filter/http/on_demand/v2/BUILD b/api/envoy/config/filter/http/on_demand/v2/BUILD new file mode 100644 index 000000000000..ef3541ebcb1d --- /dev/null +++ b/api/envoy/config/filter/http/on_demand/v2/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) diff --git a/api/envoy/config/filter/http/on_demand/v2/on_demand.proto b/api/envoy/config/filter/http/on_demand/v2/on_demand.proto new file mode 100644 index 000000000000..b9df400acdbd --- /dev/null +++ b/api/envoy/config/filter/http/on_demand/v2/on_demand.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package envoy.config.filter.http.on_demand.v2; + +import "udpa/annotations/migrate.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.config.filter.http.on_demand.v2"; +option java_outer_classname = "OnDemandProto"; +option java_multiple_files = true; +option (udpa.annotations.file_migrate).move_to_package = + "envoy.extensions.filters.http.on_demand.v3alpha"; + +// [#protodoc-title: OnDemand] +// IP tagging :ref:`configuration overview `. +// [#extension: envoy.filters.http.on_demand] + +message OnDemand { +} diff --git a/api/envoy/extensions/filters/http/on_demand/v3alpha/BUILD b/api/envoy/extensions/filters/http/on_demand/v3alpha/BUILD new file mode 100644 index 000000000000..7f2546d2b5fe --- /dev/null +++ b/api/envoy/extensions/filters/http/on_demand/v3alpha/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/filter/http/on_demand/v2:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/api/envoy/extensions/filters/http/on_demand/v3alpha/on_demand.proto b/api/envoy/extensions/filters/http/on_demand/v3alpha/on_demand.proto new file mode 100644 index 000000000000..8c5516adacd6 --- /dev/null +++ b/api/envoy/extensions/filters/http/on_demand/v3alpha/on_demand.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.on_demand.v3alpha; + +import "udpa/annotations/versioning.proto"; + +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.on_demand.v3alpha"; +option java_outer_classname = "OnDemandProto"; +option java_multiple_files = true; + +// [#protodoc-title: OnDemand] +// IP tagging :ref:`configuration overview `. +// [#extension: envoy.filters.http.on_demand] + +message OnDemand { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.filter.http.on_demand.v2.OnDemand"; +} diff --git a/docs/root/configuration/http/http_filters/http_filters.rst b/docs/root/configuration/http/http_filters/http_filters.rst index 3c5a1eef39cd..2824c1e4c2d2 100644 --- a/docs/root/configuration/http/http_filters/http_filters.rst +++ b/docs/root/configuration/http/http_filters/http_filters.rst @@ -25,6 +25,7 @@ HTTP filters ip_tagging_filter jwt_authn_filter lua_filter + on_demand_updates_filter original_src_filter rate_limit_filter rbac_filter diff --git a/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst b/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst new file mode 100644 index 000000000000..ec6eedc5ab88 --- /dev/null +++ b/docs/root/configuration/http/http_filters/on_demand_updates_filter.rst @@ -0,0 +1,18 @@ +.. _config_http_filters_on_demand: + +On-demand VHDS Updates +====================== + +The on-demand VHDS filter is used to request a :ref:`virtual host ` +data if it's not already present in the :ref:`Route Configuration `. The +contents of the *Host* or *:authority* header is used to create the on-demand request. For an on-demand +request to be created, :ref:`VHDS ` must be enabled and either *Host* +or *:authority* header be present. + +On-demand VHDS cannot be used with SRDS at this point. + +Configuration +------------- +* :ref:`v2 API reference ` +* This filter should be configured with the name *envoy.on_demand*. +* The filter should be placed before *envoy.router* filter in the HttpConnectionManager's filter chain. diff --git a/generated_api_shadow/BUILD b/generated_api_shadow/BUILD index caff7f0860a6..7fe8f9bb2a85 100644 --- a/generated_api_shadow/BUILD +++ b/generated_api_shadow/BUILD @@ -51,6 +51,7 @@ proto_library( "//envoy/config/filter/http/ip_tagging/v2:pkg", "//envoy/config/filter/http/jwt_authn/v2alpha:pkg", "//envoy/config/filter/http/lua/v2:pkg", + "//envoy/config/filter/http/on_demand/v2:pkg", "//envoy/config/filter/http/original_src/v2alpha1:pkg", "//envoy/config/filter/http/rate_limit/v2:pkg", "//envoy/config/filter/http/rbac/v2:pkg", @@ -143,6 +144,7 @@ proto_library( "//envoy/extensions/filters/http/ip_tagging/v3alpha:pkg", "//envoy/extensions/filters/http/jwt_authn/v3alpha:pkg", "//envoy/extensions/filters/http/lua/v3alpha:pkg", + "//envoy/extensions/filters/http/on_demand/v3alpha:pkg", "//envoy/extensions/filters/http/original_src/v3alpha:pkg", "//envoy/extensions/filters/http/ratelimit/v3alpha:pkg", "//envoy/extensions/filters/http/rbac/v3alpha:pkg", diff --git a/generated_api_shadow/envoy/config/filter/http/on_demand/v2/BUILD b/generated_api_shadow/envoy/config/filter/http/on_demand/v2/BUILD new file mode 100644 index 000000000000..ef3541ebcb1d --- /dev/null +++ b/generated_api_shadow/envoy/config/filter/http/on_demand/v2/BUILD @@ -0,0 +1,9 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = ["@com_github_cncf_udpa//udpa/annotations:pkg"], +) diff --git a/generated_api_shadow/envoy/config/filter/http/on_demand/v2/on_demand.proto b/generated_api_shadow/envoy/config/filter/http/on_demand/v2/on_demand.proto new file mode 100644 index 000000000000..b9df400acdbd --- /dev/null +++ b/generated_api_shadow/envoy/config/filter/http/on_demand/v2/on_demand.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package envoy.config.filter.http.on_demand.v2; + +import "udpa/annotations/migrate.proto"; +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.config.filter.http.on_demand.v2"; +option java_outer_classname = "OnDemandProto"; +option java_multiple_files = true; +option (udpa.annotations.file_migrate).move_to_package = + "envoy.extensions.filters.http.on_demand.v3alpha"; + +// [#protodoc-title: OnDemand] +// IP tagging :ref:`configuration overview `. +// [#extension: envoy.filters.http.on_demand] + +message OnDemand { +} diff --git a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3alpha/BUILD b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3alpha/BUILD new file mode 100644 index 000000000000..7f2546d2b5fe --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3alpha/BUILD @@ -0,0 +1,12 @@ +# DO NOT EDIT. This file is generated by tools/proto_sync.py. + +load("@envoy_api//bazel:api_build_system.bzl", "api_proto_package") + +licenses(["notice"]) # Apache 2 + +api_proto_package( + deps = [ + "//envoy/config/filter/http/on_demand/v2:pkg", + "@com_github_cncf_udpa//udpa/annotations:pkg", + ], +) diff --git a/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3alpha/on_demand.proto b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3alpha/on_demand.proto new file mode 100644 index 000000000000..8c5516adacd6 --- /dev/null +++ b/generated_api_shadow/envoy/extensions/filters/http/on_demand/v3alpha/on_demand.proto @@ -0,0 +1,20 @@ +syntax = "proto3"; + +package envoy.extensions.filters.http.on_demand.v3alpha; + +import "udpa/annotations/versioning.proto"; + +import "validate/validate.proto"; + +option java_package = "io.envoyproxy.envoy.extensions.filters.http.on_demand.v3alpha"; +option java_outer_classname = "OnDemandProto"; +option java_multiple_files = true; + +// [#protodoc-title: OnDemand] +// IP tagging :ref:`configuration overview `. +// [#extension: envoy.filters.http.on_demand] + +message OnDemand { + option (udpa.annotations.versioning).previous_message_type = + "envoy.config.filter.http.on_demand.v2.OnDemand"; +} diff --git a/include/envoy/http/filter.h b/include/envoy/http/filter.h index e3a50d7ec2cd..e69dc500c879 100644 --- a/include/envoy/http/filter.h +++ b/include/envoy/http/filter.h @@ -193,6 +193,22 @@ class StreamFilterCallbacks { virtual const ScopeTrackedObject& scope() PURE; }; +/** + * RouteConfigUpdatedCallback is used to notify an OnDemandRouteUpdate filter about completion of a + * RouteConfig update. The filter (and the associated ActiveStream) where the original on-demand + * request was originated can be destroyed before a response to an on-demand update request is + * received and updates are propagated. To handle this: + * + * OnDemandRouteUpdate filter instance holds a RouteConfigUpdatedCallbackSharedPtr to a callback. + * Envoy::Router::RdsRouteConfigProviderImpl holds a weak pointer to the RouteConfigUpdatedCallback + * above in an Envoy::Router::UpdateOnDemandCallback struct + * + * In RdsRouteConfigProviderImpl::onConfigUpdate(), before invoking the callback, a check is made to + * verify if the callback is still available. + */ +using RouteConfigUpdatedCallback = std::function; +using RouteConfigUpdatedCallbackSharedPtr = std::shared_ptr; + /** * Stream decoder filter callbacks add additional callbacks that allow a decoding filter to restart * decoding if they decide to hold data (e.g. for buffering or rate limiting). @@ -436,6 +452,23 @@ class StreamDecoderFilterCallbacks : public virtual StreamFilterCallbacks { * @return The socket options to be applied to the upstream request. */ virtual Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const PURE; + + /** + * Schedules a request for a RouteConfiguration update from the management server. + * @param route_config_updated_cb callback to be called when the configuration update has been + * propagated to the worker thread. + */ + virtual void + requestRouteConfigUpdate(RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) PURE; + + /** + * + * @return absl::optional. Contains a value if a non-scoped RDS + * route config provider is used. Scoped RDS provides are not supported at the moment, as + * retrieval of a route configuration in their case requires passing of http request headers + * as a parameter. + */ + virtual absl::optional routeConfig() PURE; }; /** diff --git a/include/envoy/router/rds.h b/include/envoy/router/rds.h index efa2e52edadc..455c9c6f7acb 100644 --- a/include/envoy/router/rds.h +++ b/include/envoy/router/rds.h @@ -3,6 +3,7 @@ #include #include "envoy/config/route/v3alpha/route.pb.h" +#include "envoy/http/filter.h" #include "envoy/router/router.h" namespace Envoy { @@ -54,6 +55,17 @@ class RouteConfigProvider { */ virtual void validateConfig(const envoy::config::route::v3alpha::RouteConfiguration& config) const PURE; + + /** + * Callback used to request an update to the route configuration from the management server. + * @param for_domain supplies the domain name that virtual hosts must match on + * @param thread_local_dispatcher thread-local dispatcher + * @param route_config_updated_cb callback to be called when the configuration update has been + * propagated to worker threads + */ + virtual void requestVirtualHostsUpdate( + const std::string& for_domain, Event::Dispatcher& thread_local_dispatcher, + std::weak_ptr route_config_updated_cb) PURE; }; using RouteConfigProviderPtr = std::unique_ptr; diff --git a/include/envoy/router/route_config_update_receiver.h b/include/envoy/router/route_config_update_receiver.h index 8d66770b1bd4..44f4508197ba 100644 --- a/include/envoy/router/route_config_update_receiver.h +++ b/include/envoy/router/route_config_update_receiver.h @@ -55,6 +55,14 @@ class RouteConfigUpdateReceiver { */ virtual const std::string& configVersion() PURE; + /** + * @return bool return whether VHDS configuration has been changed in the last RDS update. + */ + // TODO(dmitri-d): Consider splitting RouteConfigUpdateReceiver into a RouteConfig state and a + // last update state. The latter could be passed to callbacks as a parameter, which would make the + // intent and the lifecycle of the "last update state" less muddled. + virtual bool vhdsConfigurationChanged() const PURE; + /** * @return uint64_t the hash value of RouteConfiguration. */ @@ -76,6 +84,12 @@ class RouteConfigUpdateReceiver { * @return SystemTime the time of the last update. */ virtual SystemTime lastUpdated() const PURE; + + /** + * @return the union of all resource names and aliases (if any) received with the last VHDS + * update. + */ + virtual const std::set& resourceIdsInLastVhdsUpdate() PURE; }; using RouteConfigUpdatePtr = std::unique_ptr; diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 90d62be087bb..2953ddf84701 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -43,6 +43,7 @@ class DeltaSubscriptionImpl : public Subscription, public SubscriptionCallbacks // Config::Subscription void start(const std::set& resource_names) override; + void updateResourceInterest(const std::set& update_to_these_names) override; // Config::SubscriptionCallbacks (all pass through to callbacks_!) diff --git a/source/common/config/delta_subscription_state.cc b/source/common/config/delta_subscription_state.cc index 5ae88beaae89..59710d14528c 100644 --- a/source/common/config/delta_subscription_state.cc +++ b/source/common/config/delta_subscription_state.cc @@ -75,6 +75,10 @@ void DeltaSubscriptionState::handleGoodResponse( throw EnvoyException( fmt::format("duplicate name {} found among added/updated resources", resource.name())); } + // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource + if (!resource.has_resource() && resource.aliases_size() > 0) { + continue; + } if (message.type_url() != resource.resource().type_url()) { throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match " "the message-wide type URL {} in DeltaDiscoveryResponse {}", diff --git a/source/common/config/delta_subscription_state.h b/source/common/config/delta_subscription_state.h index 2d0d53b7fa4f..3edf4df961f6 100644 --- a/source/common/config/delta_subscription_state.h +++ b/source/common/config/delta_subscription_state.h @@ -29,6 +29,7 @@ class DeltaSubscriptionState : public Logger::Loggable { // Update which resources we're interested in subscribing to. void updateSubscriptionInterest(const std::set& cur_added, const std::set& cur_removed); + void addAliasesToResolve(const std::set& aliases); // Whether there was a change in our subscription interest we have yet to inform the server of. bool subscriptionUpdatePending() const; @@ -79,6 +80,8 @@ class DeltaSubscriptionState : public Logger::Loggable { void setResourceVersion(const std::string& resource_name, const std::string& resource_version); void setResourceWaitingForServer(const std::string& resource_name); void setLostInterestInResource(const std::string& resource_name); + void + populateDiscoveryRequest(envoy::service::discovery::v3alpha::DeltaDiscoveryResponse& request); // A map from resource name to per-resource version. The keys of this map are exactly the resource // names we are currently interested in. Those in the waitingForServer state currently don't have diff --git a/source/common/config/new_grpc_mux_impl.cc b/source/common/config/new_grpc_mux_impl.cc index 3a6fd886b920..213105f94a21 100644 --- a/source/common/config/new_grpc_mux_impl.cc +++ b/source/common/config/new_grpc_mux_impl.cc @@ -65,6 +65,17 @@ void NewGrpcMuxImpl::onDiscoveryResponse( message->system_version_info(), message->type_url()); return; } + + // When an on-demand request is made a Watch is created using an alias, as the resource name isn't + // known at that point. When an update containing aliases comes back, we update Watches with + // resource names. + for (const auto& r : message->resources()) { + if (r.aliases_size() > 0) { + AddedRemoved converted = sub->second->watch_map_.convertAliasWatchesToNameWatches(r); + sub->second->sub_state_.updateSubscriptionInterest(converted.added_, converted.removed_); + } + } + kickOffAck(sub->second->sub_state_.handleResponse(*message)); } diff --git a/source/common/config/new_grpc_mux_impl.h b/source/common/config/new_grpc_mux_impl.h index 8f52d8c46380..22ee042bbe83 100644 --- a/source/common/config/new_grpc_mux_impl.h +++ b/source/common/config/new_grpc_mux_impl.h @@ -60,6 +60,25 @@ class NewGrpcMuxImpl GrpcMuxCallbacks&) override; void start() override; + struct SubscriptionStuff { + SubscriptionStuff(const std::string& type_url, std::chrono::milliseconds init_fetch_timeout, + Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info) + : sub_state_(type_url, watch_map_, local_info, init_fetch_timeout, dispatcher), + init_fetch_timeout_(init_fetch_timeout) {} + + WatchMap watch_map_; + DeltaSubscriptionState sub_state_; + const std::chrono::milliseconds init_fetch_timeout_; + + SubscriptionStuff(const SubscriptionStuff&) = delete; + SubscriptionStuff& operator=(const SubscriptionStuff&) = delete; + }; + + // for use in tests only + const absl::flat_hash_map>& subscriptions() { + return subscriptions_; + } + private: Watch* addWatch(const std::string& type_url, const std::set& resources, SubscriptionCallbacks& callbacks, std::chrono::milliseconds init_fetch_timeout); @@ -94,19 +113,6 @@ class NewGrpcMuxImpl // description of how it interacts with pause() and resume(). PausableAckQueue pausable_ack_queue_; - struct SubscriptionStuff { - SubscriptionStuff(const std::string& type_url, std::chrono::milliseconds init_fetch_timeout, - Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info) - : sub_state_(type_url, watch_map_, local_info, init_fetch_timeout, dispatcher), - init_fetch_timeout_(init_fetch_timeout) {} - - WatchMap watch_map_; - DeltaSubscriptionState sub_state_; - const std::chrono::milliseconds init_fetch_timeout_; - - SubscriptionStuff(const SubscriptionStuff&) = delete; - SubscriptionStuff& operator=(const SubscriptionStuff&) = delete; - }; // Map key is type_url. absl::flat_hash_map> subscriptions_; diff --git a/source/common/config/watch_map.cc b/source/common/config/watch_map.cc index b03bd52cd0a1..d508c6897b1f 100644 --- a/source/common/config/watch_map.cc +++ b/source/common/config/watch_map.cc @@ -95,6 +95,32 @@ void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField } } +// For responses to on-demand requests, replace the original watch for an alias +// with one for the resource's name +AddedRemoved WatchMap::convertAliasWatchesToNameWatches( + const envoy::service::discovery::v3alpha::Resource& resource) { + absl::flat_hash_set watches_to_update; + for (const auto& alias : resource.aliases()) { + const auto interested_watches = watch_interest_.find(alias); + if (interested_watches != watch_interest_.end()) { + for (const auto& interested_watch : interested_watches->second) { + watches_to_update.insert(interested_watch); + } + } + } + + auto ret = AddedRemoved({}, {}); + for (const auto& watch : watches_to_update) { + const auto& converted_watches = updateWatchInterest(watch, {resource.name()}); + std::copy(converted_watches.added_.begin(), converted_watches.added_.end(), + std::inserter(ret.added_, ret.added_.end())); + std::copy(converted_watches.removed_.begin(), converted_watches.removed_.end(), + std::inserter(ret.removed_, ret.removed_.end())); + } + + return ret; +} + void WatchMap::onConfigUpdate( const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, diff --git a/source/common/config/watch_map.h b/source/common/config/watch_map.h index 6234bd4f0066..e536b2395b9f 100644 --- a/source/common/config/watch_map.h +++ b/source/common/config/watch_map.h @@ -77,6 +77,10 @@ class WatchMap : public SubscriptionCallbacks, public Logger::Loggable& resources, const std::string& version_info) override; diff --git a/source/common/http/async_client_impl.h b/source/common/http/async_client_impl.h index 82ff8a1bab1f..d9e8e1ca4a35 100644 --- a/source/common/http/async_client_impl.h +++ b/source/common/http/async_client_impl.h @@ -83,6 +83,12 @@ class AsyncStreamImpl : public AsyncClient::Stream, AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options); + // Http::StreamDecoderFilterCallbacks + void requestRouteConfigUpdate(Http::RouteConfigUpdatedCallbackSharedPtr) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } + absl::optional routeConfig() override { return {}; } + // Http::AsyncClient::Stream void sendHeaders(HeaderMap& headers, bool end_stream) override; void sendData(Buffer::Instance& data, bool end_stream) override; diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index ca3eae934670..30e9c255e152 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -487,6 +487,13 @@ void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_re } } +void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestRouteConfigUpdate( + const std::string host_header, Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { + route_config_provider_->requestVirtualHostsUpdate(host_header, thread_local_dispatcher, + std::move(route_config_updated_cb)); +} + ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager) : connection_manager_(connection_manager), stream_id_(connection_manager.random_generator_.random()), @@ -502,7 +509,16 @@ ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connect connection_manager.config_.scopedRouteConfigProvider() == nullptr)), "Either routeConfigProvider or scopedRouteConfigProvider should be set in " "ConnectionManagerImpl."); - + if (connection_manager_.config_.isRoutable() && + connection_manager.config_.routeConfigProvider() != nullptr) { + route_config_update_requester_ = + std::make_unique( + connection_manager.config_.routeConfigProvider()); + } else if (connection_manager_.config_.isRoutable() && + connection_manager.config_.scopedRouteConfigProvider() != nullptr) { + route_config_update_requester_ = + std::make_unique(); + } ScopeTrackerScopeState scope(this, connection_manager_.read_callbacks_->connection().dispatcher()); @@ -1364,6 +1380,24 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedTracingCustomTags() { } } +void ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate( + Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { + ASSERT(!request_headers_->Host()->value().empty()); + const auto& host_header = + absl::AsciiStrToLower(request_headers_->Host()->value().getStringView()); + route_config_update_requester_->requestRouteConfigUpdate(host_header, thread_local_dispatcher, + std::move(route_config_updated_cb)); +} + +absl::optional ConnectionManagerImpl::ActiveStream::routeConfig() { + if (connection_manager_.config_.routeConfigProvider() == nullptr) { + return {}; + } + return absl::optional( + connection_manager_.config_.routeConfigProvider()->config()); +} + void ConnectionManagerImpl::ActiveStream::sendLocalReply( bool is_grpc_request, Code code, absl::string_view body, const std::function& modify_headers, bool is_head_request, @@ -2261,6 +2295,16 @@ bool ConnectionManagerImpl::ActiveStreamDecoderFilter::recreateStream() { return true; } +void ConnectionManagerImpl::ActiveStreamDecoderFilter::requestRouteConfigUpdate( + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) { + parent_.requestRouteConfigUpdate(dispatcher(), std::move(route_config_updated_cb)); +} + +absl::optional +ConnectionManagerImpl::ActiveStreamDecoderFilter::routeConfig() { + return parent_.routeConfig(); +} + Buffer::WatermarkBufferPtr ConnectionManagerImpl::ActiveStreamEncoderFilter::createBuffer() { auto buffer = new Buffer::WatermarkBuffer([this]() -> void { this->responseDataDrained(); }, [this]() -> void { this->responseDataTooLarge(); }); diff --git a/source/common/http/conn_manager_impl.h b/source/common/http/conn_manager_impl.h index ec9f58326661..96549f865330 100644 --- a/source/common/http/conn_manager_impl.h +++ b/source/common/http/conn_manager_impl.h @@ -312,6 +312,10 @@ class ConnectionManagerImpl : Logger::Loggable, void requestDataTooLarge(); void requestDataDrained(); + void requestRouteConfigUpdate( + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override; + absl::optional routeConfig() override; + StreamDecoderFilterSharedPtr handle_; bool is_grpc_request_{}; }; @@ -386,6 +390,36 @@ class ConnectionManagerImpl : Logger::Loggable, using ActiveStreamEncoderFilterPtr = std::unique_ptr; + // Used to abstract making of RouteConfig update request. + // RdsRouteConfigUpdateRequester is used when an RdsRouteConfigProvider is configured, + // NullRouteConfigUpdateRequester is used in all other cases (specifically when + // ScopedRdsConfigProvider/InlineScopedRoutesConfigProvider is configured) + class RouteConfigUpdateRequester { + public: + virtual ~RouteConfigUpdateRequester() = default; + virtual void requestRouteConfigUpdate(const std::string, Event::Dispatcher&, + Http::RouteConfigUpdatedCallbackSharedPtr) { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + }; + }; + + class RdsRouteConfigUpdateRequester : public RouteConfigUpdateRequester { + public: + RdsRouteConfigUpdateRequester(Router::RouteConfigProvider* route_config_provider) + : route_config_provider_(route_config_provider) {} + void requestRouteConfigUpdate( + const std::string host_header, Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) override; + + private: + Router::RouteConfigProvider* route_config_provider_; + }; + + class NullRouteConfigUpdateRequester : public RouteConfigUpdateRequester { + public: + NullRouteConfigUpdateRequester() = default; + }; + /** * Wraps a single active stream on the connection. These are either full request/response pairs * or pushes. @@ -505,6 +539,10 @@ class ConnectionManagerImpl : Logger::Loggable, void snapScopedRouteConfig(); void refreshCachedRoute(); + void + requestRouteConfigUpdate(Event::Dispatcher& thread_local_dispatcher, + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb); + absl::optional routeConfig(); void refreshCachedTracingCustomTags(); @@ -644,6 +682,7 @@ class ConnectionManagerImpl : Logger::Loggable, // response. bool encoding_headers_only_{}; Network::Socket::OptionsSharedPtr upstream_options_; + std::unique_ptr route_config_update_requester_; std::unique_ptr tracing_custom_tags_{nullptr}; }; diff --git a/source/common/router/config_impl.h b/source/common/router/config_impl.h index fab8f504106e..897afe4c58fe 100644 --- a/source/common/router/config_impl.h +++ b/source/common/router/config_impl.h @@ -805,9 +805,9 @@ class RouteMatcher { RouteConstSharedPtr route(const Http::HeaderMap& headers, const StreamInfo::StreamInfo& stream_info, uint64_t random_value) const; -private: const VirtualHostImpl* findVirtualHost(const Http::HeaderMap& headers) const; +private: using WildcardVirtualHosts = std::map, std::greater<>>; using SubstringFunction = std::function; @@ -843,6 +843,10 @@ class ConfigImpl : public Config { const HeaderParser& requestHeaderParser() const { return *request_headers_parser_; }; const HeaderParser& responseHeaderParser() const { return *response_headers_parser_; }; + bool virtualHostExists(const Http::HeaderMap& headers) const { + return route_matcher_->findVirtualHost(headers) != nullptr; + } + // Router::Config RouteConstSharedPtr route(const Http::HeaderMap& headers, const StreamInfo::StreamInfo& stream_info, diff --git a/source/common/router/rds_impl.cc b/source/common/router/rds_impl.cc index 154cd5e83a7a..a1c1779d66e6 100644 --- a/source/common/router/rds_impl.cc +++ b/source/common/router/rds_impl.cc @@ -17,6 +17,7 @@ #include "common/common/fmt.h" #include "common/config/api_version.h" #include "common/config/utility.h" +#include "common/http/header_map_impl.h" #include "common/protobuf/utility.h" #include "common/router/config_impl.h" @@ -118,12 +119,13 @@ void RdsRouteConfigSubscription::onConfigUpdate( if (config_update_info_->onRdsUpdate(route_config, version_info)) { stats_.config_reload_.inc(); - if (config_update_info_->routeConfiguration().has_vhds()) { - ENVOY_LOG(debug, "rds: vhds configuration present, starting vhds: config_name={} hash={}", - route_config_name_, config_update_info_->configHash()); + if (config_update_info_->routeConfiguration().has_vhds() && + config_update_info_->vhdsConfigurationChanged()) { + ENVOY_LOG( + debug, + "rds: vhds configuration present/changed, (re)starting vhds: config_name={} hash={}", + route_config_name_, config_update_info_->configHash()); maybeCreateInitManager(version_info, noop_init_manager, resume_rds); - // TODO(dmitri-d): It's unsafe to depend directly on factory context here, - // the listener might have been torn down, need to remove this. vhds_subscription_ = std::make_unique( config_update_info_, factory_context_, stat_prefix_, route_config_providers_, config_update_info_->routeConfiguration().vhds().config_source().resource_api_version()); @@ -136,7 +138,10 @@ void RdsRouteConfigSubscription::onConfigUpdate( for (auto* provider : route_config_providers_) { provider->onConfigUpdate(); } - vhds_subscription_.release(); + // RDS update removed VHDS configuration + if (!config_update_info_->routeConfiguration().has_vhds()) { + vhds_subscription_.release(); + } } update_callback_manager_.runCallbacks(); } @@ -191,6 +196,13 @@ void RdsRouteConfigSubscription::onConfigUpdateFailed( init_target_.ready(); } +void RdsRouteConfigSubscription::updateOnDemand(const std::string& aliases) { + if (vhds_subscription_.get() == nullptr) { + return; + } + vhds_subscription_->updateOnDemand(aliases); +} + bool RdsRouteConfigSubscription::validateUpdateSize(int num_resources) { if (num_resources == 0) { ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_); @@ -263,6 +275,35 @@ void RdsRouteConfigProviderImpl::onConfigUpdate() { prev_config->config_ = new_config; return previous; }); + + const auto aliases = config_update_info_->resourceIdsInLastVhdsUpdate(); + // Regular (non-VHDS) RDS updates don't populate aliases fields in resources. + if (aliases.empty()) { + return; + } + + const auto config = std::static_pointer_cast(new_config); + // Notifies connections that RouteConfiguration update has been propagated. + // Callbacks processing is performed in FIFO order. The callback is skipped if alias used in + // the VHDS update request do not match the aliases in the update response + for (auto it = config_update_callbacks_.begin(); it != config_update_callbacks_.end();) { + auto found = aliases.find(it->alias_); + if (found != aliases.end()) { + // TODO(dmitri-d) HeaderMapImpl is expensive, need to profile this + Http::HeaderMapImpl host_header; + host_header.setHost(VhdsSubscription::aliasToDomainName(it->alias_)); + const bool host_exists = config->virtualHostExists(host_header); + auto current_cb = it->cb_; + it->thread_local_dispatcher_.post([current_cb, host_exists] { + if (auto cb = current_cb.lock()) { + (*cb)(host_exists); + } + }); + it = config_update_callbacks_.erase(it); + } else { + it++; + } + } } void RdsRouteConfigProviderImpl::validateConfig( @@ -271,6 +312,20 @@ void RdsRouteConfigProviderImpl::validateConfig( ConfigImpl validation_config(config, factory_context_, validator_, false); } +// Schedules a VHDS request on the main thread and queues up the callback to use when the VHDS +// response has been propagated to the worker thread that was the request origin. +void RdsRouteConfigProviderImpl::requestVirtualHostsUpdate( + const std::string& for_domain, Event::Dispatcher& thread_local_dispatcher, + std::weak_ptr route_config_updated_cb) { + auto alias = + VhdsSubscription::domainNameToAlias(config_update_info_->routeConfigName(), for_domain); + factory_context_.dispatcher().post([this, alias, &thread_local_dispatcher, + route_config_updated_cb]() -> void { + subscription_->updateOnDemand(alias); + config_update_callbacks_.push_back({alias, thread_local_dispatcher, route_config_updated_cb}); + }); +} + RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& admin) { config_tracker_entry_ = admin.getConfigTracker().add("routes", [this] { return dumpRouteConfigs(); }); diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index 8601c4194b09..22dd91612c1c 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -76,6 +77,10 @@ class StaticRouteConfigProviderImpl : public RouteConfigProvider { SystemTime lastUpdated() const override { return last_updated_; } void onConfigUpdate() override {} void validateConfig(const envoy::config::route::v3alpha::RouteConfiguration&) const override {} + void requestVirtualHostsUpdate(const std::string&, Event::Dispatcher&, + std::weak_ptr) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } private: ConfigConstSharedPtr config_; @@ -114,6 +119,7 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, return route_config_providers_; } RouteConfigUpdatePtr& routeConfigUpdate() { return config_update_info_; } + void updateOnDemand(const std::string& aliases); void maybeCreateInitManager(const std::string& version_info, std::unique_ptr& init_manager, std::unique_ptr& resume_rds); @@ -175,6 +181,12 @@ class RdsRouteConfigSubscription : Envoy::Config::SubscriptionCallbacks, using RdsRouteConfigSubscriptionSharedPtr = std::shared_ptr; +struct UpdateOnDemandCallback { + const std::string alias_; + Event::Dispatcher& thread_local_dispatcher_; + std::weak_ptr cb_; +}; + /** * Implementation of RouteConfigProvider that fetches the route configuration dynamically using * the subscription. @@ -193,6 +205,9 @@ class RdsRouteConfigProviderImpl : public RouteConfigProvider, } SystemTime lastUpdated() const override { return config_update_info_->lastUpdated(); } void onConfigUpdate() override; + void requestVirtualHostsUpdate( + const std::string& for_domain, Event::Dispatcher& thread_local_dispatcher, + std::weak_ptr route_config_updated_cb) override; void validateConfig(const envoy::config::route::v3alpha::RouteConfiguration& config) const override; @@ -210,6 +225,7 @@ class RdsRouteConfigProviderImpl : public RouteConfigProvider, Server::Configuration::ServerFactoryContext& factory_context_; ProtobufMessage::ValidationVisitor& validator_; ThreadLocal::SlotPtr tls_; + std::list config_update_callbacks_; friend class RouteConfigProviderManagerImpl; }; diff --git a/source/common/router/route_config_update_receiver_impl.cc b/source/common/router/route_config_update_receiver_impl.cc index 7b67b6559999..b3108414f0c0 100644 --- a/source/common/router/route_config_update_receiver_impl.cc +++ b/source/common/router/route_config_update_receiver_impl.cc @@ -21,48 +21,79 @@ bool RouteConfigUpdateReceiverImpl::onRdsUpdate( if (new_hash == last_config_hash_) { return false; } - route_config_proto_ = rc; last_config_hash_ = new_hash; + const uint64_t new_vhds_config_hash = rc.has_vhds() ? MessageUtil::hash(rc.vhds()) : 0ul; + vhds_configuration_changed_ = new_vhds_config_hash != last_vhds_config_hash_; + last_vhds_config_hash_ = new_vhds_config_hash; + initializeRdsVhosts(route_config_proto_); + onUpdateCommon(route_config_proto_, version_info); + return true; +} + +void RouteConfigUpdateReceiverImpl::onUpdateCommon( + const envoy::config::route::v3alpha::RouteConfiguration& rc, const std::string& version_info) { last_config_version_ = version_info; last_updated_ = time_source_.systemTime(); - initializeVhosts(route_config_proto_); - config_info_.emplace(RouteConfigProvider::ConfigInfo{route_config_proto_, last_config_version_}); - return true; + rebuildRouteConfig(rds_virtual_hosts_, vhds_virtual_hosts_, route_config_proto_); + config_info_.emplace(RouteConfigProvider::ConfigInfo{rc, last_config_version_}); } bool RouteConfigUpdateReceiverImpl::onVhdsUpdate( const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, const std::string& version_info) { - removeVhosts(virtual_hosts_, removed_resources); - updateVhosts(virtual_hosts_, added_resources); - rebuildRouteConfig(virtual_hosts_, route_config_proto_); + collectResourceIdsInUpdate(added_resources); + const bool removed = removeVhosts(vhds_virtual_hosts_, removed_resources); + const bool updated = updateVhosts(vhds_virtual_hosts_, added_resources); + onUpdateCommon(route_config_proto_, version_info); + return removed || updated || !resource_ids_in_last_update_.empty(); +} - return onRdsUpdate(route_config_proto_, version_info); +void RouteConfigUpdateReceiverImpl::collectResourceIdsInUpdate( + const Protobuf::RepeatedPtrField& + added_resources) { + resource_ids_in_last_update_.clear(); + for (const auto& resource : added_resources) { + resource_ids_in_last_update_.emplace(resource.name()); + std::copy(resource.aliases().begin(), resource.aliases().end(), + std::inserter(resource_ids_in_last_update_, resource_ids_in_last_update_.end())); + } } -void RouteConfigUpdateReceiverImpl::initializeVhosts( +void RouteConfigUpdateReceiverImpl::initializeRdsVhosts( const envoy::config::route::v3alpha::RouteConfiguration& route_configuration) { - virtual_hosts_.clear(); + rds_virtual_hosts_.clear(); for (const auto& vhost : route_configuration.virtual_hosts()) { - virtual_hosts_.emplace(vhost.name(), vhost); + rds_virtual_hosts_.emplace(vhost.name(), vhost); } } -void RouteConfigUpdateReceiverImpl::removeVhosts( - std::unordered_map& vhosts, +bool RouteConfigUpdateReceiverImpl::removeVhosts( + std::map& vhosts, const Protobuf::RepeatedPtrField& removed_vhost_names) { + bool vhosts_removed = false; for (const auto& vhost_name : removed_vhost_names) { - vhosts.erase(vhost_name); + auto found = vhosts.find(vhost_name); + if (found != vhosts.end()) { + vhosts_removed = true; + vhosts.erase(vhost_name); + } } + return vhosts_removed; } -void RouteConfigUpdateReceiverImpl::updateVhosts( - std::unordered_map& vhosts, +bool RouteConfigUpdateReceiverImpl::updateVhosts( + std::map& vhosts, const Protobuf::RepeatedPtrField& added_resources) { + bool vhosts_added = false; for (const auto& resource : added_resources) { + // the management server returns empty resources (they contain no virtual hosts in this case) + // for aliases that it couldn't resolve. + if (onDemandFetchFailed(resource)) { + continue; + } envoy::config::route::v3alpha::VirtualHost vhost = MessageUtil::anyConvert(resource.resource()); MessageUtil::validate(vhost, validation_visitor_); @@ -71,16 +102,27 @@ void RouteConfigUpdateReceiverImpl::updateVhosts( vhosts.erase(found); } vhosts.emplace(vhost.name(), vhost); + vhosts_added = true; } + return vhosts_added; } void RouteConfigUpdateReceiverImpl::rebuildRouteConfig( - const std::unordered_map& vhosts, + const std::map& rds_vhosts, + const std::map& vhds_vhosts, envoy::config::route::v3alpha::RouteConfiguration& route_config) { route_config.clear_virtual_hosts(); - for (const auto& vhost : vhosts) { + for (const auto& vhost : rds_vhosts) { route_config.mutable_virtual_hosts()->Add()->CopyFrom(vhost.second); } + for (const auto& vhost : vhds_vhosts) { + route_config.mutable_virtual_hosts()->Add()->CopyFrom(vhost.second); + } +} + +bool RouteConfigUpdateReceiverImpl::onDemandFetchFailed( + const envoy::service::discovery::v3alpha::Resource& resource) const { + return !resource.has_resource(); } } // namespace Router diff --git a/source/common/router/route_config_update_receiver_impl.h b/source/common/router/route_config_update_receiver_impl.h index 2cf3ead863e6..e61fd878a2b7 100644 --- a/source/common/router/route_config_update_receiver_impl.h +++ b/source/common/router/route_config_update_receiver_impl.h @@ -19,21 +19,26 @@ class RouteConfigUpdateReceiverImpl : public RouteConfigUpdateReceiver { public: RouteConfigUpdateReceiverImpl(TimeSource& time_source, ProtobufMessage::ValidationVisitor& validation_visitor) - : time_source_(time_source), last_config_hash_(0ull), - validation_visitor_(validation_visitor) {} + : time_source_(time_source), last_config_hash_(0ull), last_vhds_config_hash_(0ul), + validation_visitor_(validation_visitor), vhds_configuration_changed_(true) {} void - initializeVhosts(const envoy::config::route::v3alpha::RouteConfiguration& route_configuration); - void - removeVhosts(std::unordered_map& vhosts, - const Protobuf::RepeatedPtrField& removed_vhost_names); - void - updateVhosts(std::unordered_map& vhosts, - const Protobuf::RepeatedPtrField& - added_resources); + initializeRdsVhosts(const envoy::config::route::v3alpha::RouteConfiguration& route_configuration); + void collectResourceIdsInUpdate( + const Protobuf::RepeatedPtrField& + added_resources); + bool removeVhosts(std::map& vhosts, + const Protobuf::RepeatedPtrField& removed_vhost_names); + bool updateVhosts(std::map& vhosts, + const Protobuf::RepeatedPtrField& + added_resources); void rebuildRouteConfig( - const std::unordered_map& vhosts, + const std::map& rds_vhosts, + const std::map& vhds_vhosts, envoy::config::route::v3alpha::RouteConfiguration& route_config); + bool onDemandFetchFailed(const envoy::service::discovery::v3alpha::Resource& resource) const; + void onUpdateCommon(const envoy::config::route::v3alpha::RouteConfiguration& rc, + const std::string& version_info); // Router::RouteConfigUpdateReceiver bool onRdsUpdate(const envoy::config::route::v3alpha::RouteConfiguration& rc, @@ -48,20 +53,28 @@ class RouteConfigUpdateReceiverImpl : public RouteConfigUpdateReceiver { absl::optional configInfo() const override { return config_info_; } + bool vhdsConfigurationChanged() const override { return vhds_configuration_changed_; } const envoy::config::route::v3alpha::RouteConfiguration& routeConfiguration() override { return route_config_proto_; } SystemTime lastUpdated() const override { return last_updated_; } + const std::set& resourceIdsInLastVhdsUpdate() override { + return resource_ids_in_last_update_; + } private: TimeSource& time_source_; envoy::config::route::v3alpha::RouteConfiguration route_config_proto_; uint64_t last_config_hash_; + uint64_t last_vhds_config_hash_; std::string last_config_version_; SystemTime last_updated_; - std::unordered_map virtual_hosts_; + std::map rds_virtual_hosts_; + std::map vhds_virtual_hosts_; absl::optional config_info_; ProtobufMessage::ValidationVisitor& validation_visitor_; + std::set resource_ids_in_last_update_; + bool vhds_configuration_changed_; }; } // namespace Router diff --git a/source/common/router/vhds.cc b/source/common/router/vhds.cc index fdd6f042c018..fbe50d102325 100644 --- a/source/common/router/vhds.cc +++ b/source/common/router/vhds.cc @@ -47,6 +47,10 @@ VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info, loadTypeUrl(resource_api_version), *scope_, *this); } +void VhdsSubscription::updateOnDemand(const std::string& with_route_config_name_prefix) { + subscription_->updateResourceInterest({with_route_config_name_prefix}); +} + void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, const EnvoyException*) { ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); diff --git a/source/common/router/vhds.h b/source/common/router/vhds.h index 5f1f9d466b4a..41eb42c2eb08 100644 --- a/source/common/router/vhds.h +++ b/source/common/router/vhds.h @@ -46,6 +46,15 @@ class VhdsSubscription : Envoy::Config::SubscriptionCallbacks, ~VhdsSubscription() override { init_target_.ready(); } void registerInitTargetWithInitManager(Init::Manager& m) { m.add(init_target_); } + void updateOnDemand(const std::string& with_route_config_name_prefix); + static std::string domainNameToAlias(const std::string& route_config_name, + const std::string& domain) { + return route_config_name + "/" + domain; + } + static std::string aliasToDomainName(const std::string& alias) { + const auto pos = alias.find_last_of("/"); + return pos == std::string::npos ? alias : alias.substr(pos + 1); + } private: // Config::SubscriptionCallbacks diff --git a/source/extensions/extensions_build_config.bzl b/source/extensions/extensions_build_config.bzl index 795a12c607c6..7ed275617ce1 100644 --- a/source/extensions/extensions_build_config.bzl +++ b/source/extensions/extensions_build_config.bzl @@ -52,6 +52,7 @@ EXTENSIONS = { "envoy.filters.http.ip_tagging": "//source/extensions/filters/http/ip_tagging:config", "envoy.filters.http.jwt_authn": "//source/extensions/filters/http/jwt_authn:config", "envoy.filters.http.lua": "//source/extensions/filters/http/lua:config", + "envoy.filters.http.on_demand": "//source/extensions/filters/http/on_demand:config", "envoy.filters.http.original_src": "//source/extensions/filters/http/original_src:config", "envoy.filters.http.ratelimit": "//source/extensions/filters/http/ratelimit:config", "envoy.filters.http.rbac": "//source/extensions/filters/http/rbac:config", diff --git a/source/extensions/filters/http/on_demand/BUILD b/source/extensions/filters/http/on_demand/BUILD new file mode 100644 index 000000000000..2332afdae292 --- /dev/null +++ b/source/extensions/filters/http/on_demand/BUILD @@ -0,0 +1,41 @@ +licenses(["notice"]) # Apache 2 + +# On-demand RDS update HTTP filter + +load( + "//bazel:envoy_build_system.bzl", + "envoy_cc_extension", + "envoy_cc_library", + "envoy_package", +) + +envoy_package() + +envoy_cc_library( + name = "on_demand_update_lib", + srcs = ["on_demand_update.cc"], + hdrs = ["on_demand_update.h"], + deps = [ + "//include/envoy/event:dispatcher_interface", + "//include/envoy/http:filter_interface", + "//include/envoy/server:filter_config_interface", + "//source/common/common:assert_lib", + "//source/common/common:enum_to_int", + "//source/common/http:codes_lib", + "//source/common/http:header_map_lib", + ], +) + +envoy_cc_extension( + name = "config", + srcs = ["config.cc"], + hdrs = ["config.h"], + security_posture = "robust_to_untrusted_downstream", + deps = [ + "//include/envoy/registry", + "//source/extensions/filters/http:well_known_names", + "//source/extensions/filters/http/common:factory_base_lib", + "//source/extensions/filters/http/on_demand:on_demand_update_lib", + "@envoy_api//envoy/config/filter/http/on_demand/v2:pkg_cc_proto", + ], +) diff --git a/source/extensions/filters/http/on_demand/config.cc b/source/extensions/filters/http/on_demand/config.cc new file mode 100644 index 000000000000..b86756078c58 --- /dev/null +++ b/source/extensions/filters/http/on_demand/config.cc @@ -0,0 +1,29 @@ +#include "extensions/filters/http/on_demand/config.h" + +#include "envoy/config/filter/http/on_demand/v2/on_demand.pb.validate.h" + +#include "extensions/filters/http/on_demand/on_demand_update.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace OnDemand { + +Http::FilterFactoryCb OnDemandFilterFactory::createFilterFactoryFromProtoTyped( + const envoy::config::filter::http::on_demand::v2::OnDemand&, const std::string&, + Server::Configuration::FactoryContext&) { + return [](Http::FilterChainFactoryCallbacks& callbacks) -> void { + callbacks.addStreamDecoderFilter( + std::make_shared()); + }; +} + +/** + * Static registration for the on-demand filter. @see RegisterFactory. + */ +REGISTER_FACTORY(OnDemandFilterFactory, Server::Configuration::NamedHttpFilterConfigFactory); + +} // namespace OnDemand +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/on_demand/config.h b/source/extensions/filters/http/on_demand/config.h new file mode 100644 index 000000000000..1f63e9cc53f4 --- /dev/null +++ b/source/extensions/filters/http/on_demand/config.h @@ -0,0 +1,30 @@ +#pragma once + +#include "envoy/config/filter/http/on_demand/v2/on_demand.pb.h" + +#include "extensions/filters/http/common/factory_base.h" +#include "extensions/filters/http/well_known_names.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace OnDemand { + +/** + * Config registration for the OnDemand filter. @see NamedHttpFilterConfigFactory. + */ +class OnDemandFilterFactory + : public Common::FactoryBase { +public: + OnDemandFilterFactory() : FactoryBase(HttpFilterNames::get().OnDemand) {} + +private: + Http::FilterFactoryCb createFilterFactoryFromProtoTyped( + const envoy::config::filter::http::on_demand::v2::OnDemand& proto_config, const std::string&, + Server::Configuration::FactoryContext& context) override; +}; + +} // namespace OnDemand +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/on_demand/on_demand_update.cc b/source/extensions/filters/http/on_demand/on_demand_update.cc new file mode 100644 index 000000000000..55c56a875ac3 --- /dev/null +++ b/source/extensions/filters/http/on_demand/on_demand_update.cc @@ -0,0 +1,61 @@ +#include "extensions/filters/http/on_demand/on_demand_update.h" + +#include "common/common/assert.h" +#include "common/common/enum_to_int.h" +#include "common/http/codes.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace OnDemand { + +Http::FilterHeadersStatus OnDemandRouteUpdate::decodeHeaders(Http::HeaderMap&, bool) { + if (callbacks_->route() != nullptr || + !(callbacks_->routeConfig().has_value() && callbacks_->routeConfig().value()->usesVhds())) { + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + return filter_iteration_state_; + } + route_config_updated_callback_ = + std::make_shared(Http::RouteConfigUpdatedCallback( + [this](bool route_exists) -> void { onRouteConfigUpdateCompletion(route_exists); })); + callbacks_->requestRouteConfigUpdate(route_config_updated_callback_); + filter_iteration_state_ = Http::FilterHeadersStatus::StopIteration; + return filter_iteration_state_; +} + +Http::FilterDataStatus OnDemandRouteUpdate::decodeData(Buffer::Instance&, bool) { + return filter_iteration_state_ == Http::FilterHeadersStatus::StopIteration + ? Http::FilterDataStatus::StopIterationAndWatermark + : Http::FilterDataStatus::Continue; +} + +Http::FilterTrailersStatus OnDemandRouteUpdate::decodeTrailers(Http::HeaderMap&) { + return Http::FilterTrailersStatus::Continue; +} + +void OnDemandRouteUpdate::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) { + callbacks_ = &callbacks; +} + +// This is the callback which is called when an update requested in requestRouteConfigUpdate() +// has been propagated to workers, at which point the request processing is restarted from the +// beginning. +void OnDemandRouteUpdate::onRouteConfigUpdateCompletion(bool route_exists) { + filter_iteration_state_ = Http::FilterHeadersStatus::Continue; + + if (route_exists && // route can be resolved after an on-demand + // VHDS update + !callbacks_->decodingBuffer() && // Redirects with body not yet supported. + callbacks_->recreateStream()) { + return; + } + + // route cannot be resolved after an on-demand VHDS update or + // recreating stream failed, continue the filter-chain + callbacks_->continueDecoding(); +} + +} // namespace OnDemand +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/on_demand/on_demand_update.h b/source/extensions/filters/http/on_demand/on_demand_update.h new file mode 100644 index 000000000000..3bfeff60dbc5 --- /dev/null +++ b/source/extensions/filters/http/on_demand/on_demand_update.h @@ -0,0 +1,40 @@ +#pragma once + +#include "envoy/http/filter.h" + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace OnDemand { + +class OnDemandRouteUpdate : public Http::StreamDecoderFilter { +public: + OnDemandRouteUpdate() = default; + + void onRouteConfigUpdateCompletion(bool route_exists); + + void setFilterIterationState(Envoy::Http::FilterHeadersStatus status) { + filter_iteration_state_ = status; + } + + // Http::StreamDecoderFilter + Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override; + + Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override; + + Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override; + + void setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) override; + + void onDestroy() override {} + +private: + Http::StreamDecoderFilterCallbacks* callbacks_{}; + Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_callback_; + Envoy::Http::FilterHeadersStatus filter_iteration_state_{Http::FilterHeadersStatus::Continue}; +}; + +} // namespace OnDemand +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy diff --git a/source/extensions/filters/http/router/BUILD b/source/extensions/filters/http/router/BUILD index 7d43a8bae8ec..918d431ad637 100644 --- a/source/extensions/filters/http/router/BUILD +++ b/source/extensions/filters/http/router/BUILD @@ -22,6 +22,7 @@ envoy_cc_extension( "//source/common/router:shadow_writer_lib", "//source/extensions/filters/http:well_known_names", "//source/extensions/filters/http/common:factory_base_lib", + "//source/extensions/filters/http/on_demand:on_demand_update_lib", "@envoy_api//envoy/extensions/filters/http/router/v3alpha:pkg_cc_proto", ], ) diff --git a/source/extensions/filters/http/well_known_names.h b/source/extensions/filters/http/well_known_names.h index 75307d2f04ba..897ba25e5999 100644 --- a/source/extensions/filters/http/well_known_names.h +++ b/source/extensions/filters/http/well_known_names.h @@ -44,6 +44,8 @@ class HttpFilterNameValues { const std::string HealthCheck = "envoy.health_check"; // Lua filter const std::string Lua = "envoy.lua"; + // On-demand RDS updates filter + const std::string OnDemand = "envoy.on_demand"; // Squash filter const std::string Squash = "envoy.squash"; // External Authorization filter diff --git a/source/server/http/admin.h b/source/server/http/admin.h index ba3b9bc39491..58e9fb7aca29 100644 --- a/source/server/http/admin.h +++ b/source/server/http/admin.h @@ -184,6 +184,10 @@ class AdminImpl : public Admin, SystemTime lastUpdated() const override { return time_source_.systemTime(); } void onConfigUpdate() override {} void validateConfig(const envoy::config::route::v3alpha::RouteConfiguration&) const override {} + void requestVirtualHostsUpdate(const std::string&, Event::Dispatcher&, + std::weak_ptr) override { + NOT_IMPLEMENTED_GCOVR_EXCL_LINE; + } Router::ConfigConstSharedPtr config_; TimeSource& time_source_; diff --git a/test/common/config/new_grpc_mux_impl_test.cc b/test/common/config/new_grpc_mux_impl_test.cc index 3f475f00826d..f3ea773c4c49 100644 --- a/test/common/config/new_grpc_mux_impl_test.cc +++ b/test/common/config/new_grpc_mux_impl_test.cc @@ -118,6 +118,74 @@ TEST_F(NewGrpcMuxImplTest, DiscoveryResponseNonexistentSub) { } } +// DeltaDiscoveryResponse that comes in response to an on-demand request updates the watch with +// resource's name. The watch is initially created with an alias used in the on-demand request. +TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithAliases) { + setup(); + + const std::string& type_url = Config::TypeUrl::get().VirtualHost; + auto* watch = grpc_mux_->addOrUpdateWatch(type_url, nullptr, {"domain1.test"}, callbacks_, + std::chrono::milliseconds(0)); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + grpc_mux_->start(); + + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + + envoy::config::route::v3alpha::VirtualHost vhost; + vhost.set_name("vhost_1"); + vhost.add_domains("domain1.test"); + vhost.add_domains("domain2.test"); + + response->add_resources()->mutable_resource()->PackFrom(vhost); + response->mutable_resources()->at(0).set_name("vhost_1"); + response->mutable_resources()->at(0).add_aliases("domain1.test"); + response->mutable_resources()->at(0).add_aliases("domain2.test"); + + grpc_mux_->onDiscoveryResponse(std::move(response)); + + const auto& subscriptions = grpc_mux_->subscriptions(); + auto sub = subscriptions.find(type_url); + + EXPECT_TRUE(sub != subscriptions.end()); + const auto found_resource_names = sub->second->watch_map_.updateWatchInterest(watch, {}); + EXPECT_TRUE(found_resource_names.removed_.find("vhost_1") != found_resource_names.removed_.end()); +} + +// DeltaDiscoveryResponse that comes in response to an on-demand request that couldn't be resolved +// will contain an empty Resource. The Resource's aliases field will be populated with the alias +// originally used in the request. +TEST_F(NewGrpcMuxImplTest, ConfigUpdateWithNotFoundResponse) { + setup(); + + const std::string& type_url = Config::TypeUrl::get().VirtualHost; + auto* watch = grpc_mux_->addOrUpdateWatch(type_url, nullptr, {"domain1.test"}, callbacks_, + std::chrono::milliseconds(0)); + + EXPECT_CALL(*async_client_, startRaw(_, _, _, _)).WillOnce(Return(&async_stream_)); + grpc_mux_->start(); + + auto response = std::make_unique(); + response->set_type_url(type_url); + response->set_system_version_info("1"); + + response->add_resources(); + response->mutable_resources()->at(0).set_name("not-found"); + response->mutable_resources()->at(0).add_aliases("domain1.test"); + + grpc_mux_->onDiscoveryResponse(std::move(response)); + + const auto& subscriptions = grpc_mux_->subscriptions(); + auto sub = subscriptions.find(type_url); + + EXPECT_TRUE(sub != subscriptions.end()); + const auto found_resource_names = sub->second->watch_map_.updateWatchInterest(watch, {}); + EXPECT_TRUE(found_resource_names.removed_.find("not-found") != + found_resource_names.removed_.end()); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/watch_map_test.cc b/test/common/config/watch_map_test.cc index 4c75855c89f7..6148cb3766ad 100644 --- a/test/common/config/watch_map_test.cc +++ b/test/common/config/watch_map_test.cc @@ -427,6 +427,47 @@ TEST(WatchMapTest, OnConfigUpdateFailed) { watch_map.onConfigUpdateFailed(ConfigUpdateFailureReason::UpdateRejected, nullptr); } +// verifies that a watch is updated with the resource name +TEST(WatchMapTest, ConvertAliasWatchesToNameWatches) { + NamedMockSubscriptionCallbacks callbacks; + WatchMap watch_map; + Watch* watch = watch_map.addWatch(callbacks); + watch_map.updateWatchInterest(watch, {"alias"}); + + envoy::service::discovery::v3alpha::Resource resource; + resource.set_name("resource"); + resource.set_version("version"); + for (const auto alias : {"alias", "alias1", "alias2"}) { + resource.add_aliases(alias); + } + + AddedRemoved converted = watch_map.convertAliasWatchesToNameWatches(resource); + + EXPECT_EQ(std::set{"resource"}, converted.added_); + EXPECT_EQ(std::set{"alias"}, converted.removed_); +} + +// verifies that if a resource contains an alias the same as its name, and the watch has been set +// with that alias, the watch won't be updated +TEST(WatchMapTest, ConvertAliasWatchesToNameWatchesAliasIsSameAsName) { + NamedMockSubscriptionCallbacks callbacks; + WatchMap watch_map; + Watch* watch = watch_map.addWatch(callbacks); + watch_map.updateWatchInterest(watch, {"name-and-alias"}); + + envoy::service::discovery::v3alpha::Resource resource; + resource.set_name("name-and-alias"); + resource.set_version("version"); + for (const auto alias : {"name-and-alias", "alias1", "alias2"}) { + resource.add_aliases(alias); + } + + AddedRemoved converted = watch_map.convertAliasWatchesToNameWatches(resource); + + EXPECT_TRUE(converted.added_.empty()); + EXPECT_TRUE(converted.removed_.empty()); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/http/conn_manager_impl_fuzz_test.cc b/test/common/http/conn_manager_impl_fuzz_test.cc index 528f50da2db4..1d19543d5787 100644 --- a/test/common/http/conn_manager_impl_fuzz_test.cc +++ b/test/common/http/conn_manager_impl_fuzz_test.cc @@ -48,6 +48,19 @@ namespace Http { class FuzzConfig : public ConnectionManagerConfig { public: + struct RouteConfigProvider : public Router::RouteConfigProvider { + RouteConfigProvider(TimeSource& time_source) : time_source_(time_source) {} + + // Router::RouteConfigProvider + Router::ConfigConstSharedPtr config() override { return route_config_; } + absl::optional configInfo() const override { return {}; } + SystemTime lastUpdated() const override { return time_source_.systemTime(); } + void onConfigUpdate() override {} + + TimeSource& time_source_; + std::shared_ptr route_config_{new NiceMock()}; + }; + FuzzConfig() : stats_{{ALL_HTTP_CONN_MAN_STATS(POOL_COUNTER(fake_stats_), POOL_GAUGE(fake_stats_), POOL_HISTOGRAM(fake_stats_))}, diff --git a/test/common/http/conn_manager_impl_test.cc b/test/common/http/conn_manager_impl_test.cc index 13875418a3d7..a1ee2e3fed90 100644 --- a/test/common/http/conn_manager_impl_test.cc +++ b/test/common/http/conn_manager_impl_test.cc @@ -69,6 +69,19 @@ namespace Http { class HttpConnectionManagerImplTest : public testing::Test, public ConnectionManagerConfig { public: + struct RouteConfigProvider : public Router::RouteConfigProvider { + RouteConfigProvider(TimeSource& time_source) : time_source_(time_source) {} + + // Router::RouteConfigProvider + Router::ConfigConstSharedPtr config() override { return route_config_; } + absl::optional configInfo() const override { return {}; } + SystemTime lastUpdated() const override { return time_source_.systemTime(); } + void onConfigUpdate() override {} + + TimeSource& time_source_; + std::shared_ptr route_config_{new NiceMock()}; + }; + HttpConnectionManagerImplTest() : http_context_(fake_stats_.symbolTable()), access_log_path_("dummy_path"), access_logs_{ diff --git a/test/common/router/vhds_test.cc b/test/common/router/vhds_test.cc index 8807adf27d4b..2cfeadf4fcfb 100644 --- a/test/common/router/vhds_test.cc +++ b/test/common/router/vhds_test.cc @@ -143,8 +143,8 @@ TEST_F(VhdsTest, VhdsAddsVirtualHosts) { messageDifferencer_.Equals(vhost, config_update_info->routeConfiguration().virtual_hosts(0))); } -// verify addition/updating of virtual hosts to already existing ones -TEST_F(VhdsTest, VhdsAddsToExistingVirtualHosts) { +// verify that an RDS update of virtual hosts leaves VHDS virtual hosts intact +TEST_F(VhdsTest, RdsUpdatesVirtualHosts) { const auto route_config = TestUtility::parseYaml(R"EOF( name: my_route @@ -162,29 +162,7 @@ name: my_route envoy_grpc: cluster_name: xds_cluster )EOF"); - RouteConfigUpdatePtr config_update_info = makeRouteConfigUpdate(route_config); - - VhdsSubscription subscription(config_update_info, factory_context_, context_, providers_); - EXPECT_EQ(1UL, config_update_info->routeConfiguration().virtual_hosts_size()); - EXPECT_EQ("vhost_rds1", config_update_info->routeConfiguration().virtual_hosts(0).name()); - - auto vhost = buildVirtualHost("vhost1", "vhost.first"); - const auto& added_resources = buildAddedResources({vhost}); - const Protobuf::RepeatedPtrField removed_resources; - factory_context_.cluster_manager_.subscription_factory_.callbacks_->onConfigUpdate( - added_resources, removed_resources, "1"); - - EXPECT_EQ(2UL, config_update_info->routeConfiguration().virtual_hosts_size()); - auto actual_vhost_0 = config_update_info->routeConfiguration().virtual_hosts(0); - auto actual_vhost_1 = config_update_info->routeConfiguration().virtual_hosts(1); - EXPECT_TRUE("vhost_rds1" == actual_vhost_0.name() || "vhost_rds1" == actual_vhost_1.name()); - EXPECT_TRUE(messageDifferencer_.Equals(vhost, actual_vhost_0) || - messageDifferencer_.Equals(vhost, actual_vhost_1)); -} - -// verify removal of virtual hosts -TEST_F(VhdsTest, VhdsRemovesAnExistingVirtualHost) { - const auto route_config = + const auto updated_route_config = TestUtility::parseYaml(R"EOF( name: my_route virtual_hosts: @@ -193,38 +171,11 @@ name: my_route routes: - match: { prefix: "/rdsone" } route: { cluster: my_service } -vhds: - config_source: - api_config_source: - api_type: DELTA_GRPC - grpc_services: - envoy_grpc: - cluster_name: xds_cluster - )EOF"); - RouteConfigUpdatePtr config_update_info = makeRouteConfigUpdate(route_config); - VhdsSubscription subscription(config_update_info, factory_context_, context_, providers_); - EXPECT_EQ(1UL, config_update_info->routeConfiguration().virtual_hosts_size()); - EXPECT_EQ("vhost_rds1", config_update_info->routeConfiguration().virtual_hosts(0).name()); - - const Protobuf::RepeatedPtrField added_resources; - const auto removed_resources = buildRemovedResources({"vhost_rds1"}); - factory_context_.cluster_manager_.subscription_factory_.callbacks_->onConfigUpdate( - added_resources, removed_resources, "1"); - - EXPECT_EQ(0UL, config_update_info->routeConfiguration().virtual_hosts_size()); -} - -// verify vhds overwrites existing virtual hosts -TEST_F(VhdsTest, VhdsOverwritesAnExistingVirtualHost) { - const auto route_config = - TestUtility::parseYaml(R"EOF( -name: my_route -virtual_hosts: -- name: vhost_rds1 - domains: ["vhost.rds.first"] +- name: vhost_rds2 + domains: ["vhost.rds.second"] routes: - - match: { prefix: "/rdsone" } - route: { cluster: my_service } + - match: { prefix: "/rdstwo" } + route: { cluster: my_other_service } vhds: config_source: api_config_source: @@ -239,15 +190,25 @@ name: my_route EXPECT_EQ(1UL, config_update_info->routeConfiguration().virtual_hosts_size()); EXPECT_EQ("vhost_rds1", config_update_info->routeConfiguration().virtual_hosts(0).name()); - auto vhost = buildVirtualHost("vhost_rds1", "vhost.rds.first.mk2"); + auto vhost = buildVirtualHost("vhost_vhds1", "vhost.first"); const auto& added_resources = buildAddedResources({vhost}); const Protobuf::RepeatedPtrField removed_resources; factory_context_.cluster_manager_.subscription_factory_.callbacks_->onConfigUpdate( added_resources, removed_resources, "1"); + EXPECT_EQ(2UL, config_update_info->routeConfiguration().virtual_hosts_size()); - EXPECT_EQ(1UL, config_update_info->routeConfiguration().virtual_hosts_size()); - EXPECT_TRUE( - messageDifferencer_.Equals(vhost, config_update_info->routeConfiguration().virtual_hosts(0))); + config_update_info->onRdsUpdate(updated_route_config, "2"); + + EXPECT_EQ(3UL, config_update_info->routeConfiguration().virtual_hosts_size()); + auto actual_vhost_0 = config_update_info->routeConfiguration().virtual_hosts(0); + auto actual_vhost_1 = config_update_info->routeConfiguration().virtual_hosts(1); + auto actual_vhost_2 = config_update_info->routeConfiguration().virtual_hosts(2); + EXPECT_TRUE("vhost_rds1" == actual_vhost_0.name() || "vhost_rds1" == actual_vhost_1.name() || + "vhost_rds1" == actual_vhost_2.name()); + EXPECT_TRUE("vhost_rds2" == actual_vhost_0.name() || "vhost_rds2" == actual_vhost_1.name() || + "vhost_rds2" == actual_vhost_2.name()); + EXPECT_TRUE("vhost_vhds1" == actual_vhost_0.name() || "vhost_vhds1" == actual_vhost_1.name() || + "vhost_vhds1" == actual_vhost_2.name()); } // verify vhds validates VirtualHosts in added_resources diff --git a/test/extensions/filters/http/on_demand/BUILD b/test/extensions/filters/http/on_demand/BUILD new file mode 100644 index 000000000000..9a5acca7688f --- /dev/null +++ b/test/extensions/filters/http/on_demand/BUILD @@ -0,0 +1,26 @@ +licenses(["notice"]) # Apache 2 + +load( + "//bazel:envoy_build_system.bzl", + "envoy_package", +) +load( + "//test/extensions:extensions_build_system.bzl", + "envoy_extension_cc_test", +) + +envoy_package() + +envoy_extension_cc_test( + name = "on_demand_filter_test", + srcs = ["on_demand_filter_test.cc"], + extension_name = "envoy.filters.http.on_demand", + deps = [ + "//source/common/http:header_map_lib", + "//source/common/protobuf:utility_lib", + "//source/extensions/filters/http/on_demand:on_demand_update_lib", + "//test/mocks/http:http_mocks", + "//test/mocks/runtime:runtime_mocks", + "//test/test_common:utility_lib", + ], +) diff --git a/test/extensions/filters/http/on_demand/on_demand_filter_test.cc b/test/extensions/filters/http/on_demand/on_demand_filter_test.cc new file mode 100644 index 000000000000..a457266d7368 --- /dev/null +++ b/test/extensions/filters/http/on_demand/on_demand_filter_test.cc @@ -0,0 +1,110 @@ +#include + +#include "common/http/header_map_impl.h" + +#include "extensions/filters/http/on_demand/on_demand_update.h" + +#include "test/mocks/http/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/test_common/utility.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::Return; + +namespace Envoy { +namespace Extensions { +namespace HttpFilters { +namespace OnDemand { + +class OnDemandFilterTest : public testing::Test { +public: + void SetUp() override { + filter_ = std::make_unique(); + filter_->setDecoderFilterCallbacks(decoder_callbacks_); + } + + std::unique_ptr filter_; + NiceMock decoder_callbacks_; +}; + +// tests decodeHeaders() when no cached route is available and vhds is configured +TEST_F(OnDemandFilterTest, TestDecodeHeaders) { + Http::HeaderMapImpl headers; + std::shared_ptr route_config_ptr{new NiceMock()}; + EXPECT_CALL(decoder_callbacks_, route()).WillOnce(Return(nullptr)); + EXPECT_CALL(decoder_callbacks_, routeConfig()).Times(2).WillRepeatedly(Return(route_config_ptr)); + EXPECT_CALL(*route_config_ptr, usesVhds()).WillOnce(Return(true)); + EXPECT_CALL(decoder_callbacks_, requestRouteConfigUpdate(_)); + EXPECT_EQ(Http::FilterHeadersStatus::StopIteration, filter_->decodeHeaders(headers, true)); +} + +// tests decodeHeaders() when no cached route is available +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteAvailable) { + Http::HeaderMapImpl headers; + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); +} + +// tests decodeHeaders() when no route configuration is available +TEST_F(OnDemandFilterTest, TestDecodeHeadersWhenRouteConfigIsNotAvailable) { + Http::HeaderMapImpl headers; + std::shared_ptr route_config_ptr{new NiceMock()}; + EXPECT_CALL(decoder_callbacks_, route()).WillOnce(Return(nullptr)); + EXPECT_CALL(decoder_callbacks_, routeConfig()).WillOnce(Return(absl::nullopt)); + EXPECT_EQ(Http::FilterHeadersStatus::Continue, filter_->decodeHeaders(headers, true)); +} + +TEST_F(OnDemandFilterTest, TestDecodeTrailers) { + Http::HeaderMapImpl headers; + EXPECT_EQ(Http::FilterTrailersStatus::Continue, filter_->decodeTrailers(headers)); +} + +// tests decodeData() when filter state is Http::FilterHeadersStatus::Continue +TEST_F(OnDemandFilterTest, TestDecodeDataReturnsContinue) { + Buffer::OwnedImpl buffer; + filter_->setFilterIterationState(Http::FilterHeadersStatus::Continue); + EXPECT_EQ(Http::FilterDataStatus::Continue, filter_->decodeData(buffer, false)); +} + +// tests decodeData() when filter state is Http::FilterHeadersStatus::StopIteration +TEST_F(OnDemandFilterTest, TestDecodeDataReturnsStopIteration) { + Buffer::OwnedImpl buffer; + filter_->setFilterIterationState(Http::FilterHeadersStatus::StopIteration); + EXPECT_EQ(Http::FilterDataStatus::StopIterationAndWatermark, filter_->decodeData(buffer, false)); +} + +// tests onRouteConfigUpdateCompletion() route hasn't been resolved +TEST_F(OnDemandFilterTest, + TestOnRouteConfigUpdateCompletionContinuesDecodingWhenRouteDoesNotExist) { + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + filter_->onRouteConfigUpdateCompletion(false); +} + +// tests onRouteConfigUpdateCompletion() when redirect contains a body +TEST_F(OnDemandFilterTest, TestOnRouteConfigUpdateCompletionContinuesDecodingWithRedirectWithBody) { + Buffer::OwnedImpl buffer; + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(&buffer)); + filter_->onRouteConfigUpdateCompletion(true); +} + +// tests onRouteConfigUpdateCompletion() when ActiveStream recreation fails +TEST_F(OnDemandFilterTest, onRouteConfigUpdateCompletionContinuesDecodingIfRedirectFails) { + EXPECT_CALL(decoder_callbacks_, continueDecoding()); + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr)); + EXPECT_CALL(decoder_callbacks_, recreateStream()).WillOnce(Return(false)); + filter_->onRouteConfigUpdateCompletion(true); +} + +// tests onRouteConfigUpdateCompletion() when route was resolved +TEST_F(OnDemandFilterTest, onRouteConfigUpdateCompletionRestartsActiveStream) { + EXPECT_CALL(decoder_callbacks_, decodingBuffer()).WillOnce(Return(nullptr)); + EXPECT_CALL(decoder_callbacks_, recreateStream()).WillOnce(Return(true)); + filter_->onRouteConfigUpdateCompletion(true); +} + +} // namespace OnDemand +} // namespace HttpFilters +} // namespace Extensions +} // namespace Envoy \ No newline at end of file diff --git a/test/extensions/filters/http/router/config_test.cc b/test/extensions/filters/http/router/config_test.cc index a7df8c3dd45b..bf5f8d7bd34b 100644 --- a/test/extensions/filters/http/router/config_test.cc +++ b/test/extensions/filters/http/router/config_test.cc @@ -79,7 +79,7 @@ TEST(RouterFilterConfigTest, RouterV2Filter) { RouterFilterConfig factory; Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(router_config, "stats.", context); Http::MockFilterChainFactoryCallbacks filter_callback; - EXPECT_CALL(filter_callback, addStreamDecoderFilter(_)); + EXPECT_CALL(filter_callback, addStreamDecoderFilter(_)).Times(1); cb(filter_callback); } @@ -89,7 +89,7 @@ TEST(RouterFilterConfigTest, RouterFilterWithEmptyProtoConfig) { Http::FilterFactoryCb cb = factory.createFilterFactoryFromProto(*factory.createEmptyConfigProto(), "stats.", context); Http::MockFilterChainFactoryCallbacks filter_callback; - EXPECT_CALL(filter_callback, addStreamDecoderFilter(_)); + EXPECT_CALL(filter_callback, addStreamDecoderFilter(_)).Times(1); cb(filter_callback); } diff --git a/test/extensions/filters/network/http_connection_manager/config_test.cc b/test/extensions/filters/network/http_connection_manager/config_test.cc index a34ef8174e41..9af56924fdb8 100644 --- a/test/extensions/filters/network/http_connection_manager/config_test.cc +++ b/test/extensions/filters/network/http_connection_manager/config_test.cc @@ -1105,13 +1105,13 @@ TEST_F(FilterChainTest, createCustomUpgradeFilterChain) { { Http::MockFilterChainFactoryCallbacks callbacks; - EXPECT_CALL(callbacks, addStreamDecoderFilter(_)); // Router + EXPECT_CALL(callbacks, addStreamDecoderFilter(_)).Times(1); EXPECT_TRUE(config.createUpgradeFilterChain("websocket", nullptr, callbacks)); } { Http::MockFilterChainFactoryCallbacks callbacks; - EXPECT_CALL(callbacks, addStreamDecoderFilter(_)); // Router + EXPECT_CALL(callbacks, addStreamDecoderFilter(_)).Times(1); EXPECT_CALL(callbacks, addStreamFilter(_)).Times(2); // Dynamo EXPECT_TRUE(config.createUpgradeFilterChain("Foo", nullptr, callbacks)); } diff --git a/test/integration/BUILD b/test/integration/BUILD index 2c7423931c77..178b4c241696 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -419,6 +419,7 @@ envoy_cc_test_library( ":test_host_predicate_lib", "//include/envoy/event:timer_interface", "//source/common/common:thread_annotations", + "//source/extensions/filters/http/on_demand:config", "//source/extensions/filters/http/router:config", "//source/extensions/filters/network/http_connection_manager:config", "//test/common/upstream:utility_lib", diff --git a/test/integration/integration.h b/test/integration/integration.h index 0b0c12274233..b4adeb2ee7a8 100644 --- a/test/integration/integration.h +++ b/test/integration/integration.h @@ -291,10 +291,21 @@ class BaseIntegrationTest : protected Logger::Loggable { sendDeltaDiscoveryResponse(type_url, added_or_updated, removed, version, xds_stream_); } template - void sendDeltaDiscoveryResponse(const std::string& type_url, - const std::vector& added_or_updated, - const std::vector& removed, - const std::string& version, FakeStreamPtr& stream) { + void + sendDeltaDiscoveryResponse(const std::string& type_url, const std::vector& added_or_updated, + const std::vector& removed, const std::string& version, + FakeStreamPtr& stream, const std::vector& aliases = {}) { + auto response = + createDeltaDiscoveryResponse(type_url, added_or_updated, removed, version, aliases); + stream->sendGrpcMessage(response); + } + + template + envoy::api::v2::DeltaDiscoveryResponse + createDeltaDiscoveryResponse(const std::string& type_url, const std::vector& added_or_updated, + const std::vector& removed, const std::string& version, + const std::vector& aliases) { + API_NO_BOOST(envoy::api::v2::DeltaDiscoveryResponse) response; response.set_system_version_info("system_version_info_this_is_a_test"); response.set_type_url(type_url); @@ -305,11 +316,14 @@ class BaseIntegrationTest : protected Logger::Loggable { resource->set_name(TestUtility::xdsResourceName(temp_any)); resource->set_version(version); resource->mutable_resource()->PackFrom(API_DOWNGRADE(message)); + for (const auto alias : aliases) { + resource->add_aliases(alias); + } } *response.mutable_removed_resources() = {removed.begin(), removed.end()}; static int next_nonce_counter = 0; response.set_nonce(absl::StrCat("nonce", next_nonce_counter++)); - stream->sendGrpcMessage(response); + return response; } private: diff --git a/test/integration/vhds_integration_test.cc b/test/integration/vhds_integration_test.cc index 8bba23b788ad..e8cdeb421489 100644 --- a/test/integration/vhds_integration_test.cc +++ b/test/integration/vhds_integration_test.cc @@ -60,6 +60,7 @@ const char Config[] = R"EOF( "@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager stat_prefix: config_test http_filters: + - name: envoy.on_demand - name: envoy.router codec_type: HTTP2 rds: @@ -111,6 +112,8 @@ name: my_route cluster_name: xds_cluster )EOF"; +const std::string RouteConfigName = "my_route"; + const char VhostTemplate[] = R"EOF( name: {} domains: [{}] @@ -240,6 +243,10 @@ class VhdsIntegrationTest : public HttpIntegrationTest, return fmt::format(VhostTemplate, name, domain); } + std::string vhdsRequestResourceName(const std::string& host_header) { + return RouteConfigName + "/" + host_header; + } + envoy::config::route::v3alpha::VirtualHost buildVirtualHost() { return TestUtility::parseYaml( virtualHostYaml("vhost_0", "host")); @@ -317,14 +324,86 @@ class VhdsIntegrationTest : public HttpIntegrationTest, use_rds_with_vhosts ? RdsConfigWithVhosts : RdsConfig); } + void notifyAboutAliasResolutionFailure(const std::string& version, FakeStreamPtr& stream, + const std::vector& aliases = {}) { + envoy::api::v2::DeltaDiscoveryResponse response; + response.set_system_version_info("system_version_info_this_is_a_test"); + response.set_type_url(Config::TypeUrl::get().VirtualHost); + auto* resource = response.add_resources(); + resource->set_name("cannot-resolve-alias"); + resource->set_version(version); + for (const auto alias : aliases) { + resource->add_aliases(alias); + } + response.set_nonce("noncense"); + stream->sendGrpcMessage(response); + } + + void sendDeltaDiscoveryResponseWithUnresolvedAliases( + const std::vector& added_or_updated, + const std::vector& removed, const std::string& version, FakeStreamPtr& stream, + const std::vector& aliases, const std::vector& unresolved_aliases) { + auto response = createDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, added_or_updated, removed, version, aliases); + for (const auto& unresolved_alias : unresolved_aliases) { + auto* resource = response.add_resources(); + resource->set_name(unresolved_alias); + resource->set_version(version); + resource->add_aliases(unresolved_alias); + } + stream->sendGrpcMessage(response); + } + + // used in VhdsOnDemandUpdateWithResourceNameAsAlias test + // to create a DeltaDiscoveryResponse with a resource name matching the value used to create an + // on-demand request + envoy::api::v2::DeltaDiscoveryResponse createDeltaDiscoveryResponseWithResourceNameUsedAsAlias() { + API_NO_BOOST(envoy::api::v2::DeltaDiscoveryResponse) ret; + ret.set_system_version_info("system_version_info_this_is_a_test"); + ret.set_type_url(Config::TypeUrl::get().VirtualHost); + + auto* resource = ret.add_resources(); + resource->set_name("my_route/vhost_1"); + resource->set_version("4"); + resource->mutable_resource()->PackFrom( + API_DOWNGRADE(TestUtility::parseYaml( + virtualHostYaml("vhost_1", "vhost_1, vhost.first")))); + resource->add_aliases("my_route/vhost.first"); + ret.set_nonce("test-nonce-0"); + + return ret; + } + FakeStreamPtr vhds_stream_; bool use_rds_with_vhosts{false}; }; +INSTANTIATE_TEST_SUITE_P(IpVersionsClientType, VhdsIntegrationTest, GRPC_CLIENT_INTEGRATION_PARAMS); + +TEST_P(VhdsIntegrationTest, RdsUpdateWithoutVHDSChangesDoesNotRestartVHDS) { + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/", "host"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Update RouteConfig, but don't change VHDS config + sendSotwDiscoveryResponse( + Config::TypeUrl::get().RouteConfiguration, + {TestUtility::parseYaml( + RdsConfigWithVhosts)}, + "2"); + + // Confirm vhost_0 that was originally configured via VHDS is reachable + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/", "host"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); +} + // tests a scenario when: // - a spontaneous VHDS DiscoveryResponse adds two virtual hosts // - the next spontaneous VHDS DiscoveryResponse removes newly added virtual hosts -// - Upstream makes a request to an (now) unknown domain, which fails +// - Upstream makes a request to an (now) unknown domain +// - A VHDS DiscoveryResponse received containing update for the domain +// - Upstream receives a 200 response TEST_P(VhdsIntegrationTest, VhdsVirtualHostAddUpdateRemove) { // Calls our initialize(), which includes establishing a listener, route, and cluster. testRouterHeaderOnlyRequestAndResponse(nullptr, 1); @@ -353,13 +432,24 @@ TEST_P(VhdsIntegrationTest, VhdsVirtualHostAddUpdateRemove) { // an upstream request to an (now) unknown domain codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); Http::TestHeaderMapImpl request_headers{{":method", "GET"}, - {":path", "/one"}, + {":path", "/"}, {":scheme", "http"}, {":authority", "vhost.first"}, {"x-lyft-user-id", "123"}}; IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost.first")}, {}, + vhds_stream_)); + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, {buildVirtualHost2()}, {}, "4", vhds_stream_, + {"my_route/vhost.first"}); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + response->waitForHeaders(); - EXPECT_EQ("404", response->headers().Status()->value().getStringView()); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); cleanupUpstreamAndDownstream(); } @@ -368,7 +458,9 @@ TEST_P(VhdsIntegrationTest, VhdsVirtualHostAddUpdateRemove) { // - an RDS exchange contains a non-empty virtual_hosts array // - a spontaneous VHDS DiscoveryResponse adds two virtual hosts // - the next spontaneous VHDS DiscoveryResponse removes newly added virtual hosts -// - Upstream makes a request to an (now) unknown domain, which fails +// - Upstream makes a request to an (now) unknown domain +// - A VHDS DiscoveryResponse received containing update for the domain +// - Upstream receives a 200 response TEST_P(VhdsIntegrationTest, RdsWithVirtualHostsVhdsVirtualHostAddUpdateRemove) { // RDS exchange with a non-empty virtual_hosts field useRdsWithVhosts(); @@ -407,11 +499,146 @@ TEST_P(VhdsIntegrationTest, RdsWithVirtualHostsVhdsVirtualHostAddUpdateRemove) { codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); Http::TestHeaderMapImpl request_headers{{":method", "GET"}, - {":path", "/one"}, + {":path", "/"}, {":scheme", "http"}, {":authority", "vhost.first"}, {"x-lyft-user-id", "123"}}; IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost.first")}, {}, + vhds_stream_)); + sendDeltaDiscoveryResponse( + Config::TypeUrl::get().VirtualHost, {buildVirtualHost2()}, {}, "4", vhds_stream_, + {"my_route/vhost.first"}); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + + cleanupUpstreamAndDownstream(); +} + +// tests a scenario where: +// a Resource received in a DeltaDiscoveryResponse has name that matches the value used in the +// on-demand request +TEST_P(VhdsIntegrationTest, VhdsOnDemandUpdateWithResourceNameAsAlias) { + // RDS exchange with a non-empty virtual_hosts field + useRdsWithVhosts(); + + testRouterHeaderOnlyRequestAndResponse(nullptr, 1); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // verify that rds-based virtual host can be resolved + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/rdsone", "vhost.rds.first"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Attempt to make a request to an unknown host + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost_1"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost_1")}, {}, vhds_stream_)); + + envoy::api::v2::DeltaDiscoveryResponse vhds_update = + createDeltaDiscoveryResponseWithResourceNameUsedAsAlias(); + vhds_stream_->sendGrpcMessage(vhds_update); + + waitForNextUpstreamRequest(1); + // Send response headers, and end_stream if there is no response body. + upstream_request_->encodeHeaders(default_response_headers_, true); + + response->waitForHeaders(); + EXPECT_EQ("200", response->headers().Status()->value().getStringView()); + + cleanupUpstreamAndDownstream(); +} + +// tests a scenario when: +// - an RDS exchange contains a non-empty virtual_hosts array +// - a spontaneous VHDS DiscoveryResponse adds two virtual hosts +// - the next spontaneous VHDS DiscoveryResponse removes newly added virtual hosts +// - Upstream makes a request to an (now) unknown domain +// - A VHDS DiscoveryResponse received but contains no update for the domain (the management server +// couldn't resolve it) +// - Upstream receives a 404 response +TEST_P(VhdsIntegrationTest, VhdsOnDemandUpdateFailToResolveTheAlias) { + // RDS exchange with a non-empty virtual_hosts field + useRdsWithVhosts(); + + testRouterHeaderOnlyRequestAndResponse(nullptr, 1); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // verify that rds-based virtual host can be resolved + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/rdsone", "vhost.rds.first"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Attempt to make a request to an unknown host + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost.third"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost.third")}, {}, + vhds_stream_)); + // Send an empty response back (the management server isn't aware of vhost.third) + notifyAboutAliasResolutionFailure("4", vhds_stream_, {"my_route/vhost.third"}); + + response->waitForHeaders(); + EXPECT_EQ("404", response->headers().Status()->value().getStringView()); + + cleanupUpstreamAndDownstream(); +} + +// tests a scenario when: +// - an RDS exchange contains a non-empty virtual_hosts array +// - a spontaneous VHDS DiscoveryResponse adds two virtual hosts +// - the next spontaneous VHDS DiscoveryResponse removes newly added virtual hosts +// - Upstream makes a request to an (now) unknown domain +// - A VHDS DiscoveryResponse received that contains update for vhost.first host, but vhost.third +// couldn't be resolved +// - Upstream receives a 404 response +TEST_P(VhdsIntegrationTest, VhdsOnDemandUpdateFailToResolveOneAliasOutOfSeveral) { + // RDS exchange with a non-empty virtual_hosts field + useRdsWithVhosts(); + + testRouterHeaderOnlyRequestAndResponse(nullptr, 1); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // verify that rds-based virtual host can be resolved + testRouterHeaderOnlyRequestAndResponse(nullptr, 1, "/rdsone", "vhost.rds.first"); + cleanupUpstreamAndDownstream(); + codec_client_->waitForDisconnect(); + + // Attempt to make a request to an unknown host + codec_client_ = makeHttpConnection(makeClientConnection((lookupPort("http")))); + Http::TestHeaderMapImpl request_headers{{":method", "GET"}, + {":path", "/"}, + {":scheme", "http"}, + {":authority", "vhost.third"}, + {"x-lyft-user-id", "123"}}; + IntegrationStreamDecoderPtr response = codec_client_->makeHeaderOnlyRequest(request_headers); + EXPECT_TRUE(compareDeltaDiscoveryRequest(Config::TypeUrl::get().VirtualHost, + {vhdsRequestResourceName("vhost.third")}, {}, + vhds_stream_)); + // Send an empty response back (the management server isn't aware of vhost.third) + sendDeltaDiscoveryResponseWithUnresolvedAliases({buildVirtualHost2()}, {}, "4", vhds_stream_, + {"vhost.first"}, {"my_route/vhost.third"}); + response->waitForHeaders(); EXPECT_EQ("404", response->headers().Status()->value().getStringView()); diff --git a/test/mocks/http/mocks.cc b/test/mocks/http/mocks.cc index 4fb7b6243768..281b8e0c8c72 100644 --- a/test/mocks/http/mocks.cc +++ b/test/mocks/http/mocks.cc @@ -68,6 +68,8 @@ MockStreamDecoderFilterCallbacks::MockStreamDecoderFilterCallbacks() { absl::string_view details) { sendLocalReply_(code, body, modify_headers, grpc_status, details); })); + ON_CALL(*this, routeConfig()) + .WillByDefault(Return(absl::optional())); } MockStreamDecoderFilterCallbacks::~MockStreamDecoderFilterCallbacks() = default; diff --git a/test/mocks/http/mocks.h b/test/mocks/http/mocks.h index 4a0c208b77ac..51eb569c87c0 100644 --- a/test/mocks/http/mocks.h +++ b/test/mocks/http/mocks.h @@ -136,6 +136,8 @@ class MockStreamDecoderFilterCallbacks : public StreamDecoderFilterCallbacks, MOCK_METHOD0(resetStream, void()); MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr()); MOCK_METHOD0(route, Router::RouteConstSharedPtr()); + MOCK_METHOD1(requestRouteConfigUpdate, void(Http::RouteConfigUpdatedCallbackSharedPtr)); + MOCK_METHOD0(routeConfig, absl::optional()); MOCK_METHOD0(clearRouteCache, void()); MOCK_METHOD0(streamId, uint64_t()); MOCK_METHOD0(streamInfo, StreamInfo::StreamInfo&()); @@ -209,6 +211,8 @@ class MockStreamEncoderFilterCallbacks : public StreamEncoderFilterCallbacks, MOCK_METHOD0(resetStream, void()); MOCK_METHOD0(clusterInfo, Upstream::ClusterInfoConstSharedPtr()); MOCK_METHOD0(route, Router::RouteConstSharedPtr()); + MOCK_METHOD1(requestRouteConfigUpdate, void(std::function)); + MOCK_METHOD0(canRequestRouteConfigUpdate, bool()); MOCK_METHOD0(clearRouteCache, void()); MOCK_METHOD0(streamId, uint64_t()); MOCK_METHOD0(streamInfo, StreamInfo::StreamInfo&()); diff --git a/test/mocks/router/mocks.h b/test/mocks/router/mocks.h index 9ca6a4d53776..286234a462bd 100644 --- a/test/mocks/router/mocks.h +++ b/test/mocks/router/mocks.h @@ -431,6 +431,9 @@ class MockRouteConfigProvider : public RouteConfigProvider { MOCK_METHOD0(onConfigUpdate, void()); MOCK_CONST_METHOD1(validateConfig, void(const envoy::config::route::v3alpha::RouteConfiguration&)); + MOCK_METHOD3(requestVirtualHostsUpdate, + void(const std::string&, Event::Dispatcher&, + std::weak_ptr route_config_updated_cb)); std::shared_ptr> route_config_{new NiceMock()}; };