Skip to content

Commit

Permalink
xds: introduce initial_fetch_timeout option to limit initialization t…
Browse files Browse the repository at this point in the history
…ime (#6048)

Introduce new optional configuration parameter initial_fetch_timeout to ConfigSource proto.
This parameter can be used to limit time Envoy spend on initialization of CDS, LDS and RDS.
Enabling this timeout prevents Envoy from being stuck in PRE_INITIALIZING or INITIALIZING phase, as described in #5862.

Risk Level: Low (optional feature, disabled by default)
Testing: manual testing, unit tests
Docs Changes: add new parameter initial_fetch_timeout to ConfigSource proto
Release Notes: Added

Fixes #5862

Signed-off-by: Marcin Falkowski <marcin.falkowski@allegro.pl>
  • Loading branch information
MarcinFalkowski authored and htuch committed Mar 15, 2019
1 parent 2953bab commit 0657644
Show file tree
Hide file tree
Showing 18 changed files with 426 additions and 23 deletions.
10 changes: 10 additions & 0 deletions api/envoy/api/v2/core/config_source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,14 @@ message ConfigSource {
// source in the bootstrap configuration is used.
AggregatedConfigSource ads = 3;
}

// Optional initialization timeout.
// When this timeout is specified, Envoy will wait no longer than the specified time for first
// config response on this xDS subscription during the :ref:`initialization process
// <arch_overview_initialization>`. After reaching the timeout, Envoy will move to the next
// initialization phase, even if the first config is not delivered yet. The timer is activated
// when the xDS API subscription starts, and is disarmed on first config update or on error. 0
// means no timeout - Envoy will wait indefinitely for the first xDS config (unless another
// timeout applies). Default 0.
google.protobuf.Duration initial_fetch_timeout = 4;
}
5 changes: 5 additions & 0 deletions docs/root/intro/arch_overview/init.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.. _arch_overview_initialization:

Initialization
==============

Expand All @@ -22,3 +24,6 @@ accepting new connections.
* After all of the previous steps have taken place, the listeners start accepting new connections.
This flow ensures that during hot restart the new process is fully capable of accepting and
processing new connections before the draining of the old process begins.

All mentioned "waiting for one response" periods can be limited by setting corresponding
:ref:`initial_fetch_timeout <envoy_api_field_core.ConfigSource.initial_fetch_timeout>`.
1 change: 1 addition & 0 deletions docs/root/intro/version_history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Version history
* config: removed REST_LEGACY as a valid :ref:`ApiType <envoy_api_field_core.ApiConfigSource.api_type>`.
* config: finish cluster warming only when a named response i.e. ClusterLoadAssignment associated to the cluster being warmed comes in the EDS response. This is a behavioural change from the current implementation where warming of cluster completes on missing load assignments also.
* config: use Envoy cpuset size to set the default number or worker threads if :option:`--cpuset-threads` is enabled.
* config: added support for :ref:`initial_fetch_timeout <envoy_api_field_core.ConfigSource.initial_fetch_timeout>`. The timeout is disabled by default.
* cors: added :ref:`filter_enabled & shadow_enabled RuntimeFractionalPercent flags <cors-runtime>` to filter.
* ext_authz: added an configurable option to make the gRPC service cross-compatible with V2Alpha. Note that this feature is already deprecated. It should be used for a short time, and only when transitioning from alpha to V2 release version.
* ext_authz: migrated from V2alpha to V2 and improved the documentation.
Expand Down
26 changes: 24 additions & 2 deletions source/common/config/delta_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ class DeltaSubscriptionImpl
Event::Dispatcher& dispatcher,
const Protobuf::MethodDescriptor& service_method,
Runtime::RandomGenerator& random, Stats::Scope& scope,
const RateLimitSettings& rate_limit_settings, SubscriptionStats stats)
const RateLimitSettings& rate_limit_settings, SubscriptionStats stats,
std::chrono::milliseconds init_fetch_timeout)
: GrpcStream<envoy::api::v2::DeltaDiscoveryRequest, envoy::api::v2::DeltaDiscoveryResponse,
ResourceNameDiff>(std::move(async_client), service_method, random, dispatcher,
scope, rate_limit_settings),
type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())),
local_info_(local_info), stats_(stats) {
local_info_(local_info), stats_(stats), dispatcher_(dispatcher),
init_fetch_timeout_(init_fetch_timeout) {
request_.set_type_url(type_url_);
request_.mutable_node()->MergeFrom(local_info_.node());
}
Expand Down Expand Up @@ -134,6 +136,7 @@ class DeltaSubscriptionImpl
void handleResponse(std::unique_ptr<envoy::api::v2::DeltaDiscoveryResponse>&& message) override {
ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url_,
message->system_version_info());
disableInitFetchTimeoutTimer();

request_.set_response_nonce(message->nonce());

Expand Down Expand Up @@ -167,6 +170,7 @@ class DeltaSubscriptionImpl
}

void handleEstablishmentFailure() override {
disableInitFetchTimeoutTimer();
stats_.update_failure_.inc();
ENVOY_LOG(debug, "delta update for {} failed", type_url_);
stats_.update_attempt_.inc();
Expand All @@ -177,6 +181,15 @@ class DeltaSubscriptionImpl
void start(const std::vector<std::string>& resources,
SubscriptionCallbacks<ResourceType>& callbacks) override {
callbacks_ = &callbacks;

if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "delta config: initial fetch timed out for {}", type_url_);
callbacks_->onConfigUpdateFailed(nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

establishNewStream();
subscribe(resources);
// The attempt stat here is maintained for the purposes of having consistency between ADS and
Expand All @@ -191,6 +204,12 @@ class DeltaSubscriptionImpl
}

private:
void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
init_fetch_timeout_timer_->disableTimer();
init_fetch_timeout_timer_.reset();
}
}
// A map from resource name to per-resource version.
std::unordered_map<std::string, std::string> resources_;
// The keys of resources_. Only tracked separately because std::map does not provide an iterator
Expand All @@ -207,6 +226,9 @@ class DeltaSubscriptionImpl
const LocalInfo::LocalInfo& local_info_;

SubscriptionStats stats_;
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
};

} // namespace Config
Expand Down
27 changes: 25 additions & 2 deletions source/common/config/grpc_mux_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,25 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
GrpcMuxCallbacks,
Logger::Loggable<Logger::Id::config> {
public:
GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats)
GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats, Event::Dispatcher& dispatcher,
std::chrono::milliseconds init_fetch_timeout)
: grpc_mux_(grpc_mux), stats_(stats),
type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())) {}
type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())),
dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) {}

// Config::Subscription
void start(const std::vector<std::string>& resources,
SubscriptionCallbacks<ResourceType>& callbacks) override {
callbacks_ = &callbacks;

if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_);
callbacks_->onConfigUpdateFailed(nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

watch_ = grpc_mux_.subscribe(type_url_, resources, *this);
// The attempt stat here is maintained for the purposes of having consistency between ADS and
// gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
Expand All @@ -44,6 +55,7 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
// Config::GrpcMuxCallbacks
void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) override {
disableInitFetchTimeoutTimer();
Protobuf::RepeatedPtrField<ResourceType> typed_resources;
std::transform(resources.cbegin(), resources.cend(),
Protobuf::RepeatedPtrFieldBackInserter(&typed_resources),
Expand All @@ -62,6 +74,7 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
}

void onConfigUpdateFailed(const EnvoyException* e) override {
disableInitFetchTimeoutTimer();
// TODO(htuch): Less fragile signal that this is failure vs. reject.
if (e == nullptr) {
stats_.update_failure_.inc();
Expand All @@ -79,11 +92,21 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>,
}

private:
void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
init_fetch_timeout_timer_->disableTimer();
init_fetch_timeout_timer_.reset();
}
}

GrpcMux& grpc_mux_;
SubscriptionStats stats_;
const std::string type_url_;
SubscriptionCallbacks<ResourceType>* callbacks_{};
GrpcMuxWatchPtr watch_{};
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
};

} // namespace Config
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/grpc_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> {
GrpcSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client,
Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats,
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings)
Stats::Scope& scope, const RateLimitSettings& rate_limit_settings,
std::chrono::milliseconds init_fetch_timeout)
: grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope,
rate_limit_settings),
grpc_mux_subscription_(grpc_mux_, stats) {}
grpc_mux_subscription_(grpc_mux_, stats, dispatcher, init_fetch_timeout) {}

// Config::Subscription
void start(const std::vector<std::string>& resources,
Expand Down
26 changes: 24 additions & 2 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
const std::string& remote_cluster_name, Event::Dispatcher& dispatcher,
Runtime::RandomGenerator& random, std::chrono::milliseconds refresh_interval,
std::chrono::milliseconds request_timeout,
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats)
const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats,
std::chrono::milliseconds init_fetch_timeout)
: Http::RestApiFetcher(cm, remote_cluster_name, dispatcher, random, refresh_interval,
request_timeout),
stats_(stats) {
stats_(stats), dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) {
request_.mutable_node()->CopyFrom(local_info.node());
ASSERT(service_method.options().HasExtension(google::api::http));
const auto& http_rule = service_method.options().GetExtension(google::api::http);
Expand All @@ -51,6 +52,15 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
void start(const std::vector<std::string>& resources,
Config::SubscriptionCallbacks<ResourceType>& callbacks) override {
ASSERT(callbacks_ == nullptr);

if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "REST config: initial fetch timed out for", path_);
callbacks_->onConfigUpdateFailed(nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}

Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_vector(resources.begin(),
resources.end());
request_.mutable_resource_names()->Swap(&resources_vector);
Expand Down Expand Up @@ -78,6 +88,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
}

void parseResponse(const Http::Message& response) override {
disableInitFetchTimeoutTimer();
envoy::api::v2::DiscoveryResponse message;
const auto status = Protobuf::util::JsonStringToMessage(response.bodyAsString(), &message);
if (!status.ok()) {
Expand All @@ -101,6 +112,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
void onFetchComplete() override {}

void onFetchFailure(const EnvoyException* e) override {
disableInitFetchTimeoutTimer();
ENVOY_LOG(warn, "REST config update failed: {}", e != nullptr ? e->what() : "fetch failure");
handleFailure(e);
}
Expand All @@ -111,11 +123,21 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
callbacks_->onConfigUpdateFailed(e);
}

void disableInitFetchTimeoutTimer() {
if (init_fetch_timeout_timer_) {
init_fetch_timeout_timer_->disableTimer();
init_fetch_timeout_timer_.reset();
}
}

std::string path_;
Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_;
Config::SubscriptionCallbacks<ResourceType>* callbacks_{};
envoy::api::v2::DiscoveryRequest request_;
SubscriptionStats stats_;
Event::Dispatcher& dispatcher_;
std::chrono::milliseconds init_fetch_timeout_;
Event::TimerPtr init_fetch_timeout_timer_;
};

} // namespace Config
Expand Down
12 changes: 8 additions & 4 deletions source/common/config/subscription_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class SubscriptionFactory {
local_info, cm, api_config_source.cluster_names()[0], dispatcher, random,
Utility::apiConfigSourceRefreshDelay(api_config_source),
Utility::apiConfigSourceRequestTimeout(api_config_source),
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats));
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats,
Utility::configSourceInitialFetchTimeout(config)));
break;
case envoy::api::v2::core::ApiConfigSource::GRPC:
result.reset(new GrpcSubscriptionImpl<ResourceType>(
Expand All @@ -76,7 +77,8 @@ class SubscriptionFactory {
->create(),
dispatcher, random,
*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats,
scope, Utility::parseRateLimitSettings(api_config_source)));
scope, Utility::parseRateLimitSettings(api_config_source),
Utility::configSourceInitialFetchTimeout(config)));
break;
case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: {
Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source);
Expand All @@ -86,7 +88,8 @@ class SubscriptionFactory {
api_config_source, scope)
->create(),
dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method),
random, scope, Utility::parseRateLimitSettings(api_config_source), stats));
random, scope, Utility::parseRateLimitSettings(api_config_source), stats,
Utility::configSourceInitialFetchTimeout(config)));
break;
}
default:
Expand All @@ -95,7 +98,8 @@ class SubscriptionFactory {
break;
}
case envoy::api::v2::core::ConfigSource::kAds: {
result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(cm.adsMux(), stats));
result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(
cm.adsMux(), stats, dispatcher, Utility::configSourceInitialFetchTimeout(config)));
break;
}
default:
Expand Down
6 changes: 6 additions & 0 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ std::chrono::milliseconds Utility::apiConfigSourceRequestTimeout(
PROTOBUF_GET_MS_OR_DEFAULT(api_config_source, request_timeout, 1000));
}

std::chrono::milliseconds
Utility::configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source) {
return std::chrono::milliseconds(
PROTOBUF_GET_MS_OR_DEFAULT(config_source, initial_fetch_timeout, 0));
}

void Utility::translateCdsConfig(const Json::Object& json_config,
envoy::api::v2::core::ConfigSource& cds_config) {
translateApiConfigSource(json_config.getObject("cluster")->getString("name"),
Expand Down
8 changes: 8 additions & 0 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ class Utility {
static std::chrono::milliseconds
apiConfigSourceRequestTimeout(const envoy::api::v2::core::ApiConfigSource& api_config_source);

/**
* Extract initial_fetch_timeout as a std::chrono::milliseconds from
* envoy::api::v2::core::ConfigSource. If request_timeout isn't set in the config source, a
* default value of 0s will be returned.
*/
static std::chrono::milliseconds
configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source);

/**
* Populate an envoy::api::v2::core::ApiConfigSource.
* @param cluster supplies the cluster name for the ApiConfigSource.
Expand Down
15 changes: 15 additions & 0 deletions test/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,20 @@ envoy_cc_test_library(
],
)

envoy_cc_test_library(
name = "delta_subscription_test_harness",
hdrs = ["delta_subscription_test_harness.h"],
deps = [
":subscription_test_harness",
"//source/common/config:delta_subscription_lib",
"//test/mocks/config:config_mocks",
"//test/mocks/event:event_mocks",
"//test/mocks/grpc:grpc_mocks",
"//test/mocks/local_info:local_info_mocks",
"//test/mocks/runtime:runtime_mocks",
],
)

envoy_cc_test(
name = "http_subscription_impl_test",
srcs = ["http_subscription_impl_test.cc"],
Expand Down Expand Up @@ -138,6 +152,7 @@ envoy_cc_test(
name = "subscription_impl_test",
srcs = ["subscription_impl_test.cc"],
deps = [
":delta_subscription_test_harness",
":filesystem_subscription_test_harness",
":grpc_subscription_test_harness",
":http_subscription_test_harness",
Expand Down
Loading

0 comments on commit 0657644

Please sign in to comment.