From 4675799d7e531228d48429e74924e900d15e251e Mon Sep 17 00:00:00 2001 From: Marcin Falkowski Date: Mon, 25 Feb 2019 12:47:34 +0100 Subject: [PATCH 1/6] xDS: introduce initial_fetch_timeout option to limit server initialization time Signed-off-by: Marcin Falkowski --- api/envoy/api/v2/core/config_source.proto | 7 +++ source/common/router/rds_impl.cc | 21 +++++++++ source/common/router/rds_impl.h | 7 ++- source/common/upstream/cds_api_impl.cc | 21 +++++++++ source/common/upstream/cds_api_impl.h | 4 +- source/server/lds_api.cc | 16 +++++++ source/server/lds_api.h | 2 + test/common/upstream/cds_api_impl_test.cc | 54 +++++++++++++++++++---- 8 files changed, 119 insertions(+), 13 deletions(-) diff --git a/api/envoy/api/v2/core/config_source.proto b/api/envoy/api/v2/core/config_source.proto index 3be59c1886b5..445c957b7503 100644 --- a/api/envoy/api/v2/core/config_source.proto +++ b/api/envoy/api/v2/core/config_source.proto @@ -104,4 +104,11 @@ message ConfigSource { // source in the bootstrap configuration is used. AggregatedConfigSource ads = 3; } + + // Optional initialization timeout. + // When this timeout is specified, the xDS API will be considered initialized after the specified + // time, even if the first config is not delivered yet. The timer is activated when the xDS API + // starts initializing, and is disarmed when it is initialized (successfully or not). 0 means no + // timeout - Envoy will wait indefinitely for the first xDS config. Default 0. + google.protobuf.Duration initial_fetch_timeout = 4; } diff --git a/source/common/router/rds_impl.cc b/source/common/router/rds_impl.cc index 3908fa799cd9..c6165872e23b 100644 --- a/source/common/router/rds_impl.cc +++ b/source/common/router/rds_impl.cc @@ -73,6 +73,24 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription( factory_context.clusterManager(), factory_context.random(), *scope_, "envoy.api.v2.RouteDiscoveryService.FetchRoutes", "envoy.api.v2.RouteDiscoveryService.StreamRoutes", factory_context.api()); + + initialization_timeout_ = std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(rds.config_source(), initial_fetch_timeout, 0)); + if (initialization_timeout_.count() > 0) { + initialization_timeout_timer_ = factory_context.dispatcher().createTimer([this]() -> void { + ENVOY_LOG(warn, "rds: initialization timed out for route_config_name={}", route_config_name_); + onConfigUpdateFailed(nullptr); + }); + } +} + +void RdsRouteConfigSubscription::initialize(std::function callback) { + initialize_callback_ = callback; + if (initialization_timeout_.count() > 0) { + ASSERT(initialization_timeout_timer_); + initialization_timeout_timer_->enableTimer(initialization_timeout_); + } + subscription_->start({route_config_name_}, *this); } RdsRouteConfigSubscription::~RdsRouteConfigSubscription() { @@ -138,6 +156,9 @@ void RdsRouteConfigSubscription::runInitializeCallbackIfAny() { initialize_callback_(); initialize_callback_ = nullptr; } + if (initialization_timeout_timer_) { + initialization_timeout_timer_->disableTimer(); + } } RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index 4498eb99bb10..6c0fcce48b1a 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -101,10 +101,7 @@ class RdsRouteConfigSubscription ~RdsRouteConfigSubscription(); // Init::Target - void initialize(std::function callback) override { - initialize_callback_ = callback; - subscription_->start({route_config_name_}, *this); - } + void initialize(std::function callback) override; // Config::SubscriptionCallbacks void onConfigUpdate(const ResourceVector& resources, const std::string& version_info) override; @@ -130,6 +127,8 @@ class RdsRouteConfigSubscription std::unique_ptr> subscription_; std::function initialize_callback_; + Event::TimerPtr initialization_timeout_timer_; + std::chrono::milliseconds initialization_timeout_; const std::string route_config_name_; Stats::ScopePtr scope_; RdsStats stats_; diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index bfd04b162e9e..889807ca8df6 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -35,6 +35,23 @@ CdsApiImpl::CdsApiImpl(const envoy::api::v2::core::ConfigSource& cds_config, Clu cds_config, local_info, dispatcher, cm, random, *scope_, "envoy.api.v2.ClusterDiscoveryService.FetchClusters", "envoy.api.v2.ClusterDiscoveryService.StreamClusters", api); + + initialization_timeout_ = + std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(cds_config, initial_fetch_timeout, 0)); + if (initialization_timeout_.count() > 0) { + initialization_timeout_timer_ = dispatcher.createTimer([this]() -> void { + ENVOY_LOG(warn, "cds: initialization timed out"); + onConfigUpdateFailed(nullptr); + }); + } +} + +void CdsApiImpl::initialize() { + if (initialization_timeout_.count() > 0) { + ASSERT(initialization_timeout_timer_); + initialization_timeout_timer_->enableTimer(initialization_timeout_); + } + subscription_->start({}, *this); } void CdsApiImpl::onConfigUpdate(const ResourceVector& resources, const std::string& version_info) { @@ -117,6 +134,10 @@ void CdsApiImpl::runInitializeCallbackIfAny() { initialize_callback_(); initialize_callback_ = nullptr; } + if (initialization_timeout_timer_) { + initialization_timeout_timer_->disableTimer(); + initialization_timeout_timer_.reset(); + } } } // namespace Upstream diff --git a/source/common/upstream/cds_api_impl.h b/source/common/upstream/cds_api_impl.h index 8fb1fb1cc303..ee22eb35fa05 100644 --- a/source/common/upstream/cds_api_impl.h +++ b/source/common/upstream/cds_api_impl.h @@ -28,7 +28,7 @@ class CdsApiImpl : public CdsApi, Api::Api& api); // Upstream::CdsApi - void initialize() override { subscription_->start({}, *this); } + void initialize() override; void setInitializedCb(std::function callback) override { initialize_callback_ = callback; } @@ -52,6 +52,8 @@ class CdsApiImpl : public CdsApi, std::string version_info_; std::function initialize_callback_; Stats::ScopePtr scope_; + Event::TimerPtr initialization_timeout_timer_; + std::chrono::milliseconds initialization_timeout_; }; } // namespace Upstream diff --git a/source/server/lds_api.cc b/source/server/lds_api.cc index 261f4ca4c7ee..c000a952fa78 100644 --- a/source/server/lds_api.cc +++ b/source/server/lds_api.cc @@ -28,10 +28,23 @@ LdsApiImpl::LdsApiImpl(const envoy::api::v2::core::ConfigSource& lds_config, "envoy.api.v2.ListenerDiscoveryService.StreamListeners", api); Config::Utility::checkLocalInfo("lds", local_info); init_manager.registerTarget(*this, "LDS"); + + initialization_timeout_ = + std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(lds_config, initial_fetch_timeout, 0)); + if (initialization_timeout_.count() > 0) { + initialization_timeout_timer_ = dispatcher.createTimer([this]() -> void { + ENVOY_LOG(warn, "lds: initialization timed out"); + onConfigUpdateFailed(nullptr); + }); + } } void LdsApiImpl::initialize(std::function callback) { initialize_callback_ = callback; + if (initialization_timeout_.count() > 0) { + ASSERT(initialization_timeout_timer_); + initialization_timeout_timer_->enableTimer(initialization_timeout_); + } subscription_->start({}, *this); } @@ -95,6 +108,9 @@ void LdsApiImpl::runInitializeCallbackIfAny() { initialize_callback_(); initialize_callback_ = nullptr; } + if (initialization_timeout_timer_) { + initialization_timeout_timer_->disableTimer(); + } } } // namespace Server diff --git a/source/server/lds_api.h b/source/server/lds_api.h index fefea2e17156..82387fb933d3 100644 --- a/source/server/lds_api.h +++ b/source/server/lds_api.h @@ -49,6 +49,8 @@ class LdsApiImpl : public LdsApi, Stats::ScopePtr scope_; Upstream::ClusterManager& cm_; std::function initialize_callback_; + Event::TimerPtr initialization_timeout_timer_; + std::chrono::milliseconds initialization_timeout_; }; } // namespace Server diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index aab5b952d9bb..e96dd2d18eee 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -34,6 +34,20 @@ class CdsApiImplTest : public testing::Test { CdsApiImplTest() : request_(&cm_.async_client_), api_(Api::createApiForTest(store_)) {} void setup() { + envoy::api::v2::core::ConfigSource cds_config; + setupConfig(cds_config); + Upstream::ClusterManager::ClusterInfoMap cluster_map; + Upstream::MockClusterMockPrioritySet cluster; + setupClusters(cluster_map, cluster); + + cds_ = CdsApiImpl::create(cds_config, cm_, dispatcher_, random_, local_info_, store_, *api_); + resetCdsInitializedCb(); + + expectRequest(); + cds_->initialize(); + } + + void setupConfig(envoy::api::v2::core::ConfigSource& cds_config) { const std::string config_json = R"EOF( { "cluster": { @@ -43,23 +57,19 @@ class CdsApiImplTest : public testing::Test { )EOF"; Json::ObjectSharedPtr config = Json::Factory::loadFromString(config_json); - envoy::api::v2::core::ConfigSource cds_config; Config::Utility::translateCdsConfig(*config, cds_config); cds_config.mutable_api_config_source()->set_api_type( envoy::api::v2::core::ApiConfigSource::REST); - Upstream::ClusterManager::ClusterInfoMap cluster_map; - Upstream::MockClusterMockPrioritySet cluster; + } + + void setupClusters(Upstream::ClusterManager::ClusterInfoMap& cluster_map, + Upstream::MockClusterMockPrioritySet& cluster) { cluster_map.emplace("foo_cluster", cluster); EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map)); EXPECT_CALL(cluster, info()); EXPECT_CALL(*cluster.info_, addedViaApi()); EXPECT_CALL(cluster, info()); EXPECT_CALL(*cluster.info_, type()); - cds_ = CdsApiImpl::create(cds_config, cm_, dispatcher_, random_, local_info_, store_, *api_); - resetCdsInitializedCb(); - - expectRequest(); - cds_->initialize(); } void resetCdsInitializedCb() { @@ -173,6 +183,7 @@ class CdsApiImplTest : public testing::Test { Http::MockAsyncClientRequest request_; CdsApiPtr cds_; Event::MockTimer* interval_timer_; + Event::MockTimer* initialization_timeout_timer_; Http::AsyncClient::Callbacks* callbacks_{}; ReadyWatcher initialized_; Api::ApiPtr api_; @@ -543,5 +554,32 @@ version_info: '0' EXPECT_EQ(0UL, store_.gauge("cluster_manager.cds.version").value()); } +TEST_F(CdsApiImplTest, InitializationTimeout) { + initialization_timeout_timer_ = new Event::MockTimer(&dispatcher_); + interval_timer_ = new Event::MockTimer(&dispatcher_); + + envoy::api::v2::core::ConfigSource cds_config; + setupConfig(cds_config); + cds_config.mutable_initial_fetch_timeout()->set_seconds(10); + + Upstream::ClusterManager::ClusterInfoMap cluster_map; + Upstream::MockClusterMockPrioritySet cluster; + { + InSequence s; + setupClusters(cluster_map, cluster); + } + + auto cds = CdsApiImpl::create(cds_config, cm_, dispatcher_, random_, local_info_, store_, *api_); + cds->setInitializedCb([this]() -> void { initialized_.ready(); }); + + EXPECT_CALL(*initialization_timeout_timer_, enableTimer(std::chrono::milliseconds(10 * 1000))); + EXPECT_CALL(initialized_, ready()).Times(0); + cds->initialize(); + + EXPECT_CALL(initialized_, ready()); + EXPECT_CALL(*initialization_timeout_timer_, disableTimer()); + initialization_timeout_timer_->callback_(); +} + } // namespace Upstream } // namespace Envoy From f3987791bdaca2ac09625064c6f070c5c146071f Mon Sep 17 00:00:00 2001 From: Marcin Falkowski Date: Sun, 10 Mar 2019 23:23:51 +0100 Subject: [PATCH 2/6] move initial_fetch_timeout to Subscription Signed-off-by: Marcin Falkowski --- api/envoy/api/v2/core/config_source.proto | 11 +- docs/root/intro/arch_overview/init.rst | 5 + .../common/config/delta_subscription_impl.h | 27 ++++- .../config/grpc_mux_subscription_impl.h | 27 ++++- source/common/config/grpc_subscription_impl.h | 5 +- source/common/config/http_subscription_impl.h | 26 +++- source/common/config/subscription_factory.h | 12 +- source/common/config/utility.cc | 6 + source/common/config/utility.h | 8 ++ source/common/router/rds_impl.cc | 21 ---- source/common/router/rds_impl.h | 7 +- source/common/upstream/cds_api_impl.cc | 4 - source/common/upstream/cds_api_impl.h | 4 +- source/server/lds_api.cc | 16 --- source/server/lds_api.h | 2 - test/common/config/BUILD | 13 ++ .../config/delta_subscription_impl_test.cc | 111 ++++++++++++++++++ .../config/grpc_subscription_impl_test.cc | 47 ++++++++ .../config/grpc_subscription_test_harness.h | 6 +- .../config/http_subscription_impl_test.cc | 48 ++++++++ .../config/http_subscription_test_harness.h | 6 +- test/common/config/utility_test.cc | 12 ++ test/common/upstream/cds_api_impl_test.cc | 16 +-- 23 files changed, 356 insertions(+), 84 deletions(-) create mode 100644 test/common/config/delta_subscription_impl_test.cc diff --git a/api/envoy/api/v2/core/config_source.proto b/api/envoy/api/v2/core/config_source.proto index 84924dd68253..8b6014dcbf9d 100644 --- a/api/envoy/api/v2/core/config_source.proto +++ b/api/envoy/api/v2/core/config_source.proto @@ -113,9 +113,12 @@ message ConfigSource { } // Optional initialization timeout. - // When this timeout is specified, the xDS API will be considered initialized after the specified - // time, even if the first config is not delivered yet. The timer is activated when the xDS API - // starts initializing, and is disarmed when it is initialized (successfully or not). 0 means no - // timeout - Envoy will wait indefinitely for the first xDS config. Default 0. + // When this timeout is specified, Envoy will wait no longer than the specified time for first + // config response on this xDS subscription during the :ref:`initialization process + // `. After reaching the timeout, Envoy will move to the next + // initialization phase, even if the first config is not delivered yet. The timer is activated + // when the xDS API subscription starts, and is disarmed on first config update or on error. 0 + // means no timeout - Envoy will wait indefinitely for the first xDS config (unless another + // timeout applies). Default 0. google.protobuf.Duration initial_fetch_timeout = 4; } diff --git a/docs/root/intro/arch_overview/init.rst b/docs/root/intro/arch_overview/init.rst index af0320a57f32..2e05c5a75056 100644 --- a/docs/root/intro/arch_overview/init.rst +++ b/docs/root/intro/arch_overview/init.rst @@ -1,3 +1,5 @@ +.. _arch_overview_initialization: + Initialization ============== @@ -22,3 +24,6 @@ accepting new connections. * After all of the previous steps have taken place, the listeners start accepting new connections. This flow ensures that during hot restart the new process is fully capable of accepting and processing new connections before the draining of the old process begins. + +All mentioned "waiting for one response" periods can be limited by setting corresponding +:ref:`initial_fetch_timeout `. diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 5164d284b6a4..3ca503d3a3c9 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -40,12 +40,14 @@ class DeltaSubscriptionImpl Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, Runtime::RandomGenerator& random, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, SubscriptionStats stats) + const RateLimitSettings& rate_limit_settings, SubscriptionStats stats, + std::chrono::milliseconds init_fetch_timeout) : GrpcStream(std::move(async_client), service_method, random, dispatcher, scope, rate_limit_settings), type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())), - local_info_(local_info), stats_(stats) { + local_info_(local_info), stats_(stats), dispatcher_(dispatcher), + init_fetch_timeout_(init_fetch_timeout) { request_.set_type_url(type_url_); request_.mutable_node()->MergeFrom(local_info_.node()); } @@ -120,6 +122,7 @@ class DeltaSubscriptionImpl void onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, const std::string& version_info) { + disableInitFetchTimeoutTimer(); callbacks_->onConfigUpdate(added_resources, removed_resources, version_info); for (const auto& resource : added_resources) { resources_[resource.name()] = resource.version(); @@ -134,6 +137,7 @@ class DeltaSubscriptionImpl void handleResponse(std::unique_ptr&& message) override { ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url_, message->system_version_info()); + disableInitFetchTimeoutTimer(); request_.set_response_nonce(message->nonce()); @@ -167,6 +171,7 @@ class DeltaSubscriptionImpl } void handleEstablishmentFailure() override { + disableInitFetchTimeoutTimer(); stats_.update_failure_.inc(); ENVOY_LOG(debug, "delta update for {} failed", type_url_); stats_.update_attempt_.inc(); @@ -177,6 +182,15 @@ class DeltaSubscriptionImpl void start(const std::vector& resources, SubscriptionCallbacks& callbacks) override { callbacks_ = &callbacks; + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "delta config: initial fetch timed out for {}", type_url_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + establishNewStream(); subscribe(resources); // The attempt stat here is maintained for the purposes of having consistency between ADS and @@ -191,6 +205,12 @@ class DeltaSubscriptionImpl } private: + void disableInitFetchTimeoutTimer() { + if (init_fetch_timeout_timer_) { + init_fetch_timeout_timer_->disableTimer(); + init_fetch_timeout_timer_.reset(); + } + } // A map from resource name to per-resource version. std::unordered_map resources_; // The keys of resources_. Only tracked separately because std::map does not provide an iterator @@ -207,6 +227,9 @@ class DeltaSubscriptionImpl const LocalInfo::LocalInfo& local_info_; SubscriptionStats stats_; + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds init_fetch_timeout_; + Event::TimerPtr init_fetch_timeout_timer_; }; } // namespace Config diff --git a/source/common/config/grpc_mux_subscription_impl.h b/source/common/config/grpc_mux_subscription_impl.h index 2bd080811827..e8b31d77735f 100644 --- a/source/common/config/grpc_mux_subscription_impl.h +++ b/source/common/config/grpc_mux_subscription_impl.h @@ -21,14 +21,25 @@ class GrpcMuxSubscriptionImpl : public Subscription, GrpcMuxCallbacks, Logger::Loggable { public: - GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats) + GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats, Event::Dispatcher& dispatcher, + std::chrono::milliseconds init_fetch_timeout) : grpc_mux_(grpc_mux), stats_(stats), - type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())) {} + type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())), + dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) {} // Config::Subscription void start(const std::vector& resources, SubscriptionCallbacks& callbacks) override { callbacks_ = &callbacks; + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + watch_ = grpc_mux_.subscribe(type_url_, resources, *this); // The attempt stat here is maintained for the purposes of having consistency between ADS and // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an @@ -44,6 +55,7 @@ class GrpcMuxSubscriptionImpl : public Subscription, // Config::GrpcMuxCallbacks void onConfigUpdate(const Protobuf::RepeatedPtrField& resources, const std::string& version_info) override { + disableInitFetchTimeoutTimer(); Protobuf::RepeatedPtrField typed_resources; std::transform(resources.cbegin(), resources.cend(), Protobuf::RepeatedPtrFieldBackInserter(&typed_resources), @@ -61,6 +73,7 @@ class GrpcMuxSubscriptionImpl : public Subscription, } void onConfigUpdateFailed(const EnvoyException* e) override { + disableInitFetchTimeoutTimer(); // TODO(htuch): Less fragile signal that this is failure vs. reject. if (e == nullptr) { stats_.update_failure_.inc(); @@ -78,11 +91,21 @@ class GrpcMuxSubscriptionImpl : public Subscription, } private: + void disableInitFetchTimeoutTimer() { + if (init_fetch_timeout_timer_) { + init_fetch_timeout_timer_->disableTimer(); + init_fetch_timeout_timer_.reset(); + } + } + GrpcMux& grpc_mux_; SubscriptionStats stats_; const std::string type_url_; SubscriptionCallbacks* callbacks_{}; GrpcMuxWatchPtr watch_{}; + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds init_fetch_timeout_; + Event::TimerPtr init_fetch_timeout_timer_; }; } // namespace Config diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index aedbf5adc756..1a0d289bd187 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -18,10 +18,11 @@ class GrpcSubscriptionImpl : public Config::Subscription { GrpcSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings) + Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, + std::chrono::milliseconds init_fetch_timeout) : grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope, rate_limit_settings), - grpc_mux_subscription_(grpc_mux_, stats) {} + grpc_mux_subscription_(grpc_mux_, stats, dispatcher, init_fetch_timeout) {} // Config::Subscription void start(const std::vector& resources, diff --git a/source/common/config/http_subscription_impl.h b/source/common/config/http_subscription_impl.h index b6ac42863bce..702b0335c3f0 100644 --- a/source/common/config/http_subscription_impl.h +++ b/source/common/config/http_subscription_impl.h @@ -36,10 +36,11 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, std::chrono::milliseconds refresh_interval, std::chrono::milliseconds request_timeout, - const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats) + const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats, + std::chrono::milliseconds init_fetch_timeout) : Http::RestApiFetcher(cm, remote_cluster_name, dispatcher, random, refresh_interval, request_timeout), - stats_(stats) { + stats_(stats), dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) { request_.mutable_node()->CopyFrom(local_info.node()); ASSERT(service_method.options().HasExtension(google::api::http)); const auto& http_rule = service_method.options().GetExtension(google::api::http); @@ -51,6 +52,15 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, void start(const std::vector& resources, Config::SubscriptionCallbacks& callbacks) override { ASSERT(callbacks_ == nullptr); + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "REST config: initial fetch timed out for", path_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + Protobuf::RepeatedPtrField resources_vector(resources.begin(), resources.end()); request_.mutable_resource_names()->Swap(&resources_vector); @@ -78,6 +88,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, } void parseResponse(const Http::Message& response) override { + disableInitFetchTimeoutTimer(); envoy::api::v2::DiscoveryResponse message; const auto status = Protobuf::util::JsonStringToMessage(response.bodyAsString(), &message); if (!status.ok()) { @@ -101,6 +112,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, void onFetchComplete() override {} void onFetchFailure(const EnvoyException* e) override { + disableInitFetchTimeoutTimer(); ENVOY_LOG(warn, "REST config update failed: {}", e != nullptr ? e->what() : "fetch failure"); handleFailure(e); } @@ -111,11 +123,21 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, callbacks_->onConfigUpdateFailed(e); } + void disableInitFetchTimeoutTimer() { + if (init_fetch_timeout_timer_) { + init_fetch_timeout_timer_->disableTimer(); + init_fetch_timeout_timer_.reset(); + } + } + std::string path_; Protobuf::RepeatedPtrField resources_; Config::SubscriptionCallbacks* callbacks_{}; envoy::api::v2::DiscoveryRequest request_; SubscriptionStats stats_; + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds init_fetch_timeout_; + Event::TimerPtr init_fetch_timeout_timer_; }; } // namespace Config diff --git a/source/common/config/subscription_factory.h b/source/common/config/subscription_factory.h index 4362291ab7a6..36857024f530 100644 --- a/source/common/config/subscription_factory.h +++ b/source/common/config/subscription_factory.h @@ -66,7 +66,8 @@ class SubscriptionFactory { local_info, cm, api_config_source.cluster_names()[0], dispatcher, random, Utility::apiConfigSourceRefreshDelay(api_config_source), Utility::apiConfigSourceRequestTimeout(api_config_source), - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats)); + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats, + Utility::configSourceInitialFetchTimeout(config))); break; case envoy::api::v2::core::ApiConfigSource::GRPC: result.reset(new GrpcSubscriptionImpl( @@ -76,7 +77,8 @@ class SubscriptionFactory { ->create(), dispatcher, random, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats, - scope, Utility::parseRateLimitSettings(api_config_source))); + scope, Utility::parseRateLimitSettings(api_config_source), + Utility::configSourceInitialFetchTimeout(config))); break; case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: { Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source); @@ -86,7 +88,8 @@ class SubscriptionFactory { api_config_source, scope) ->create(), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), - random, scope, Utility::parseRateLimitSettings(api_config_source), stats)); + random, scope, Utility::parseRateLimitSettings(api_config_source), stats, + Utility::configSourceInitialFetchTimeout(config))); break; } default: @@ -95,7 +98,8 @@ class SubscriptionFactory { break; } case envoy::api::v2::core::ConfigSource::kAds: { - result.reset(new GrpcMuxSubscriptionImpl(cm.adsMux(), stats)); + result.reset(new GrpcMuxSubscriptionImpl( + cm.adsMux(), stats, dispatcher, Utility::configSourceInitialFetchTimeout(config))); break; } default: diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index d8b11b71d97f..7f68fe26aac7 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -178,6 +178,12 @@ std::chrono::milliseconds Utility::apiConfigSourceRequestTimeout( PROTOBUF_GET_MS_OR_DEFAULT(api_config_source, request_timeout, 1000)); } +std::chrono::milliseconds Utility::configSourceInitialFetchTimeout( + const envoy::api::v2::core::ConfigSource& config_source) { + return std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(config_source, initial_fetch_timeout, 0)); +} + void Utility::translateCdsConfig(const Json::Object& json_config, envoy::api::v2::core::ConfigSource& cds_config) { translateApiConfigSource(json_config.getObject("cluster")->getString("name"), diff --git a/source/common/config/utility.h b/source/common/config/utility.h index 60c36db3197c..5dacc67bfa7a 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -104,6 +104,14 @@ class Utility { static std::chrono::milliseconds apiConfigSourceRequestTimeout(const envoy::api::v2::core::ApiConfigSource& api_config_source); + /** + * Extract initial_fetch_timeout as a std::chrono::milliseconds from + * envoy::api::v2::core::ConfigSource. If request_timeout isn't set in the config source, a + * default value of 0s will be returned. + */ + static std::chrono::milliseconds + configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source); + /** * Populate an envoy::api::v2::core::ApiConfigSource. * @param cluster supplies the cluster name for the ApiConfigSource. diff --git a/source/common/router/rds_impl.cc b/source/common/router/rds_impl.cc index c6165872e23b..3908fa799cd9 100644 --- a/source/common/router/rds_impl.cc +++ b/source/common/router/rds_impl.cc @@ -73,24 +73,6 @@ RdsRouteConfigSubscription::RdsRouteConfigSubscription( factory_context.clusterManager(), factory_context.random(), *scope_, "envoy.api.v2.RouteDiscoveryService.FetchRoutes", "envoy.api.v2.RouteDiscoveryService.StreamRoutes", factory_context.api()); - - initialization_timeout_ = std::chrono::milliseconds( - PROTOBUF_GET_MS_OR_DEFAULT(rds.config_source(), initial_fetch_timeout, 0)); - if (initialization_timeout_.count() > 0) { - initialization_timeout_timer_ = factory_context.dispatcher().createTimer([this]() -> void { - ENVOY_LOG(warn, "rds: initialization timed out for route_config_name={}", route_config_name_); - onConfigUpdateFailed(nullptr); - }); - } -} - -void RdsRouteConfigSubscription::initialize(std::function callback) { - initialize_callback_ = callback; - if (initialization_timeout_.count() > 0) { - ASSERT(initialization_timeout_timer_); - initialization_timeout_timer_->enableTimer(initialization_timeout_); - } - subscription_->start({route_config_name_}, *this); } RdsRouteConfigSubscription::~RdsRouteConfigSubscription() { @@ -156,9 +138,6 @@ void RdsRouteConfigSubscription::runInitializeCallbackIfAny() { initialize_callback_(); initialize_callback_ = nullptr; } - if (initialization_timeout_timer_) { - initialization_timeout_timer_->disableTimer(); - } } RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( diff --git a/source/common/router/rds_impl.h b/source/common/router/rds_impl.h index a147a17772b4..67e086258e21 100644 --- a/source/common/router/rds_impl.h +++ b/source/common/router/rds_impl.h @@ -101,7 +101,10 @@ class RdsRouteConfigSubscription ~RdsRouteConfigSubscription(); // Init::Target - void initialize(std::function callback) override; + void initialize(std::function callback) override { + initialize_callback_ = callback; + subscription_->start({route_config_name_}, *this); + } // Config::SubscriptionCallbacks // TODO(fredlas) deduplicate @@ -132,8 +135,6 @@ class RdsRouteConfigSubscription std::unique_ptr> subscription_; std::function initialize_callback_; - Event::TimerPtr initialization_timeout_timer_; - std::chrono::milliseconds initialization_timeout_; const std::string route_config_name_; Stats::ScopePtr scope_; RdsStats stats_; diff --git a/source/common/upstream/cds_api_impl.cc b/source/common/upstream/cds_api_impl.cc index 70096415447b..805ffe41eed7 100644 --- a/source/common/upstream/cds_api_impl.cc +++ b/source/common/upstream/cds_api_impl.cc @@ -132,10 +132,6 @@ void CdsApiImpl::runInitializeCallbackIfAny() { initialize_callback_(); initialize_callback_ = nullptr; } - if (initialization_timeout_timer_) { - initialization_timeout_timer_->disableTimer(); - initialization_timeout_timer_.reset(); - } } } // namespace Upstream diff --git a/source/common/upstream/cds_api_impl.h b/source/common/upstream/cds_api_impl.h index d46eaa6bb73a..77e215259fd7 100644 --- a/source/common/upstream/cds_api_impl.h +++ b/source/common/upstream/cds_api_impl.h @@ -28,7 +28,7 @@ class CdsApiImpl : public CdsApi, Api::Api& api); // Upstream::CdsApi - void initialize() override; + void initialize() override { subscription_->start({}, *this); } void setInitializedCb(std::function callback) override { initialize_callback_ = callback; } @@ -55,8 +55,6 @@ class CdsApiImpl : public CdsApi, std::string system_version_info_; std::function initialize_callback_; Stats::ScopePtr scope_; - Event::TimerPtr initialization_timeout_timer_; - std::chrono::milliseconds initialization_timeout_; }; } // namespace Upstream diff --git a/source/server/lds_api.cc b/source/server/lds_api.cc index 21c3f8890109..2b5c18629b92 100644 --- a/source/server/lds_api.cc +++ b/source/server/lds_api.cc @@ -28,23 +28,10 @@ LdsApiImpl::LdsApiImpl(const envoy::api::v2::core::ConfigSource& lds_config, "envoy.api.v2.ListenerDiscoveryService.StreamListeners", api); Config::Utility::checkLocalInfo("lds", local_info); init_manager.registerTarget(*this, "LDS"); - - initialization_timeout_ = - std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(lds_config, initial_fetch_timeout, 0)); - if (initialization_timeout_.count() > 0) { - initialization_timeout_timer_ = dispatcher.createTimer([this]() -> void { - ENVOY_LOG(warn, "lds: initialization timed out"); - onConfigUpdateFailed(nullptr); - }); - } } void LdsApiImpl::initialize(std::function callback) { initialize_callback_ = callback; - if (initialization_timeout_.count() > 0) { - ASSERT(initialization_timeout_timer_); - initialization_timeout_timer_->enableTimer(initialization_timeout_); - } subscription_->start({}, *this); } @@ -112,9 +99,6 @@ void LdsApiImpl::runInitializeCallbackIfAny() { initialize_callback_(); initialize_callback_ = nullptr; } - if (initialization_timeout_timer_) { - initialization_timeout_timer_->disableTimer(); - } } } // namespace Server diff --git a/source/server/lds_api.h b/source/server/lds_api.h index d9950919513e..713ead3f118a 100644 --- a/source/server/lds_api.h +++ b/source/server/lds_api.h @@ -54,8 +54,6 @@ class LdsApiImpl : public LdsApi, Stats::ScopePtr scope_; Upstream::ClusterManager& cm_; std::function initialize_callback_; - Event::TimerPtr initialization_timeout_timer_; - std::chrono::milliseconds initialization_timeout_; }; } // namespace Server diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 8f2ce71b9203..923a017ae59b 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -87,6 +87,19 @@ envoy_cc_test_library( ], ) +envoy_cc_test( + name = "delta_subscription_impl_test", + srcs = ["delta_subscription_impl_test.cc"], + deps = [ + "//source/common/config:delta_subscription_lib", + "//test/mocks/config:config_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/runtime:runtime_mocks", + ], +) + envoy_cc_test( name = "http_subscription_impl_test", srcs = ["http_subscription_impl_test.cc"], diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc new file mode 100644 index 000000000000..a874953596de --- /dev/null +++ b/test/common/config/delta_subscription_impl_test.cc @@ -0,0 +1,111 @@ +#include "common/config/delta_subscription_impl.h" + +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/mocks/stats/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::InSequence; +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Config { +namespace { + +typedef DeltaSubscriptionImpl DeltaEdsSubscriptionImpl; + +class DeltaSubscriptionImplTest : public testing::Test { +public: + DeltaSubscriptionImplTest() + : stats_(Utility::generateStats(stats_store_)), + method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), + async_client_(new Grpc::MockAsyncClient()) {} + + void createSubscription(std::chrono::milliseconds init_fetch_timeout) { + EXPECT_CALL(dispatcher_, createTimer_(_)); + subscription_ = std::make_unique( + local_info_, std::unique_ptr(async_client_), dispatcher_, + *method_descriptor_, random_, stats_store_, rate_limit_settings_, stats_, + init_fetch_timeout); + } + + void startSubscription(const std::vector& cluster_names) { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + EXPECT_CALL(async_stream_, sendMessage(_, _)).Times(2); + subscription_->start(cluster_names, callbacks_); + } + + Stats::IsolatedStoreImpl stats_store_; + SubscriptionStats stats_; + const Protobuf::MethodDescriptor* method_descriptor_; + Grpc::MockAsyncClient* async_client_; + Event::MockDispatcher dispatcher_; + Runtime::MockRandomGenerator random_; + NiceMock local_info_; + Grpc::MockAsyncStream async_stream_; + std::unique_ptr subscription_; + Envoy::Config::RateLimitSettings rate_limit_settings_; + Event::MockTimer* init_timeout_timer_; + NiceMock> callbacks_; +}; + +// Validate that initial fetch timer is created and calls callback on timeout +TEST_F(DeltaSubscriptionImplTest, InitialFetchTimeout) { + InSequence s; + createSubscription(std::chrono::milliseconds(1000)); + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster1"}); + + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + init_timeout_timer_->callback_(); +} + +// Validate that initial fetch timer is not created +TEST_F(DeltaSubscriptionImplTest, NoInitialFetchTimeout) { + InSequence s; + createSubscription(std::chrono::milliseconds(0)); + + EXPECT_CALL(dispatcher_, createTimer_(_)).Times(0); + startSubscription({"cluster1"}); +} + +// Validate that initial fetch timer is disabled on config update +TEST_F(DeltaSubscriptionImplTest, DisableInitTimeoutOnSuccess) { + InSequence s; + createSubscription(std::chrono::milliseconds(1000)); + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster1"}); + + EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); + auto response_ = envoy::api::v2::DeltaDiscoveryResponse(); + subscription_->onConfigUpdate(response_.resources(), response_.removed_resources(), ""); +} + +// Validate that initial fetch timer is disabled on config update failed +TEST_F(DeltaSubscriptionImplTest, DisableInitTimeoutOnFail) { + InSequence s; + createSubscription(std::chrono::milliseconds(1000)); + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster1"}); + + EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); + subscription_->handleEstablishmentFailure(); +} + +} // namespace +} // namespace Config +} // namespace Envoy \ No newline at end of file diff --git a/test/common/config/grpc_subscription_impl_test.cc b/test/common/config/grpc_subscription_impl_test.cc index 1293e48b203f..51a5bf70ee33 100644 --- a/test/common/config/grpc_subscription_impl_test.cc +++ b/test/common/config/grpc_subscription_impl_test.cc @@ -9,6 +9,13 @@ namespace Config { namespace { class GrpcSubscriptionImplTest : public testing::Test, public GrpcSubscriptionTestHarness {}; +class GrpcSubscriptionImplInitFetchTimeoutTest : public testing::Test, + public GrpcSubscriptionTestHarness { +public: + GrpcSubscriptionImplInitFetchTimeoutTest() + : GrpcSubscriptionTestHarness(std::chrono::milliseconds(1000)) {} + Event::MockTimer* init_timeout_timer_; +}; // Validate that stream creation results in a timer based retry and can recover. TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) { @@ -75,6 +82,46 @@ TEST_F(GrpcSubscriptionImplTest, RepeatedNonce) { verifyStats(7, 2, 2, 0, 13237225503670494420U); } +// Validate that initial fetch timer is created and calls callback on timeout +TEST_F(GrpcSubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) { + InSequence s; + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + init_timeout_timer_->callback_(); + verifyStats(1, 0, 0, 0, 0); +} + +// Validate that initial fetch timer is disabled on config update +TEST_F(GrpcSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnSuccess) { + InSequence s; + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + + EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); +} + +// Validate that initial fetch timer is disabled on config update failed +TEST_F(GrpcSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnFail) { + InSequence s; + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + + EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index c6316aab957c..45bba9b5719d 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -32,7 +32,9 @@ typedef GrpcSubscriptionImpl GrpcEdsSubsc class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { public: - GrpcSubscriptionTestHarness() + GrpcSubscriptionTestHarness() : GrpcSubscriptionTestHarness(std::chrono::milliseconds(0)) {} + + GrpcSubscriptionTestHarness(std::chrono::milliseconds init_fetch_timeout) : method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), async_client_(new Grpc::MockAsyncClient()), timer_(new Event::MockTimer()) { @@ -44,7 +46,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { })); subscription_ = std::make_unique( local_info_, std::unique_ptr(async_client_), dispatcher_, random_, - *method_descriptor_, stats_, stats_store_, rate_limit_settings_); + *method_descriptor_, stats_, stats_store_, rate_limit_settings_, init_fetch_timeout); } ~GrpcSubscriptionTestHarness() { EXPECT_CALL(async_stream_, sendMessage(_, false)); } diff --git a/test/common/config/http_subscription_impl_test.cc b/test/common/config/http_subscription_impl_test.cc index eb5f9df87b48..cb0ac01bb312 100644 --- a/test/common/config/http_subscription_impl_test.cc +++ b/test/common/config/http_subscription_impl_test.cc @@ -4,11 +4,20 @@ #include "gtest/gtest.h" +using testing::InSequence; + namespace Envoy { namespace Config { namespace { class HttpSubscriptionImplTest : public testing::Test, public HttpSubscriptionTestHarness {}; +class HttpSubscriptionImplInitFetchTimeoutTest : public testing::Test, + public HttpSubscriptionTestHarness { +public: + HttpSubscriptionImplInitFetchTimeoutTest() + : HttpSubscriptionTestHarness(std::chrono::milliseconds(1000)) {} + Event::MockTimer* init_timeout_timer_; +}; // Validate that the client can recover from a remote fetch failure. TEST_F(HttpSubscriptionImplTest, OnRequestReset) { @@ -42,6 +51,45 @@ TEST_F(HttpSubscriptionImplTest, BadJsonRecovery) { verifyStats(3, 1, 0, 1, 7148434200721666028); } +TEST_F(HttpSubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) { + InSequence s; + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + init_timeout_timer_->callback_(); + verifyStats(1, 0, 0, 0, 0); +} + +// Validate that initial fetch timer is disabled on config update +TEST_F(HttpSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnSuccess) { + InSequence s; + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + + EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); +} + +// Validate that initial fetch timer is disabled on config update failed +TEST_F(HttpSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnFail) { + InSequence s; + + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + + EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index 762f28454b25..4c7301f51333 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -33,7 +33,9 @@ typedef HttpSubscriptionImpl HttpEdsSubsc class HttpSubscriptionTestHarness : public SubscriptionTestHarness { public: - HttpSubscriptionTestHarness() + HttpSubscriptionTestHarness() : HttpSubscriptionTestHarness(std::chrono::milliseconds(0)) {} + + HttpSubscriptionTestHarness(std::chrono::milliseconds init_fetch_timeout) : method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.api.v2.EndpointDiscoveryService.FetchEndpoints")), timer_(new Event::MockTimer()), http_request_(&cm_.async_client_) { @@ -45,7 +47,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { })); subscription_ = std::make_unique( local_info_, cm_, "eds_cluster", dispatcher_, random_gen_, std::chrono::milliseconds(1), - std::chrono::milliseconds(1000), *method_descriptor_, stats_); + std::chrono::milliseconds(1000), *method_descriptor_, stats_, init_fetch_timeout); } ~HttpSubscriptionTestHarness() { diff --git a/test/common/config/utility_test.cc b/test/common/config/utility_test.cc index 9c4ab302f695..a2cd07509b6b 100644 --- a/test/common/config/utility_test.cc +++ b/test/common/config/utility_test.cc @@ -80,6 +80,18 @@ TEST(UtilityTest, ApiConfigSourceRequestTimeout) { EXPECT_EQ(1234, Utility::apiConfigSourceRequestTimeout(api_config_source).count()); } +TEST(UtilityTest, ConfigSourceDefaultInitFetchTimeout) { + envoy::api::v2::core::ConfigSource config_source; + EXPECT_EQ(0, Utility::configSourceInitialFetchTimeout(config_source).count()); +} + +TEST(UtilityTest, ConfigSourceInitFetchTimeout) { + envoy::api::v2::core::ConfigSource config_source; + config_source.mutable_initial_fetch_timeout()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(654)); + EXPECT_EQ(654, Utility::configSourceInitialFetchTimeout(config_source).count()); +} + TEST(UtilityTest, TranslateApiConfigSource) { envoy::api::v2::core::ApiConfigSource api_config_source_rest_legacy; Utility::translateApiConfigSource("test_rest_legacy_cluster", 10000, diff --git a/test/common/upstream/cds_api_impl_test.cc b/test/common/upstream/cds_api_impl_test.cc index 527a56327e2f..d18aee1f7042 100644 --- a/test/common/upstream/cds_api_impl_test.cc +++ b/test/common/upstream/cds_api_impl_test.cc @@ -36,20 +36,6 @@ class CdsApiImplTest : public testing::Test { CdsApiImplTest() : request_(&cm_.async_client_), api_(Api::createApiForTest(store_)) {} void setup() { - envoy::api::v2::core::ConfigSource cds_config; - setupConfig(cds_config); - Upstream::ClusterManager::ClusterInfoMap cluster_map; - Upstream::MockClusterMockPrioritySet cluster; - setupClusters(cluster_map, cluster); - - cds_ = CdsApiImpl::create(cds_config, cm_, dispatcher_, random_, local_info_, store_, *api_); - resetCdsInitializedCb(); - - expectRequest(); - cds_->initialize(); - } - - void setupConfig(envoy::api::v2::core::ConfigSource& cds_config) { const std::string config_json = R"EOF( { "cluster": { @@ -59,6 +45,7 @@ class CdsApiImplTest : public testing::Test { )EOF"; Json::ObjectSharedPtr config = Json::Factory::loadFromString(config_json); + envoy::api::v2::core::ConfigSource cds_config; Config::Utility::translateCdsConfig(*config, cds_config); cds_config.mutable_api_config_source()->set_api_type( envoy::api::v2::core::ApiConfigSource::REST); @@ -188,7 +175,6 @@ class CdsApiImplTest : public testing::Test { Http::MockAsyncClientRequest request_; CdsApiPtr cds_; Event::MockTimer* interval_timer_; - Event::MockTimer* initialization_timeout_timer_; Http::AsyncClient::Callbacks* callbacks_{}; ReadyWatcher initialized_; Api::ApiPtr api_; From e71fc4ec286639b0944f642d1ed55b659d37f0ab Mon Sep 17 00:00:00 2001 From: Marcin Falkowski Date: Sun, 10 Mar 2019 23:50:36 +0100 Subject: [PATCH 3/6] fix format Signed-off-by: Marcin Falkowski --- source/common/config/utility.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index 7f68fe26aac7..225e53ecdd84 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -178,10 +178,10 @@ std::chrono::milliseconds Utility::apiConfigSourceRequestTimeout( PROTOBUF_GET_MS_OR_DEFAULT(api_config_source, request_timeout, 1000)); } -std::chrono::milliseconds Utility::configSourceInitialFetchTimeout( - const envoy::api::v2::core::ConfigSource& config_source) { +std::chrono::milliseconds +Utility::configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source) { return std::chrono::milliseconds( - PROTOBUF_GET_MS_OR_DEFAULT(config_source, initial_fetch_timeout, 0)); + PROTOBUF_GET_MS_OR_DEFAULT(config_source, initial_fetch_timeout, 0)); } void Utility::translateCdsConfig(const Json::Object& json_config, From 4250208993009317c604f302bd4a9ffebefafaf6 Mon Sep 17 00:00:00 2001 From: Marcin Falkowski Date: Wed, 13 Mar 2019 16:39:09 +0100 Subject: [PATCH 4/6] test refactoring, remove reduntant disableTimeout call Signed-off-by: Marcin Falkowski --- .../common/config/delta_subscription_impl.h | 1 - test/common/config/BUILD | 8 +- .../config/delta_subscription_impl_test.cc | 111 ------------- .../config/delta_subscription_test_harness.h | 153 ++++++++++++++++++ .../filesystem_subscription_test_harness.h | 23 ++- .../config/grpc_subscription_impl_test.cc | 47 ------ .../config/grpc_subscription_test_harness.h | 16 ++ .../config/http_subscription_impl_test.cc | 48 ------ .../config/http_subscription_test_harness.h | 16 ++ test/common/config/subscription_impl_test.cc | 68 +++++++- .../common/config/subscription_test_harness.h | 8 + 11 files changed, 282 insertions(+), 217 deletions(-) delete mode 100644 test/common/config/delta_subscription_impl_test.cc create mode 100644 test/common/config/delta_subscription_test_harness.h diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 3ca503d3a3c9..5afdc83a2dd3 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -122,7 +122,6 @@ class DeltaSubscriptionImpl void onConfigUpdate(const Protobuf::RepeatedPtrField& added_resources, const Protobuf::RepeatedPtrField& removed_resources, const std::string& version_info) { - disableInitFetchTimeoutTimer(); callbacks_->onConfigUpdate(added_resources, removed_resources, version_info); for (const auto& resource : added_resources) { resources_[resource.name()] = resource.version(); diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 923a017ae59b..f3f853f6856e 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -87,10 +87,11 @@ envoy_cc_test_library( ], ) -envoy_cc_test( - name = "delta_subscription_impl_test", - srcs = ["delta_subscription_impl_test.cc"], +envoy_cc_test_library( + name = "delta_subscription_test_harness", + hdrs = ["delta_subscription_test_harness.h"], deps = [ + ":subscription_test_harness", "//source/common/config:delta_subscription_lib", "//test/mocks/config:config_mocks", "//test/mocks/event:event_mocks", @@ -151,6 +152,7 @@ envoy_cc_test( name = "subscription_impl_test", srcs = ["subscription_impl_test.cc"], deps = [ + ":delta_subscription_test_harness", ":filesystem_subscription_test_harness", ":grpc_subscription_test_harness", ":http_subscription_test_harness", diff --git a/test/common/config/delta_subscription_impl_test.cc b/test/common/config/delta_subscription_impl_test.cc deleted file mode 100644 index a874953596de..000000000000 --- a/test/common/config/delta_subscription_impl_test.cc +++ /dev/null @@ -1,111 +0,0 @@ -#include "common/config/delta_subscription_impl.h" - -#include "test/mocks/config/mocks.h" -#include "test/mocks/event/mocks.h" -#include "test/mocks/grpc/mocks.h" -#include "test/mocks/local_info/mocks.h" -#include "test/mocks/runtime/mocks.h" -#include "test/mocks/stats/mocks.h" - -#include "gmock/gmock.h" -#include "gtest/gtest.h" - -using testing::InSequence; -using testing::NiceMock; -using testing::Return; - -namespace Envoy { -namespace Config { -namespace { - -typedef DeltaSubscriptionImpl DeltaEdsSubscriptionImpl; - -class DeltaSubscriptionImplTest : public testing::Test { -public: - DeltaSubscriptionImplTest() - : stats_(Utility::generateStats(stats_store_)), - method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( - "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), - async_client_(new Grpc::MockAsyncClient()) {} - - void createSubscription(std::chrono::milliseconds init_fetch_timeout) { - EXPECT_CALL(dispatcher_, createTimer_(_)); - subscription_ = std::make_unique( - local_info_, std::unique_ptr(async_client_), dispatcher_, - *method_descriptor_, random_, stats_store_, rate_limit_settings_, stats_, - init_fetch_timeout); - } - - void startSubscription(const std::vector& cluster_names) { - EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); - EXPECT_CALL(async_stream_, sendMessage(_, _)).Times(2); - subscription_->start(cluster_names, callbacks_); - } - - Stats::IsolatedStoreImpl stats_store_; - SubscriptionStats stats_; - const Protobuf::MethodDescriptor* method_descriptor_; - Grpc::MockAsyncClient* async_client_; - Event::MockDispatcher dispatcher_; - Runtime::MockRandomGenerator random_; - NiceMock local_info_; - Grpc::MockAsyncStream async_stream_; - std::unique_ptr subscription_; - Envoy::Config::RateLimitSettings rate_limit_settings_; - Event::MockTimer* init_timeout_timer_; - NiceMock> callbacks_; -}; - -// Validate that initial fetch timer is created and calls callback on timeout -TEST_F(DeltaSubscriptionImplTest, InitialFetchTimeout) { - InSequence s; - createSubscription(std::chrono::milliseconds(1000)); - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster1"}); - - EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); - init_timeout_timer_->callback_(); -} - -// Validate that initial fetch timer is not created -TEST_F(DeltaSubscriptionImplTest, NoInitialFetchTimeout) { - InSequence s; - createSubscription(std::chrono::milliseconds(0)); - - EXPECT_CALL(dispatcher_, createTimer_(_)).Times(0); - startSubscription({"cluster1"}); -} - -// Validate that initial fetch timer is disabled on config update -TEST_F(DeltaSubscriptionImplTest, DisableInitTimeoutOnSuccess) { - InSequence s; - createSubscription(std::chrono::milliseconds(1000)); - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster1"}); - - EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); - auto response_ = envoy::api::v2::DeltaDiscoveryResponse(); - subscription_->onConfigUpdate(response_.resources(), response_.removed_resources(), ""); -} - -// Validate that initial fetch timer is disabled on config update failed -TEST_F(DeltaSubscriptionImplTest, DisableInitTimeoutOnFail) { - InSequence s; - createSubscription(std::chrono::milliseconds(1000)); - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster1"}); - - EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); - subscription_->handleEstablishmentFailure(); -} - -} // namespace -} // namespace Config -} // namespace Envoy \ No newline at end of file diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h new file mode 100644 index 000000000000..04ed8d553e4c --- /dev/null +++ b/test/common/config/delta_subscription_test_harness.h @@ -0,0 +1,153 @@ +#pragma once + +#include "common/config/delta_subscription_impl.h" + +#include "test/common/config/subscription_test_harness.h" +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/mocks/stats/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::InSequence; +using testing::Mock; +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Config { +namespace { + +typedef DeltaSubscriptionImpl DeltaEdsSubscriptionImpl; + +class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { +public: + DeltaSubscriptionTestHarness() : DeltaSubscriptionTestHarness(std::chrono::milliseconds(0)) {} + DeltaSubscriptionTestHarness(std::chrono::milliseconds init_fetch_timeout) + : method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), + async_client_(new Grpc::MockAsyncClient()) { + node_.set_id("fo0"); + EXPECT_CALL(local_info_, node()).WillRepeatedly(testing::ReturnRef(node_)); + EXPECT_CALL(dispatcher_, createTimer_(_)); + subscription_ = std::make_unique( + local_info_, std::unique_ptr(async_client_), dispatcher_, + *method_descriptor_, random_, stats_store_, rate_limit_settings_, stats_, + init_fetch_timeout); + } + + void startSubscription(const std::vector& cluster_names) override { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + last_cluster_names_ = cluster_names; + expectSendMessage({}, ""); + expectSendMessage(last_cluster_names_, ""); + subscription_->start(cluster_names, callbacks_); + } + + void expectSendMessage(const std::vector& cluster_names, + const std::string& version) override { + UNREFERENCED_PARAMETER(version); + expectSendMessage(cluster_names, {}, Grpc::Status::GrpcStatus::Ok, ""); + } + + void expectSendMessage(const std::vector& subscribe, + const std::vector& unsubscribe, + const Protobuf::int32 error_code, const std::string& error_message) { + envoy::api::v2::DeltaDiscoveryRequest expected_request; + expected_request.mutable_node()->CopyFrom(node_); + for (const auto& resource : subscribe) { + expected_request.add_resource_names_subscribe(resource); + } + for (auto resource = unsubscribe.rbegin(); resource != unsubscribe.rend(); ++resource) { + expected_request.add_resource_names_unsubscribe(*resource); + } + expected_request.set_response_nonce(last_response_nonce_); + expected_request.set_type_url(Config::TypeUrl::get().ClusterLoadAssignment); + + if (error_code != Grpc::Status::GrpcStatus::Ok) { + ::google::rpc::Status* error_detail = expected_request.mutable_error_detail(); + error_detail->set_code(error_code); + error_detail->set_message(error_message); + } + EXPECT_CALL(async_stream_, sendMessage(ProtoEq(expected_request), false)); + } + + void deliverConfigUpdate(const std::vector& cluster_names, + const std::string& version, bool accept) override { + std::unique_ptr response( + new envoy::api::v2::DeltaDiscoveryResponse()); + + last_response_nonce_ = std::to_string(HashUtil::xxHash64(version)); + response->set_nonce(last_response_nonce_); + response->set_system_version_info(version); + + Protobuf::RepeatedPtrField typed_resources; + for (const auto& cluster : cluster_names) { + if (std::find(last_cluster_names_.begin(), last_cluster_names_.end(), cluster) != + last_cluster_names_.end()) { + envoy::api::v2::ClusterLoadAssignment* load_assignment = typed_resources.Add(); + load_assignment->set_cluster_name(cluster); + auto* resource = response->add_resources(); + resource->set_name(cluster); + resource->set_version(version); + resource->mutable_resource()->PackFrom(*load_assignment); + } + } + Protobuf::RepeatedPtrField removed_resources; + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, version)).WillOnce(ThrowOnRejectedConfig(accept)); + if (accept) { + expectSendMessage({}, version); + } else { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Internal, "bad config"); + } + subscription_->onReceiveMessage(std::move(response)); + Mock::VerifyAndClearExpectations(&async_stream_); + } + + void updateResources(const std::vector& cluster_names) override { + std::vector cluster_superset = cluster_names; + cluster_superset.insert(cluster_superset.end(), last_cluster_names_.begin(), + last_cluster_names_.end()); + expectSendMessage(cluster_names, last_cluster_names_, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->updateResources(cluster_names); + last_cluster_names_ = cluster_names; + } + + void expectConfigUpdateFailed() override { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout))); + } + + void expectDisableInitFetchTimeoutTimer() override { + EXPECT_CALL(*init_timeout_timer_, disableTimer()); + } + + void callInitFetchTimeoutCb() override { init_timeout_timer_->callback_(); } + + const Protobuf::MethodDescriptor* method_descriptor_; + Grpc::MockAsyncClient* async_client_; + Event::MockDispatcher dispatcher_; + Runtime::MockRandomGenerator random_; + NiceMock local_info_; + Grpc::MockAsyncStream async_stream_; + std::unique_ptr subscription_; + std::string last_response_nonce_; + std::vector last_cluster_names_; + Envoy::Config::RateLimitSettings rate_limit_settings_; + Event::MockTimer* init_timeout_timer_; + envoy::api::v2::core::Node node_; + NiceMock> callbacks_; +}; + +} // namespace +} // namespace Config +} // namespace Envoy \ No newline at end of file diff --git a/test/common/config/filesystem_subscription_test_harness.h b/test/common/config/filesystem_subscription_test_harness.h index e782dc518955..6e606c9a2d2a 100644 --- a/test/common/config/filesystem_subscription_test_harness.h +++ b/test/common/config/filesystem_subscription_test_harness.h @@ -33,7 +33,11 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { api_(Api::createApiForTest(stats_store_)), dispatcher_(api_->allocateDispatcher()), subscription_(*dispatcher_, path_, stats_, *api_) {} - ~FilesystemSubscriptionTestHarness() { EXPECT_EQ(0, ::unlink(path_.c_str())); } + ~FilesystemSubscriptionTestHarness() { + if (::access(path_.c_str(), F_OK) != -1) { + EXPECT_EQ(0, ::unlink(path_.c_str())); + } + } void startSubscription(const std::vector& cluster_names) override { std::ifstream config_file(path_); @@ -95,6 +99,23 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { failure + (file_at_start_ ? 0 : 1), version); } + void expectConfigUpdateFailed() override { + // initial_fetch_timeout not implemented + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + UNREFERENCED_PARAMETER(timeout); + // initial_fetch_timeout not implemented + } + + void expectDisableInitFetchTimeoutTimer() override { + // initial_fetch_timeout not implemented + } + + void callInitFetchTimeoutCb() override { + // initial_fetch_timeout not implemented + } + const std::string path_; std::string version_; Stats::IsolatedStoreImpl stats_store_; diff --git a/test/common/config/grpc_subscription_impl_test.cc b/test/common/config/grpc_subscription_impl_test.cc index 51a5bf70ee33..1293e48b203f 100644 --- a/test/common/config/grpc_subscription_impl_test.cc +++ b/test/common/config/grpc_subscription_impl_test.cc @@ -9,13 +9,6 @@ namespace Config { namespace { class GrpcSubscriptionImplTest : public testing::Test, public GrpcSubscriptionTestHarness {}; -class GrpcSubscriptionImplInitFetchTimeoutTest : public testing::Test, - public GrpcSubscriptionTestHarness { -public: - GrpcSubscriptionImplInitFetchTimeoutTest() - : GrpcSubscriptionTestHarness(std::chrono::milliseconds(1000)) {} - Event::MockTimer* init_timeout_timer_; -}; // Validate that stream creation results in a timer based retry and can recover. TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) { @@ -82,46 +75,6 @@ TEST_F(GrpcSubscriptionImplTest, RepeatedNonce) { verifyStats(7, 2, 2, 0, 13237225503670494420U); } -// Validate that initial fetch timer is created and calls callback on timeout -TEST_F(GrpcSubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) { - InSequence s; - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster0", "cluster1"}); - verifyStats(1, 0, 0, 0, 0); - - EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); - init_timeout_timer_->callback_(); - verifyStats(1, 0, 0, 0, 0); -} - -// Validate that initial fetch timer is disabled on config update -TEST_F(GrpcSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnSuccess) { - InSequence s; - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster0", "cluster1"}); - verifyStats(1, 0, 0, 0, 0); - - EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); - deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); -} - -// Validate that initial fetch timer is disabled on config update failed -TEST_F(GrpcSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnFail) { - InSequence s; - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster0", "cluster1"}); - verifyStats(1, 0, 0, 0, 0); - - EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); - deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); -} - } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index 45bba9b5719d..a01b4588be39 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -130,6 +130,21 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { last_cluster_names_ = cluster_names; } + void expectConfigUpdateFailed() override { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout))); + } + + void expectDisableInitFetchTimeoutTimer() override { + EXPECT_CALL(*init_timeout_timer_, disableTimer()); + } + + void callInitFetchTimeoutCb() override { init_timeout_timer_->callback_(); } + std::string version_; const Protobuf::MethodDescriptor* method_descriptor_; Grpc::MockAsyncClient* async_client_; @@ -146,6 +161,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { std::vector last_cluster_names_; NiceMock local_info_; Envoy::Config::RateLimitSettings rate_limit_settings_; + Event::MockTimer* init_timeout_timer_; }; // TODO(danielhochman): test with RDS and ensure version_info is same as what API returned diff --git a/test/common/config/http_subscription_impl_test.cc b/test/common/config/http_subscription_impl_test.cc index cb0ac01bb312..eb5f9df87b48 100644 --- a/test/common/config/http_subscription_impl_test.cc +++ b/test/common/config/http_subscription_impl_test.cc @@ -4,20 +4,11 @@ #include "gtest/gtest.h" -using testing::InSequence; - namespace Envoy { namespace Config { namespace { class HttpSubscriptionImplTest : public testing::Test, public HttpSubscriptionTestHarness {}; -class HttpSubscriptionImplInitFetchTimeoutTest : public testing::Test, - public HttpSubscriptionTestHarness { -public: - HttpSubscriptionImplInitFetchTimeoutTest() - : HttpSubscriptionTestHarness(std::chrono::milliseconds(1000)) {} - Event::MockTimer* init_timeout_timer_; -}; // Validate that the client can recover from a remote fetch failure. TEST_F(HttpSubscriptionImplTest, OnRequestReset) { @@ -51,45 +42,6 @@ TEST_F(HttpSubscriptionImplTest, BadJsonRecovery) { verifyStats(3, 1, 0, 1, 7148434200721666028); } -TEST_F(HttpSubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) { - InSequence s; - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster0", "cluster1"}); - verifyStats(1, 0, 0, 0, 0); - - EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); - init_timeout_timer_->callback_(); - verifyStats(1, 0, 0, 0, 0); -} - -// Validate that initial fetch timer is disabled on config update -TEST_F(HttpSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnSuccess) { - InSequence s; - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster0", "cluster1"}); - verifyStats(1, 0, 0, 0, 0); - - EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); - deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); -} - -// Validate that initial fetch timer is disabled on config update failed -TEST_F(HttpSubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnFail) { - InSequence s; - - init_timeout_timer_ = new Event::MockTimer(&dispatcher_); - EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(1000))); - startSubscription({"cluster0", "cluster1"}); - verifyStats(1, 0, 0, 0, 0); - - EXPECT_CALL(*init_timeout_timer_, disableTimer()).Times(1); - deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); -} - } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index 4c7301f51333..379218ced1fc 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -138,6 +138,21 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { timerTick(); } + void expectConfigUpdateFailed() override { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout))); + } + + void expectDisableInitFetchTimeoutTimer() override { + EXPECT_CALL(*init_timeout_timer_, disableTimer()); + } + + void callInitFetchTimeoutCb() override { init_timeout_timer_->callback_(); } + void timerTick() { expectSendMessage(cluster_names_, version_); timer_cb_(); @@ -158,6 +173,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { Config::MockSubscriptionCallbacks callbacks_; std::unique_ptr subscription_; NiceMock local_info_; + Event::MockTimer* init_timeout_timer_; }; } // namespace Config diff --git a/test/common/config/subscription_impl_test.cc b/test/common/config/subscription_impl_test.cc index 85b07333306a..5a733be4c52e 100644 --- a/test/common/config/subscription_impl_test.cc +++ b/test/common/config/subscription_impl_test.cc @@ -1,29 +1,37 @@ #include +#include "test/common/config/delta_subscription_test_harness.h" #include "test/common/config/filesystem_subscription_test_harness.h" #include "test/common/config/grpc_subscription_test_harness.h" #include "test/common/config/http_subscription_test_harness.h" #include "test/common/config/subscription_test_harness.h" +using testing::InSequence; + namespace Envoy { namespace Config { namespace { enum class SubscriptionType { Grpc, + DeltaGrpc, Http, Filesystem, }; class SubscriptionImplTest : public testing::TestWithParam { public: - SubscriptionImplTest() { + SubscriptionImplTest() : SubscriptionImplTest(std::chrono::milliseconds(0)) {} + SubscriptionImplTest(std::chrono::milliseconds init_fetch_timeout) { switch (GetParam()) { case SubscriptionType::Grpc: - test_harness_ = std::make_unique(); + test_harness_ = std::make_unique(init_fetch_timeout); + break; + case SubscriptionType::DeltaGrpc: + test_harness_ = std::make_unique(init_fetch_timeout); break; case SubscriptionType::Http: - test_harness_ = std::make_unique(); + test_harness_ = std::make_unique(init_fetch_timeout); break; case SubscriptionType::Filesystem: test_harness_ = std::make_unique(); @@ -54,12 +62,29 @@ class SubscriptionImplTest : public testing::TestWithParam { test_harness_->deliverConfigUpdate(cluster_names, version, accept); } + void expectConfigUpdateFailed() { test_harness_->expectConfigUpdateFailed(); } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) { + test_harness_->expectEnableInitFetchTimeoutTimer(timeout); + } + + void expectDisableInitFetchTimeoutTimer() { test_harness_->expectDisableInitFetchTimeoutTimer(); } + + void callInitFetchTimeoutCb() { test_harness_->callInitFetchTimeoutCb(); } + std::unique_ptr test_harness_; }; -INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplTest, - testing::ValuesIn({SubscriptionType::Grpc, SubscriptionType::Http, - SubscriptionType::Filesystem})); +class SubscriptionImplInitFetchTimeoutTest : public SubscriptionImplTest { +public: + SubscriptionImplInitFetchTimeoutTest() : SubscriptionImplTest(std::chrono::milliseconds(1000)) {} +}; + +const auto impls = {SubscriptionType::Grpc, SubscriptionType::DeltaGrpc, SubscriptionType::Http, + SubscriptionType::Filesystem}; +INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplTest, testing::ValuesIn(impls)); +INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplInitFetchTimeoutTest, + testing::ValuesIn(impls)); // Validate basic request-response succeeds. TEST_P(SubscriptionImplTest, InitialRequestResponse) { @@ -117,6 +142,37 @@ TEST_P(SubscriptionImplTest, UpdateResources) { verifyStats(3, 1, 0, 0, 7148434200721666028); } +// Validate that initial fetch timer is created and calls callback on timeout +TEST_P(SubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) { + InSequence s; + expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000)); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + expectConfigUpdateFailed(); + callInitFetchTimeoutCb(); + verifyStats(1, 0, 0, 0, 0); +} + +// Validate that initial fetch timer is disabled on config update +TEST_P(SubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnSuccess) { + InSequence s; + expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000)); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + expectDisableInitFetchTimeoutTimer(); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); +} + +// Validate that initial fetch timer is disabled on config update failed +TEST_P(SubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnFail) { + InSequence s; + expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000)); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + expectDisableInitFetchTimeoutTimer(); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/subscription_test_harness.h b/test/common/config/subscription_test_harness.h index 1b50774a256e..f57d4eb136b9 100644 --- a/test/common/config/subscription_test_harness.h +++ b/test/common/config/subscription_test_harness.h @@ -62,6 +62,14 @@ class SubscriptionTestHarness { EXPECT_EQ(connected_state, stats_store_.boolIndicator("control_plane.connected_state").value()); } + virtual void expectConfigUpdateFailed() PURE; + + virtual void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) PURE; + + virtual void expectDisableInitFetchTimeoutTimer() PURE; + + virtual void callInitFetchTimeoutCb() PURE; + Stats::IsolatedStoreImpl stats_store_; SubscriptionStats stats_; }; From 655ca74f8bd3b5baec772b413f2976ef5be7f33d Mon Sep 17 00:00:00 2001 From: Marcin Falkowski Date: Wed, 13 Mar 2019 18:01:00 +0100 Subject: [PATCH 5/6] CI fix Signed-off-by: Marcin Falkowski --- test/common/config/subscription_impl_test.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/common/config/subscription_impl_test.cc b/test/common/config/subscription_impl_test.cc index 5a733be4c52e..35b1f819acb6 100644 --- a/test/common/config/subscription_impl_test.cc +++ b/test/common/config/subscription_impl_test.cc @@ -80,11 +80,11 @@ class SubscriptionImplInitFetchTimeoutTest : public SubscriptionImplTest { SubscriptionImplInitFetchTimeoutTest() : SubscriptionImplTest(std::chrono::milliseconds(1000)) {} }; -const auto impls = {SubscriptionType::Grpc, SubscriptionType::DeltaGrpc, SubscriptionType::Http, - SubscriptionType::Filesystem}; -INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplTest, testing::ValuesIn(impls)); +SubscriptionType types[] = {SubscriptionType::Grpc, SubscriptionType::DeltaGrpc, + SubscriptionType::Http, SubscriptionType::Filesystem}; +INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplTest, testing::ValuesIn(types)); INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplInitFetchTimeoutTest, - testing::ValuesIn(impls)); + testing::ValuesIn(types)); // Validate basic request-response succeeds. TEST_P(SubscriptionImplTest, InitialRequestResponse) { From 8c4f3802c796563d74751da3186be2c1507f84e7 Mon Sep 17 00:00:00 2001 From: Marcin Falkowski Date: Thu, 14 Mar 2019 06:38:09 +0100 Subject: [PATCH 6/6] add release notes Signed-off-by: Marcin Falkowski --- docs/root/intro/version_history.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 9da800cccd68..3f6c0c91bc98 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -18,6 +18,7 @@ Version history * config: removed REST_LEGACY as a valid :ref:`ApiType `. * config: finish cluster warming only when a named response i.e. ClusterLoadAssignment associated to the cluster being warmed comes in the EDS response. This is a behavioural change from the current implementation where warming of cluster completes on missing load assignments also. * config: use Envoy cpuset size to set the default number or worker threads if :option:`--cpuset-threads` is enabled. +* config: added support for :ref:`initial_fetch_timeout `. The timeout is disabled by default. * cors: added :ref:`filter_enabled & shadow_enabled RuntimeFractionalPercent flags ` to filter. * ext_authz: added an configurable option to make the gRPC service cross-compatible with V2Alpha. Note that this feature is already deprecated. It should be used for a short time, and only when transitioning from alpha to V2 release version. * ext_authz: migrated from V2alpha to V2 and improved the documentation.