Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: introduce initial_fetch_timeout option to limit initialization time #6048

Merged
merged 9 commits into from
Mar 15, 2019
10 changes: 10 additions & 0 deletions api/envoy/api/v2/core/config_source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,14 @@ message ConfigSource {
// source in the bootstrap configuration is used.
AggregatedConfigSource ads = 3;
}

// Optional initialization timeout.
// When this timeout is specified, Envoy will wait no longer than the specified time for first
// config response on this xDS subscription during the :ref:`initialization process
// <arch_overview_initialization>`. After reaching the timeout, Envoy will move to the next
// initialization phase, even if the first config is not delivered yet. The timer is activated
// when the xDS API subscription starts, and is disarmed on first config update or on error. 0
// means no timeout - Envoy will wait indefinitely for the first xDS config (unless another
// timeout applies). Default 0.
google.protobuf.Duration initial_fetch_timeout = 4;
}
5 changes: 5 additions & 0 deletions docs/root/intro/arch_overview/init.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.. _arch_overview_initialization:

Initialization
==============

Expand All @@ -22,3 +24,6 @@ accepting new connections.
* After all of the previous steps have taken place, the listeners start accepting new connections.
This flow ensures that during hot restart the new process is fully capable of accepting and
processing new connections before the draining of the old process begins.

All mentioned "waiting for one response" periods can be limited by setting corresponding
:ref:`initial_fetch_timeout <envoy_api_field_core.ConfigSource.initial_fetch_timeout>`.
27 changes: 25 additions & 2 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ class DeltaSubscriptionImpl
Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings, SubscriptionStats stats)
const RateLimitSettings& rate_limit_settings, SubscriptionStats stats,
std::chrono::milliseconds init_fetch_timeout)
: GrpcStream<envoy::api::v2::DeltaDiscoveryRequest, envoy::api::v2::DeltaDiscoveryResponse,
ResourceNameDiff>(std::move(async_client), service_method, random, dispatcher,
scope, rate_limit_settings),
type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())),
local_info_(local_info), stats_(stats) {
local_info_(local_info), stats_(stats), dispatcher_(dispatcher),
init_fetch_timeout_(init_fetch_timeout) {
request_.set_type_url(type_url_);
request_.mutable_node()->MergeFrom(local_info_.node());
}
Expand Down Expand Up @@ -120,6 +122,7 @@ class DeltaSubscriptionImpl
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) {
disableInitFetchTimeoutTimer();
htuch marked this conversation as resolved.
Show resolved Hide resolved
callbacks_->onConfigUpdate(added_resources, removed_resources, version_info);
for (const auto& resource : added_resources) {
resources_[resource.name()] = resource.version();
Expand All @@ -134,6 +137,7 @@ class DeltaSubscriptionImpl
void handleResponse(std::unique_ptr<envoy::api::v2::DeltaDiscoveryResponse>&& message) override {
ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url_,
message->system_version_info());
disableInitFetchTimeoutTimer();

request_.set_response_nonce(message->nonce());

Expand Down Expand Up @@ -167,6 +171,7 @@ class DeltaSubscriptionImpl
}

void handleEstablishmentFailure() override {
disableInitFetchTimeoutTimer();
stats_.update_failure_.inc();
ENVOY_LOG(debug, "delta update for {} failed", type_url_);
stats_.update_attempt_.inc();
Expand All @@ -177,6 +182,15 @@ class DeltaSubscriptionImpl
void start(const std::vector<std::string>& resources,
SubscriptionCallbacks<ResourceType>& callbacks) override {
callbacks_ = &callbacks;

if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "delta config: initial fetch timed out for {}", type_url_);
callbacks_->onConfigUpdateFailed(nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

establishNewStream();
subscribe(resources);
// The attempt stat here is maintained for the purposes of having consistency between ADS and
Expand All @@ -191,6 +205,12 @@ class DeltaSubscriptionImpl
}

private:
void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
init_fetch_timeout_timer_->disableTimer();
init_fetch_timeout_timer_.reset();
}
}
// A map from resource name to per-resource version.
std::unordered_map<std::string, std::string> resources_;
// The keys of resources_. Only tracked separately because std::map does not provide an iterator
Expand All @@ -207,6 +227,9 @@ class DeltaSubscriptionImpl
const LocalInfo::LocalInfo& local_info_;

SubscriptionStats stats_;
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
};

} // namespace Config
Expand Down
27 changes: 25 additions & 2 deletions source/common/config/grpc_mux_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,25 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
GrpcMuxCallbacks,
Logger::Loggable<Logger::Id::config> {
public:
GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats)
GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout)
: grpc_mux_(grpc_mux), stats_(stats),
type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())) {}
type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())),
dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) {}

// Config::Subscription
void start(const std::vector<std::string>& resources,
SubscriptionCallbacks<ResourceType>& callbacks) override {
callbacks_ = &callbacks;

if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_);
callbacks_->onConfigUpdateFailed(nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

watch_ = grpc_mux_.subscribe(type_url_, resources, *this);
// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand All @@ -44,6 +55,7 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
// Config::GrpcMuxCallbacks
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override {
disableInitFetchTimeoutTimer();
Protobuf::RepeatedPtrField<ResourceType> typed_resources;
std::transform(resources.cbegin(), resources.cend(),
Protobuf::RepeatedPtrFieldBackInserter(&typed_resources),
Expand All @@ -61,6 +73,7 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
}

void onConfigUpdateFailed(const EnvoyException* e) override {
disableInitFetchTimeoutTimer();
// TODO(htuch): Less fragile signal that this is failure vs. reject.
if (e == nullptr) {
stats_.update_failure_.inc();
Expand All @@ -78,11 +91,21 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
}

private:
void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
init_fetch_timeout_timer_->disableTimer();
init_fetch_timeout_timer_.reset();
}
}

GrpcMux& grpc_mux_;
SubscriptionStats stats_;
const std::string type_url_;
SubscriptionCallbacks<ResourceType>* callbacks_{};
GrpcMuxWatchPtr watch_{};
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
};

} // namespace Config
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
GrpcSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings)
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings,
std::chrono::milliseconds init_fetch_timeout)
: grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope,
rate_limit_settings),
grpc_mux_subscription_(grpc_mux_, stats) {}
grpc_mux_subscription_(grpc_mux_, stats, dispatcher, init_fetch_timeout) {}

// Config::Subscription
void start(const std::vector<std::string>& resources,
Expand Down
26 changes: 24 additions & 2 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
const std::string& remote_cluster_name, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random, std::chrono::milliseconds refresh_interval,
std::chrono::milliseconds request_timeout,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats,
std::chrono::milliseconds init_fetch_timeout)
: Http::RestApiFetcher(cm, remote_cluster_name, dispatcher, random, refresh_interval,
request_timeout),
stats_(stats) {
stats_(stats), dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) {
request_.mutable_node()->CopyFrom(local_info.node());
ASSERT(service_method.options().HasExtension(google::api::http));
const auto& http_rule = service_method.options().GetExtension(google::api::http);
Expand All @@ -51,6 +52,15 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
void start(const std::vector<std::string>& resources,
Config::SubscriptionCallbacks<ResourceType>& callbacks) override {
ASSERT(callbacks_ == nullptr);

if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "REST config: initial fetch timed out for", path_);
callbacks_->onConfigUpdateFailed(nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_vector(resources.begin(),
resources.end());
request_.mutable_resource_names()->Swap(&resources_vector);
Expand Down Expand Up @@ -78,6 +88,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
}

void parseResponse(const Http::Message& response) override {
disableInitFetchTimeoutTimer();
envoy::api::v2::DiscoveryResponse message;
const auto status = Protobuf::util::JsonStringToMessage(response.bodyAsString(), &message);
if (!status.ok()) {
Expand All @@ -101,6 +112,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
void onFetchComplete() override {}

void onFetchFailure(const EnvoyException* e) override {
disableInitFetchTimeoutTimer();
ENVOY_LOG(warn, "REST config update failed: {}", e != nullptr ? e->what() : "fetch failure");
handleFailure(e);
}
Expand All @@ -111,11 +123,21 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
callbacks_->onConfigUpdateFailed(e);
}

void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
init_fetch_timeout_timer_->disableTimer();
init_fetch_timeout_timer_.reset();
}
}

std::string path_;
Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_;
Config::SubscriptionCallbacks<ResourceType>* callbacks_{};
envoy::api::v2::DiscoveryRequest request_;
SubscriptionStats stats_;
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
};

} // namespace Config
Expand Down
12 changes: 8 additions & 4 deletions source/common/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class SubscriptionFactory {
local_info, cm, api_config_source.cluster_names()[0], dispatcher, random,
Utility::apiConfigSourceRefreshDelay(api_config_source),
Utility::apiConfigSourceRequestTimeout(api_config_source),
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats));
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats,
Utility::configSourceInitialFetchTimeout(config)));
break;
case envoy::api::v2::core::ApiConfigSource::GRPC:
result.reset(new GrpcSubscriptionImpl<ResourceType>(
Expand All @@ -76,7 +77,8 @@ class SubscriptionFactory {
->create(),
dispatcher, random,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats,
scope, Utility::parseRateLimitSettings(api_config_source)));
scope, Utility::parseRateLimitSettings(api_config_source),
Utility::configSourceInitialFetchTimeout(config)));
break;
case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: {
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source);
Expand All @@ -86,7 +88,8 @@ class SubscriptionFactory {
api_config_source, scope)
->create(),
dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method),
random, scope, Utility::parseRateLimitSettings(api_config_source), stats));
random, scope, Utility::parseRateLimitSettings(api_config_source), stats,
Utility::configSourceInitialFetchTimeout(config)));
break;
}
default:
Expand All @@ -95,7 +98,8 @@ class SubscriptionFactory {
break;
}
case envoy::api::v2::core::ConfigSource::kAds: {
result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(cm.adsMux(), stats));
result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(
cm.adsMux(), stats, dispatcher, Utility::configSourceInitialFetchTimeout(config)));
break;
}
default:
Expand Down
6 changes: 6 additions & 0 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ std::chrono::milliseconds Utility::apiConfigSourceRequestTimeout(
PROTOBUF_GET_MS_OR_DEFAULT(api_config_source, request_timeout, 1000));
}

std::chrono::milliseconds
Utility::configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source) {
return std::chrono::milliseconds(
PROTOBUF_GET_MS_OR_DEFAULT(config_source, initial_fetch_timeout, 0));
}

void Utility::translateCdsConfig(const Json::Object& json_config,
envoy::api::v2::core::ConfigSource& cds_config) {
translateApiConfigSource(json_config.getObject("cluster")->getString("name"),
Expand Down
8 changes: 8 additions & 0 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ class Utility {
static std::chrono::milliseconds
apiConfigSourceRequestTimeout(const envoy::api::v2::core::ApiConfigSource& api_config_source);

/**
* Extract initial_fetch_timeout as a std::chrono::milliseconds from
* envoy::api::v2::core::ConfigSource. If request_timeout isn't set in the config source, a
* default value of 0s will be returned.
*/
static std::chrono::milliseconds
configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source);

/**
* Populate an envoy::api::v2::core::ApiConfigSource.
* @param cluster supplies the cluster name for the ApiConfigSource.
Expand Down
13 changes: 13 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ envoy_cc_test_library(
],
)

envoy_cc_test(
name = "delta_subscription_impl_test",
srcs = ["delta_subscription_impl_test.cc"],
deps = [
"//source/common/config:delta_subscription_lib",
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
"//test/mocks/runtime:runtime_mocks",
],
)

envoy_cc_test(
name = "http_subscription_impl_test",
srcs = ["http_subscription_impl_test.cc"],
Expand Down
Loading