-
Notifications
You must be signed in to change notification settings - Fork 4.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP: VHDS: on-demand updates #6552
Changes from all commits
f66b55a
0e6c97e
b447f33
2da973f
21bb7af
2a6c77f
dbafd58
200aa3d
68d7c52
e692f0e
38da6ab
9a14662
33f795b
7a40f59
78acd5c
96a6c90
fe73bbd
374de2c
1c8c52f
c5e314a
28e6ad3
bd11db6
bf63b6d
bbc38b4
01eca26
72896bb
2592cce
e5d9d94
7bab936
852c51b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -150,6 +150,8 @@ class StreamFilterCallbacks { | |
*/ | ||
virtual Router::RouteConstSharedPtr route() PURE; | ||
|
||
virtual bool requestRouteConfigUpdate(std::function<void()> cb) PURE; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you add Doxygen docs for this? |
||
|
||
/** | ||
* Returns the clusterInfo for the cached route. | ||
* This method is to avoid multiple look ups in the filter chain, it also provides a consistent | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,6 +48,17 @@ class RouteConfigProvider { | |
* Callback used to notify RouteConfigProvider about configuration changes. | ||
*/ | ||
virtual void onConfigUpdate() PURE; | ||
|
||
/** | ||
* Callback used to request an update to the route configuration. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. An update from the management server? |
||
* @param for_domain supplies the domain name that virtual hosts contained in the VHDS response | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a bit confusing, as it is an update request, but the thing we're requesting is referencing the response.. |
||
* must match on | ||
* @param cb callback to be called when the configuration update has been propagated to worker | ||
* threads | ||
* @return whether a request for a configuration update has been successfully scheduled | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How/why can this fail? |
||
*/ | ||
virtual bool requestVirtualHostsUpdate(const std::string& for_domain, | ||
std::function<void()> cb) PURE; | ||
}; | ||
|
||
typedef std::unique_ptr<RouteConfigProvider> RouteConfigProviderPtr; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -76,10 +76,15 @@ void DeltaSubscriptionState::updateResourceInterest( | |
} | ||
} | ||
|
||
void DeltaSubscriptionState::updateResourceInterestViaAliases( | ||
const std::set<std::string>& updates_to_these_aliases) { | ||
aliases_added_.insert(updates_to_these_aliases.begin(), updates_to_these_aliases.end()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I'd want to see some more documentation/comments on how this module works with aliases, could you add some? CC @fredlas for this module review. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One surface-level (since I don't know how it's supposed to work) question to double check you're doing what you intend: is it your intention that aliases are only ever added, not removed? Or, it looks like the actual mechanism is that these aliases just go into resource_names_subscribe; does that mean that to remove them, you are supposed to just do a normal unsubscribe of those names? |
||
} | ||
|
||
// Not having sent any requests yet counts as an "update pending" since you're supposed to resend | ||
// the entirety of your interest at the start of a stream, even if nothing has changed. | ||
bool DeltaSubscriptionState::subscriptionUpdatePending() const { | ||
return !names_added_.empty() || !names_removed_.empty() || | ||
return !aliases_added_.empty() || !names_added_.empty() || !names_removed_.empty() || | ||
!any_request_sent_yet_in_current_stream_; | ||
} | ||
|
||
|
@@ -157,6 +162,28 @@ void DeltaSubscriptionState::handleEstablishmentFailure() { | |
|
||
envoy::api::v2::DeltaDiscoveryRequest DeltaSubscriptionState::getNextRequest() { | ||
envoy::api::v2::DeltaDiscoveryRequest request; | ||
if (!any_request_sent_yet_in_current_stream_) { | ||
populateDiscoveryRequest(request); | ||
} else if (!aliases_added_.empty()) { | ||
populateDiscoveryRequestWithAliases(request); | ||
} else { | ||
populateDiscoveryRequest(request); | ||
} | ||
|
||
request.set_type_url(type_url_); | ||
request.mutable_node()->MergeFrom(local_info_.node()); | ||
return request; | ||
} | ||
|
||
void DeltaSubscriptionState::populateDiscoveryRequestWithAliases( | ||
envoy::api::v2::DeltaDiscoveryRequest& request) { | ||
std::copy(aliases_added_.begin(), aliases_added_.end(), | ||
Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_subscribe())); | ||
aliases_added_.clear(); | ||
} | ||
|
||
void DeltaSubscriptionState::populateDiscoveryRequest( | ||
envoy::api::v2::DeltaDiscoveryRequest& request) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You may be able to factor out the
|
||
if (!any_request_sent_yet_in_current_stream_) { | ||
any_request_sent_yet_in_current_stream_ = true; | ||
// initial_resource_versions "must be populated for first request in a stream". | ||
|
@@ -181,10 +208,6 @@ envoy::api::v2::DeltaDiscoveryRequest DeltaSubscriptionState::getNextRequest() { | |
Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_unsubscribe())); | ||
names_added_.clear(); | ||
names_removed_.clear(); | ||
|
||
request.set_type_url(type_url_); | ||
request.mutable_node()->MergeFrom(local_info_.node()); | ||
return request; | ||
} | ||
|
||
void DeltaSubscriptionState::disableInitFetchTimeoutTimer() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -399,6 +399,7 @@ void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_re | |
|
||
ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager) | ||
: connection_manager_(connection_manager), | ||
route_config_provider_(connection_manager.config_.routeConfigProvider()), | ||
snapped_route_config_(connection_manager.config_.routeConfigProvider().config()), | ||
stream_id_(connection_manager.random_generator_.random()), | ||
request_response_timespan_(new Stats::Timespan( | ||
|
@@ -1125,6 +1126,7 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { | |
Router::RouteConstSharedPtr route; | ||
if (request_headers_ != nullptr) { | ||
route = snapped_route_config_->route(*request_headers_, stream_id_); | ||
// route = route_config_provider_.config()->route(*request_headers_, stream_id_); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ? |
||
} | ||
stream_info_.route_entry_ = route ? route->routeEntry() : nullptr; | ||
cached_route_ = std::move(route); | ||
|
@@ -1137,6 +1139,13 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { | |
} | ||
} | ||
|
||
bool ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(std::function<void()> cb) { | ||
ASSERT(!request_headers_->Host()->value().empty()); | ||
auto host_header = | ||
Http::LowerCaseString(std::string(request_headers_->Host()->value().getStringView())).get(); | ||
return route_config_provider_.requestVirtualHostsUpdate(host_header, cb); | ||
} | ||
|
||
void ConnectionManagerImpl::ActiveStream::sendLocalReply( | ||
bool is_grpc_request, Code code, absl::string_view body, | ||
const std::function<void(HeaderMap& headers)>& modify_headers, bool is_head_request, | ||
|
@@ -1819,13 +1828,18 @@ Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBas | |
} | ||
|
||
Router::RouteConstSharedPtr ConnectionManagerImpl::ActiveStreamFilterBase::route() { | ||
if (!parent_.cached_route_.has_value()) { | ||
if (!parent_.cached_route_.has_value() || parent_.cached_route_.value() == nullptr) { | ||
parent_.refreshCachedRoute(); | ||
} | ||
|
||
return parent_.cached_route_.value(); | ||
} | ||
|
||
bool ConnectionManagerImpl::ActiveStreamFilterBase::requestRouteConfigUpdate( | ||
std::function<void()> cb) { | ||
return parent_.requestRouteConfigUpdate(cb); | ||
} | ||
|
||
void ConnectionManagerImpl::ActiveStreamFilterBase::clearRouteCache() { | ||
parent_.cached_route_ = absl::optional<Router::RouteConstSharedPtr>(); | ||
parent_.cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -153,6 +153,13 @@ void RdsRouteConfigSubscription::onConfigUpdateFailed(const EnvoyException*) { | |
init_target_.ready(); | ||
} | ||
|
||
void RdsRouteConfigSubscription::ondemandUpdate(const std::set<std::string>& aliases) { | ||
if (vhds_subscription_.get() == nullptr) { | ||
return; | ||
} | ||
vhds_subscription_->ondemandUpdate(aliases); | ||
} | ||
|
||
bool RdsRouteConfigSubscription::validateUpdateSize(int num_resources) { | ||
if (num_resources == 0) { | ||
ENVOY_LOG(debug, "Missing RouteConfiguration for {} in onConfigUpdate()", route_config_name_); | ||
|
@@ -172,7 +179,8 @@ RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( | |
Server::Configuration::FactoryContext& factory_context) | ||
: subscription_(std::move(subscription)), | ||
config_update_info_(subscription_->routeConfigUpdate()), factory_context_(factory_context), | ||
tls_(factory_context.threadLocal().allocateSlot()) { | ||
tls_(factory_context.threadLocal().allocateSlot()), | ||
config_update_callbacks_(factory_context.threadLocal().allocateSlot()) { | ||
ConfigConstSharedPtr initial_config; | ||
if (config_update_info_->configInfo().has_value()) { | ||
initial_config = std::make_shared<ConfigImpl>(config_update_info_->routeConfiguration(), | ||
|
@@ -183,6 +191,9 @@ RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( | |
tls_->set([initial_config](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { | ||
return std::make_shared<ThreadLocalConfig>(initial_config); | ||
}); | ||
config_update_callbacks_->set([](Event::Dispatcher&) -> ThreadLocal::ThreadLocalObjectSharedPtr { | ||
return std::make_shared<ThreadLocalCallbacks>(); | ||
}); | ||
subscription_->routeConfigProviders().insert(this); | ||
} | ||
|
||
|
@@ -194,11 +205,30 @@ Router::ConfigConstSharedPtr RdsRouteConfigProviderImpl::config() { | |
return tls_->getTyped<ThreadLocalConfig>().config_; | ||
} | ||
|
||
bool RdsRouteConfigProviderImpl::requestVirtualHostsUpdate(const std::string& for_domain, | ||
std::function<void()> cb) { | ||
if (!config()->usesVhds()) { | ||
return false; | ||
} | ||
factory_context_.dispatcher().post( | ||
[this, for_domain]() -> void { subscription_->ondemandUpdate({for_domain}); }); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's an issue of collision with Envoy callback naming, though, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe |
||
config_update_callbacks_->getTyped<ThreadLocalCallbacks>().callbacks_.push(cb); | ||
|
||
return true; | ||
} | ||
|
||
void RdsRouteConfigProviderImpl::onConfigUpdate() { | ||
ConfigConstSharedPtr new_config( | ||
new ConfigImpl(config_update_info_->routeConfiguration(), factory_context_, false)); | ||
tls_->runOnAllThreads( | ||
[this, new_config]() -> void { tls_->getTyped<ThreadLocalConfig>().config_ = new_config; }); | ||
tls_->runOnAllThreads([this, new_config]() -> void { | ||
tls_->getTyped<ThreadLocalConfig>().config_ = new_config; | ||
auto callbacks = config_update_callbacks_->getTyped<ThreadLocalCallbacks>().callbacks_; | ||
if (!callbacks.empty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be a |
||
auto cb = callbacks.front(); | ||
callbacks.pop(); | ||
cb(); | ||
} | ||
}); | ||
} | ||
|
||
RouteConfigProviderManagerImpl::RouteConfigProviderManagerImpl(Server::Admin& admin) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -306,6 +306,20 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::HeaderMap& headers, bool e | |
// Determine if there is a route entry or a direct response for the request. | ||
route_ = callbacks_->route(); | ||
if (!route_) { | ||
/* | ||
attempting_internal_redirect_with_complete_stream_ = true; | ||
// upstream_request_->upstream_timing_.last_upstream_rx_byte_received_ && | ||
downstream_end_stream_; | ||
|
||
if (//downstream_end_stream_ && | ||
!callbacks_->decodingBuffer() && // Redirects with body not yet supported. | ||
callbacks_->recreateStream()) { | ||
//cluster_->stats().upstream_internal_redirect_succeeded_total_.inc(); | ||
return Http::FilterHeadersStatus::StopIteration; | ||
} | ||
|
||
attempting_internal_redirect_with_complete_stream_ = false; | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ? |
||
config_.stats_.no_route_.inc(); | ||
ENVOY_STREAM_LOG(debug, "no cluster match for URL '{}'", *callbacks_, | ||
headers.Path()->value().getStringView()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add Doxygen docs for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I am perceiving the intention correctly, I think you might also want to change the name.
updateResources()
is kind of misnamed; it's not about getting updated resources, but rather updating the set of resource names we care about. Similarly, it looks like this one is about adding to a list of aliases, not updating resources....so I guess I'm really saying now would be a good time to give
updateResources()
a better name. Can I suggest something like eitherupdateResourceInterest
orupdateSubscriptionInterest
?As for the aliases one, since it looks like it's more of a blanket addition than a "replace with this set" (which is how
updateResources
currently works), I would suggest likeaddAliases
oraddResourceAliases
.