diff --git a/api/envoy/api/v2/core/config_source.proto b/api/envoy/api/v2/core/config_source.proto index 0b5a1ec4bf9e..8b6014dcbf9d 100644 --- a/api/envoy/api/v2/core/config_source.proto +++ b/api/envoy/api/v2/core/config_source.proto @@ -111,4 +111,14 @@ message ConfigSource { // source in the bootstrap configuration is used. AggregatedConfigSource ads = 3; } + + // Optional initialization timeout. + // When this timeout is specified, Envoy will wait no longer than the specified time for first + // config response on this xDS subscription during the :ref:`initialization process + // <arch_overview_initialization>`. After reaching the timeout, Envoy will move to the next + // initialization phase, even if the first config is not delivered yet. The timer is activated + // when the xDS API subscription starts, and is disarmed on first config update or on error. 0 + // means no timeout - Envoy will wait indefinitely for the first xDS config (unless another + // timeout applies). Default 0. + google.protobuf.Duration initial_fetch_timeout = 4; } diff --git a/docs/root/intro/arch_overview/init.rst b/docs/root/intro/arch_overview/init.rst index af0320a57f32..2e05c5a75056 100644 --- a/docs/root/intro/arch_overview/init.rst +++ b/docs/root/intro/arch_overview/init.rst @@ -1,3 +1,5 @@ +.. _arch_overview_initialization: + Initialization ============== @@ -22,3 +24,6 @@ accepting new connections. * After all of the previous steps have taken place, the listeners start accepting new connections. This flow ensures that during hot restart the new process is fully capable of accepting and processing new connections before the draining of the old process begins. + +All mentioned "waiting for one response" periods can be limited by setting corresponding +:ref:`initial_fetch_timeout <envoy_api_field_core.ConfigSource.initial_fetch_timeout>`. diff --git a/docs/root/intro/version_history.rst b/docs/root/intro/version_history.rst index 65f0c2fc0b0c..52bf50397987 100644 --- a/docs/root/intro/version_history.rst +++ b/docs/root/intro/version_history.rst @@ -18,6 +18,7 @@ Version history * config: removed REST_LEGACY as a valid :ref:`ApiType <envoy_api_field_core.ApiConfigSource.api_type>`. * config: finish cluster warming only when a named response i.e. ClusterLoadAssignment associated to the cluster being warmed comes in the EDS response. This is a behavioural change from the current implementation where warming of cluster completes on missing load assignments also. * config: use Envoy cpuset size to set the default number or worker threads if :option:`--cpuset-threads` is enabled. +* config: added support for :ref:`initial_fetch_timeout <envoy_api_field_core.ConfigSource.initial_fetch_timeout>`. The timeout is disabled by default. * cors: added :ref:`filter_enabled & shadow_enabled RuntimeFractionalPercent flags <cors-runtime>` to filter. * ext_authz: added an configurable option to make the gRPC service cross-compatible with V2Alpha. Note that this feature is already deprecated. It should be used for a short time, and only when transitioning from alpha to V2 release version. * ext_authz: migrated from V2alpha to V2 and improved the documentation. diff --git a/source/common/config/delta_subscription_impl.h b/source/common/config/delta_subscription_impl.h index 5164d284b6a4..5afdc83a2dd3 100644 --- a/source/common/config/delta_subscription_impl.h +++ b/source/common/config/delta_subscription_impl.h @@ -40,12 +40,14 @@ class DeltaSubscriptionImpl Event::Dispatcher& dispatcher, const Protobuf::MethodDescriptor& service_method, Runtime::RandomGenerator& random, Stats::Scope& scope, - const RateLimitSettings& rate_limit_settings, SubscriptionStats stats) + const RateLimitSettings& rate_limit_settings, SubscriptionStats stats, + std::chrono::milliseconds init_fetch_timeout) : GrpcStream<envoy::api::v2::DeltaDiscoveryRequest, envoy::api::v2::DeltaDiscoveryResponse, ResourceNameDiff>(std::move(async_client), service_method, random, dispatcher, scope, rate_limit_settings), type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())), - local_info_(local_info), stats_(stats) { + local_info_(local_info), stats_(stats), dispatcher_(dispatcher), + init_fetch_timeout_(init_fetch_timeout) { request_.set_type_url(type_url_); request_.mutable_node()->MergeFrom(local_info_.node()); } @@ -134,6 +136,7 @@ class DeltaSubscriptionImpl void handleResponse(std::unique_ptr<envoy::api::v2::DeltaDiscoveryResponse>&& message) override { ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url_, message->system_version_info()); + disableInitFetchTimeoutTimer(); request_.set_response_nonce(message->nonce()); @@ -167,6 +170,7 @@ class DeltaSubscriptionImpl } void handleEstablishmentFailure() override { + disableInitFetchTimeoutTimer(); stats_.update_failure_.inc(); ENVOY_LOG(debug, "delta update for {} failed", type_url_); stats_.update_attempt_.inc(); @@ -177,6 +181,15 @@ class DeltaSubscriptionImpl void start(const std::vector<std::string>& resources, SubscriptionCallbacks<ResourceType>& callbacks) override { callbacks_ = &callbacks; + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "delta config: initial fetch timed out for {}", type_url_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + establishNewStream(); subscribe(resources); // The attempt stat here is maintained for the purposes of having consistency between ADS and @@ -191,6 +204,12 @@ class DeltaSubscriptionImpl } private: + void disableInitFetchTimeoutTimer() { + if (init_fetch_timeout_timer_) { + init_fetch_timeout_timer_->disableTimer(); + init_fetch_timeout_timer_.reset(); + } + } // A map from resource name to per-resource version. std::unordered_map<std::string, std::string> resources_; // The keys of resources_. Only tracked separately because std::map does not provide an iterator @@ -207,6 +226,9 @@ class DeltaSubscriptionImpl const LocalInfo::LocalInfo& local_info_; SubscriptionStats stats_; + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds init_fetch_timeout_; + Event::TimerPtr init_fetch_timeout_timer_; }; } // namespace Config diff --git a/source/common/config/grpc_mux_subscription_impl.h b/source/common/config/grpc_mux_subscription_impl.h index 41b67497381a..1da9a2f6a891 100644 --- a/source/common/config/grpc_mux_subscription_impl.h +++ b/source/common/config/grpc_mux_subscription_impl.h @@ -21,14 +21,25 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>, GrpcMuxCallbacks, Logger::Loggable<Logger::Id::config> { public: - GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats) + GrpcMuxSubscriptionImpl(GrpcMux& grpc_mux, SubscriptionStats stats, Event::Dispatcher& dispatcher, + std::chrono::milliseconds init_fetch_timeout) : grpc_mux_(grpc_mux), stats_(stats), - type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())) {} + type_url_(Grpc::Common::typeUrl(ResourceType().GetDescriptor()->full_name())), + dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) {} // Config::Subscription void start(const std::vector<std::string>& resources, SubscriptionCallbacks<ResourceType>& callbacks) override { callbacks_ = &callbacks; + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + watch_ = grpc_mux_.subscribe(type_url_, resources, *this); // The attempt stat here is maintained for the purposes of having consistency between ADS and // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an @@ -44,6 +55,7 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>, // Config::GrpcMuxCallbacks void onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources, const std::string& version_info) override { + disableInitFetchTimeoutTimer(); Protobuf::RepeatedPtrField<ResourceType> typed_resources; std::transform(resources.cbegin(), resources.cend(), Protobuf::RepeatedPtrFieldBackInserter(&typed_resources), @@ -62,6 +74,7 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>, } void onConfigUpdateFailed(const EnvoyException* e) override { + disableInitFetchTimeoutTimer(); // TODO(htuch): Less fragile signal that this is failure vs. reject. if (e == nullptr) { stats_.update_failure_.inc(); @@ -79,11 +92,21 @@ class GrpcMuxSubscriptionImpl : public Subscription<ResourceType>, } private: + void disableInitFetchTimeoutTimer() { + if (init_fetch_timeout_timer_) { + init_fetch_timeout_timer_->disableTimer(); + init_fetch_timeout_timer_.reset(); + } + } + GrpcMux& grpc_mux_; SubscriptionStats stats_; const std::string type_url_; SubscriptionCallbacks<ResourceType>* callbacks_{}; GrpcMuxWatchPtr watch_{}; + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds init_fetch_timeout_; + Event::TimerPtr init_fetch_timeout_timer_; }; } // namespace Config diff --git a/source/common/config/grpc_subscription_impl.h b/source/common/config/grpc_subscription_impl.h index aedbf5adc756..1a0d289bd187 100644 --- a/source/common/config/grpc_subscription_impl.h +++ b/source/common/config/grpc_subscription_impl.h @@ -18,10 +18,11 @@ class GrpcSubscriptionImpl : public Config::Subscription<ResourceType> { GrpcSubscriptionImpl(const LocalInfo::LocalInfo& local_info, Grpc::AsyncClientPtr async_client, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats, - Stats::Scope& scope, const RateLimitSettings& rate_limit_settings) + Stats::Scope& scope, const RateLimitSettings& rate_limit_settings, + std::chrono::milliseconds init_fetch_timeout) : grpc_mux_(local_info, std::move(async_client), dispatcher, service_method, random, scope, rate_limit_settings), - grpc_mux_subscription_(grpc_mux_, stats) {} + grpc_mux_subscription_(grpc_mux_, stats, dispatcher, init_fetch_timeout) {} // Config::Subscription void start(const std::vector<std::string>& resources, diff --git a/source/common/config/http_subscription_impl.h b/source/common/config/http_subscription_impl.h index b6ac42863bce..702b0335c3f0 100644 --- a/source/common/config/http_subscription_impl.h +++ b/source/common/config/http_subscription_impl.h @@ -36,10 +36,11 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, std::chrono::milliseconds refresh_interval, std::chrono::milliseconds request_timeout, - const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats) + const Protobuf::MethodDescriptor& service_method, SubscriptionStats stats, + std::chrono::milliseconds init_fetch_timeout) : Http::RestApiFetcher(cm, remote_cluster_name, dispatcher, random, refresh_interval, request_timeout), - stats_(stats) { + stats_(stats), dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout) { request_.mutable_node()->CopyFrom(local_info.node()); ASSERT(service_method.options().HasExtension(google::api::http)); const auto& http_rule = service_method.options().GetExtension(google::api::http); @@ -51,6 +52,15 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, void start(const std::vector<std::string>& resources, Config::SubscriptionCallbacks<ResourceType>& callbacks) override { ASSERT(callbacks_ == nullptr); + + if (init_fetch_timeout_.count() > 0) { + init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { + ENVOY_LOG(warn, "REST config: initial fetch timed out for", path_); + callbacks_->onConfigUpdateFailed(nullptr); + }); + init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); + } + Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_vector(resources.begin(), resources.end()); request_.mutable_resource_names()->Swap(&resources_vector); @@ -78,6 +88,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, } void parseResponse(const Http::Message& response) override { + disableInitFetchTimeoutTimer(); envoy::api::v2::DiscoveryResponse message; const auto status = Protobuf::util::JsonStringToMessage(response.bodyAsString(), &message); if (!status.ok()) { @@ -101,6 +112,7 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, void onFetchComplete() override {} void onFetchFailure(const EnvoyException* e) override { + disableInitFetchTimeoutTimer(); ENVOY_LOG(warn, "REST config update failed: {}", e != nullptr ? e->what() : "fetch failure"); handleFailure(e); } @@ -111,11 +123,21 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher, callbacks_->onConfigUpdateFailed(e); } + void disableInitFetchTimeoutTimer() { + if (init_fetch_timeout_timer_) { + init_fetch_timeout_timer_->disableTimer(); + init_fetch_timeout_timer_.reset(); + } + } + std::string path_; Protobuf::RepeatedPtrField<ProtobufTypes::String> resources_; Config::SubscriptionCallbacks<ResourceType>* callbacks_{}; envoy::api::v2::DiscoveryRequest request_; SubscriptionStats stats_; + Event::Dispatcher& dispatcher_; + std::chrono::milliseconds init_fetch_timeout_; + Event::TimerPtr init_fetch_timeout_timer_; }; } // namespace Config diff --git a/source/common/config/subscription_factory.h b/source/common/config/subscription_factory.h index 4362291ab7a6..36857024f530 100644 --- a/source/common/config/subscription_factory.h +++ b/source/common/config/subscription_factory.h @@ -66,7 +66,8 @@ class SubscriptionFactory { local_info, cm, api_config_source.cluster_names()[0], dispatcher, random, Utility::apiConfigSourceRefreshDelay(api_config_source), Utility::apiConfigSourceRequestTimeout(api_config_source), - *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats)); + *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(rest_method), stats, + Utility::configSourceInitialFetchTimeout(config))); break; case envoy::api::v2::core::ApiConfigSource::GRPC: result.reset(new GrpcSubscriptionImpl<ResourceType>( @@ -76,7 +77,8 @@ class SubscriptionFactory { ->create(), dispatcher, random, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), stats, - scope, Utility::parseRateLimitSettings(api_config_source))); + scope, Utility::parseRateLimitSettings(api_config_source), + Utility::configSourceInitialFetchTimeout(config))); break; case envoy::api::v2::core::ApiConfigSource::DELTA_GRPC: { Utility::checkApiConfigSourceSubscriptionBackingCluster(cm.clusters(), api_config_source); @@ -86,7 +88,8 @@ class SubscriptionFactory { api_config_source, scope) ->create(), dispatcher, *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(grpc_method), - random, scope, Utility::parseRateLimitSettings(api_config_source), stats)); + random, scope, Utility::parseRateLimitSettings(api_config_source), stats, + Utility::configSourceInitialFetchTimeout(config))); break; } default: @@ -95,7 +98,8 @@ class SubscriptionFactory { break; } case envoy::api::v2::core::ConfigSource::kAds: { - result.reset(new GrpcMuxSubscriptionImpl<ResourceType>(cm.adsMux(), stats)); + result.reset(new GrpcMuxSubscriptionImpl<ResourceType>( + cm.adsMux(), stats, dispatcher, Utility::configSourceInitialFetchTimeout(config))); break; } default: diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index d8b11b71d97f..225e53ecdd84 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -178,6 +178,12 @@ std::chrono::milliseconds Utility::apiConfigSourceRequestTimeout( PROTOBUF_GET_MS_OR_DEFAULT(api_config_source, request_timeout, 1000)); } +std::chrono::milliseconds +Utility::configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source) { + return std::chrono::milliseconds( + PROTOBUF_GET_MS_OR_DEFAULT(config_source, initial_fetch_timeout, 0)); +} + void Utility::translateCdsConfig(const Json::Object& json_config, envoy::api::v2::core::ConfigSource& cds_config) { translateApiConfigSource(json_config.getObject("cluster")->getString("name"), diff --git a/source/common/config/utility.h b/source/common/config/utility.h index 60c36db3197c..5dacc67bfa7a 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -104,6 +104,14 @@ class Utility { static std::chrono::milliseconds apiConfigSourceRequestTimeout(const envoy::api::v2::core::ApiConfigSource& api_config_source); + /** + * Extract initial_fetch_timeout as a std::chrono::milliseconds from + * envoy::api::v2::core::ConfigSource. If request_timeout isn't set in the config source, a + * default value of 0s will be returned. + */ + static std::chrono::milliseconds + configSourceInitialFetchTimeout(const envoy::api::v2::core::ConfigSource& config_source); + /** * Populate an envoy::api::v2::core::ApiConfigSource. * @param cluster supplies the cluster name for the ApiConfigSource. diff --git a/test/common/config/BUILD b/test/common/config/BUILD index 8f2ce71b9203..f3f853f6856e 100644 --- a/test/common/config/BUILD +++ b/test/common/config/BUILD @@ -87,6 +87,20 @@ envoy_cc_test_library( ], ) +envoy_cc_test_library( + name = "delta_subscription_test_harness", + hdrs = ["delta_subscription_test_harness.h"], + deps = [ + ":subscription_test_harness", + "//source/common/config:delta_subscription_lib", + "//test/mocks/config:config_mocks", + "//test/mocks/event:event_mocks", + "//test/mocks/grpc:grpc_mocks", + "//test/mocks/local_info:local_info_mocks", + "//test/mocks/runtime:runtime_mocks", + ], +) + envoy_cc_test( name = "http_subscription_impl_test", srcs = ["http_subscription_impl_test.cc"], @@ -138,6 +152,7 @@ envoy_cc_test( name = "subscription_impl_test", srcs = ["subscription_impl_test.cc"], deps = [ + ":delta_subscription_test_harness", ":filesystem_subscription_test_harness", ":grpc_subscription_test_harness", ":http_subscription_test_harness", diff --git a/test/common/config/delta_subscription_test_harness.h b/test/common/config/delta_subscription_test_harness.h new file mode 100644 index 000000000000..04ed8d553e4c --- /dev/null +++ b/test/common/config/delta_subscription_test_harness.h @@ -0,0 +1,153 @@ +#pragma once + +#include "common/config/delta_subscription_impl.h" + +#include "test/common/config/subscription_test_harness.h" +#include "test/mocks/config/mocks.h" +#include "test/mocks/event/mocks.h" +#include "test/mocks/grpc/mocks.h" +#include "test/mocks/local_info/mocks.h" +#include "test/mocks/runtime/mocks.h" +#include "test/mocks/stats/mocks.h" + +#include "gmock/gmock.h" +#include "gtest/gtest.h" + +using testing::InSequence; +using testing::Mock; +using testing::NiceMock; +using testing::Return; + +namespace Envoy { +namespace Config { +namespace { + +typedef DeltaSubscriptionImpl<envoy::api::v2::ClusterLoadAssignment> DeltaEdsSubscriptionImpl; + +class DeltaSubscriptionTestHarness : public SubscriptionTestHarness { +public: + DeltaSubscriptionTestHarness() : DeltaSubscriptionTestHarness(std::chrono::milliseconds(0)) {} + DeltaSubscriptionTestHarness(std::chrono::milliseconds init_fetch_timeout) + : method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), + async_client_(new Grpc::MockAsyncClient()) { + node_.set_id("fo0"); + EXPECT_CALL(local_info_, node()).WillRepeatedly(testing::ReturnRef(node_)); + EXPECT_CALL(dispatcher_, createTimer_(_)); + subscription_ = std::make_unique<DeltaEdsSubscriptionImpl>( + local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_, + *method_descriptor_, random_, stats_store_, rate_limit_settings_, stats_, + init_fetch_timeout); + } + + void startSubscription(const std::vector<std::string>& cluster_names) override { + EXPECT_CALL(*async_client_, start(_, _)).WillOnce(Return(&async_stream_)); + last_cluster_names_ = cluster_names; + expectSendMessage({}, ""); + expectSendMessage(last_cluster_names_, ""); + subscription_->start(cluster_names, callbacks_); + } + + void expectSendMessage(const std::vector<std::string>& cluster_names, + const std::string& version) override { + UNREFERENCED_PARAMETER(version); + expectSendMessage(cluster_names, {}, Grpc::Status::GrpcStatus::Ok, ""); + } + + void expectSendMessage(const std::vector<std::string>& subscribe, + const std::vector<std::string>& unsubscribe, + const Protobuf::int32 error_code, const std::string& error_message) { + envoy::api::v2::DeltaDiscoveryRequest expected_request; + expected_request.mutable_node()->CopyFrom(node_); + for (const auto& resource : subscribe) { + expected_request.add_resource_names_subscribe(resource); + } + for (auto resource = unsubscribe.rbegin(); resource != unsubscribe.rend(); ++resource) { + expected_request.add_resource_names_unsubscribe(*resource); + } + expected_request.set_response_nonce(last_response_nonce_); + expected_request.set_type_url(Config::TypeUrl::get().ClusterLoadAssignment); + + if (error_code != Grpc::Status::GrpcStatus::Ok) { + ::google::rpc::Status* error_detail = expected_request.mutable_error_detail(); + error_detail->set_code(error_code); + error_detail->set_message(error_message); + } + EXPECT_CALL(async_stream_, sendMessage(ProtoEq(expected_request), false)); + } + + void deliverConfigUpdate(const std::vector<std::string>& cluster_names, + const std::string& version, bool accept) override { + std::unique_ptr<envoy::api::v2::DeltaDiscoveryResponse> response( + new envoy::api::v2::DeltaDiscoveryResponse()); + + last_response_nonce_ = std::to_string(HashUtil::xxHash64(version)); + response->set_nonce(last_response_nonce_); + response->set_system_version_info(version); + + Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> typed_resources; + for (const auto& cluster : cluster_names) { + if (std::find(last_cluster_names_.begin(), last_cluster_names_.end(), cluster) != + last_cluster_names_.end()) { + envoy::api::v2::ClusterLoadAssignment* load_assignment = typed_resources.Add(); + load_assignment->set_cluster_name(cluster); + auto* resource = response->add_resources(); + resource->set_name(cluster); + resource->set_version(version); + resource->mutable_resource()->PackFrom(*load_assignment); + } + } + Protobuf::RepeatedPtrField<std::string> removed_resources; + EXPECT_CALL(callbacks_, onConfigUpdate(_, _, version)).WillOnce(ThrowOnRejectedConfig(accept)); + if (accept) { + expectSendMessage({}, version); + } else { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(_)); + expectSendMessage({}, {}, Grpc::Status::GrpcStatus::Internal, "bad config"); + } + subscription_->onReceiveMessage(std::move(response)); + Mock::VerifyAndClearExpectations(&async_stream_); + } + + void updateResources(const std::vector<std::string>& cluster_names) override { + std::vector<std::string> cluster_superset = cluster_names; + cluster_superset.insert(cluster_superset.end(), last_cluster_names_.begin(), + last_cluster_names_.end()); + expectSendMessage(cluster_names, last_cluster_names_, Grpc::Status::GrpcStatus::Ok, ""); + subscription_->updateResources(cluster_names); + last_cluster_names_ = cluster_names; + } + + void expectConfigUpdateFailed() override { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout))); + } + + void expectDisableInitFetchTimeoutTimer() override { + EXPECT_CALL(*init_timeout_timer_, disableTimer()); + } + + void callInitFetchTimeoutCb() override { init_timeout_timer_->callback_(); } + + const Protobuf::MethodDescriptor* method_descriptor_; + Grpc::MockAsyncClient* async_client_; + Event::MockDispatcher dispatcher_; + Runtime::MockRandomGenerator random_; + NiceMock<LocalInfo::MockLocalInfo> local_info_; + Grpc::MockAsyncStream async_stream_; + std::unique_ptr<DeltaEdsSubscriptionImpl> subscription_; + std::string last_response_nonce_; + std::vector<std::string> last_cluster_names_; + Envoy::Config::RateLimitSettings rate_limit_settings_; + Event::MockTimer* init_timeout_timer_; + envoy::api::v2::core::Node node_; + NiceMock<Config::MockSubscriptionCallbacks<envoy::api::v2::ClusterLoadAssignment>> callbacks_; +}; + +} // namespace +} // namespace Config +} // namespace Envoy \ No newline at end of file diff --git a/test/common/config/filesystem_subscription_test_harness.h b/test/common/config/filesystem_subscription_test_harness.h index e782dc518955..6e606c9a2d2a 100644 --- a/test/common/config/filesystem_subscription_test_harness.h +++ b/test/common/config/filesystem_subscription_test_harness.h @@ -33,7 +33,11 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { api_(Api::createApiForTest(stats_store_)), dispatcher_(api_->allocateDispatcher()), subscription_(*dispatcher_, path_, stats_, *api_) {} - ~FilesystemSubscriptionTestHarness() { EXPECT_EQ(0, ::unlink(path_.c_str())); } + ~FilesystemSubscriptionTestHarness() { + if (::access(path_.c_str(), F_OK) != -1) { + EXPECT_EQ(0, ::unlink(path_.c_str())); + } + } void startSubscription(const std::vector<std::string>& cluster_names) override { std::ifstream config_file(path_); @@ -95,6 +99,23 @@ class FilesystemSubscriptionTestHarness : public SubscriptionTestHarness { failure + (file_at_start_ ? 0 : 1), version); } + void expectConfigUpdateFailed() override { + // initial_fetch_timeout not implemented + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + UNREFERENCED_PARAMETER(timeout); + // initial_fetch_timeout not implemented + } + + void expectDisableInitFetchTimeoutTimer() override { + // initial_fetch_timeout not implemented + } + + void callInitFetchTimeoutCb() override { + // initial_fetch_timeout not implemented + } + const std::string path_; std::string version_; Stats::IsolatedStoreImpl stats_store_; diff --git a/test/common/config/grpc_subscription_test_harness.h b/test/common/config/grpc_subscription_test_harness.h index c6316aab957c..a01b4588be39 100644 --- a/test/common/config/grpc_subscription_test_harness.h +++ b/test/common/config/grpc_subscription_test_harness.h @@ -32,7 +32,9 @@ typedef GrpcSubscriptionImpl<envoy::api::v2::ClusterLoadAssignment> GrpcEdsSubsc class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { public: - GrpcSubscriptionTestHarness() + GrpcSubscriptionTestHarness() : GrpcSubscriptionTestHarness(std::chrono::milliseconds(0)) {} + + GrpcSubscriptionTestHarness(std::chrono::milliseconds init_fetch_timeout) : method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.api.v2.EndpointDiscoveryService.StreamEndpoints")), async_client_(new Grpc::MockAsyncClient()), timer_(new Event::MockTimer()) { @@ -44,7 +46,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { })); subscription_ = std::make_unique<GrpcEdsSubscriptionImpl>( local_info_, std::unique_ptr<Grpc::MockAsyncClient>(async_client_), dispatcher_, random_, - *method_descriptor_, stats_, stats_store_, rate_limit_settings_); + *method_descriptor_, stats_, stats_store_, rate_limit_settings_, init_fetch_timeout); } ~GrpcSubscriptionTestHarness() { EXPECT_CALL(async_stream_, sendMessage(_, false)); } @@ -128,6 +130,21 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { last_cluster_names_ = cluster_names; } + void expectConfigUpdateFailed() override { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout))); + } + + void expectDisableInitFetchTimeoutTimer() override { + EXPECT_CALL(*init_timeout_timer_, disableTimer()); + } + + void callInitFetchTimeoutCb() override { init_timeout_timer_->callback_(); } + std::string version_; const Protobuf::MethodDescriptor* method_descriptor_; Grpc::MockAsyncClient* async_client_; @@ -144,6 +161,7 @@ class GrpcSubscriptionTestHarness : public SubscriptionTestHarness { std::vector<std::string> last_cluster_names_; NiceMock<LocalInfo::MockLocalInfo> local_info_; Envoy::Config::RateLimitSettings rate_limit_settings_; + Event::MockTimer* init_timeout_timer_; }; // TODO(danielhochman): test with RDS and ensure version_info is same as what API returned diff --git a/test/common/config/http_subscription_test_harness.h b/test/common/config/http_subscription_test_harness.h index 762f28454b25..379218ced1fc 100644 --- a/test/common/config/http_subscription_test_harness.h +++ b/test/common/config/http_subscription_test_harness.h @@ -33,7 +33,9 @@ typedef HttpSubscriptionImpl<envoy::api::v2::ClusterLoadAssignment> HttpEdsSubsc class HttpSubscriptionTestHarness : public SubscriptionTestHarness { public: - HttpSubscriptionTestHarness() + HttpSubscriptionTestHarness() : HttpSubscriptionTestHarness(std::chrono::milliseconds(0)) {} + + HttpSubscriptionTestHarness(std::chrono::milliseconds init_fetch_timeout) : method_descriptor_(Protobuf::DescriptorPool::generated_pool()->FindMethodByName( "envoy.api.v2.EndpointDiscoveryService.FetchEndpoints")), timer_(new Event::MockTimer()), http_request_(&cm_.async_client_) { @@ -45,7 +47,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { })); subscription_ = std::make_unique<HttpEdsSubscriptionImpl>( local_info_, cm_, "eds_cluster", dispatcher_, random_gen_, std::chrono::milliseconds(1), - std::chrono::milliseconds(1000), *method_descriptor_, stats_); + std::chrono::milliseconds(1000), *method_descriptor_, stats_, init_fetch_timeout); } ~HttpSubscriptionTestHarness() { @@ -136,6 +138,21 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { timerTick(); } + void expectConfigUpdateFailed() override { + EXPECT_CALL(callbacks_, onConfigUpdateFailed(nullptr)); + } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) override { + init_timeout_timer_ = new Event::MockTimer(&dispatcher_); + EXPECT_CALL(*init_timeout_timer_, enableTimer(std::chrono::milliseconds(timeout))); + } + + void expectDisableInitFetchTimeoutTimer() override { + EXPECT_CALL(*init_timeout_timer_, disableTimer()); + } + + void callInitFetchTimeoutCb() override { init_timeout_timer_->callback_(); } + void timerTick() { expectSendMessage(cluster_names_, version_); timer_cb_(); @@ -156,6 +173,7 @@ class HttpSubscriptionTestHarness : public SubscriptionTestHarness { Config::MockSubscriptionCallbacks<envoy::api::v2::ClusterLoadAssignment> callbacks_; std::unique_ptr<HttpEdsSubscriptionImpl> subscription_; NiceMock<LocalInfo::MockLocalInfo> local_info_; + Event::MockTimer* init_timeout_timer_; }; } // namespace Config diff --git a/test/common/config/subscription_impl_test.cc b/test/common/config/subscription_impl_test.cc index 85b07333306a..35b1f819acb6 100644 --- a/test/common/config/subscription_impl_test.cc +++ b/test/common/config/subscription_impl_test.cc @@ -1,29 +1,37 @@ #include <memory> +#include "test/common/config/delta_subscription_test_harness.h" #include "test/common/config/filesystem_subscription_test_harness.h" #include "test/common/config/grpc_subscription_test_harness.h" #include "test/common/config/http_subscription_test_harness.h" #include "test/common/config/subscription_test_harness.h" +using testing::InSequence; + namespace Envoy { namespace Config { namespace { enum class SubscriptionType { Grpc, + DeltaGrpc, Http, Filesystem, }; class SubscriptionImplTest : public testing::TestWithParam<SubscriptionType> { public: - SubscriptionImplTest() { + SubscriptionImplTest() : SubscriptionImplTest(std::chrono::milliseconds(0)) {} + SubscriptionImplTest(std::chrono::milliseconds init_fetch_timeout) { switch (GetParam()) { case SubscriptionType::Grpc: - test_harness_ = std::make_unique<GrpcSubscriptionTestHarness>(); + test_harness_ = std::make_unique<GrpcSubscriptionTestHarness>(init_fetch_timeout); + break; + case SubscriptionType::DeltaGrpc: + test_harness_ = std::make_unique<DeltaSubscriptionTestHarness>(init_fetch_timeout); break; case SubscriptionType::Http: - test_harness_ = std::make_unique<HttpSubscriptionTestHarness>(); + test_harness_ = std::make_unique<HttpSubscriptionTestHarness>(init_fetch_timeout); break; case SubscriptionType::Filesystem: test_harness_ = std::make_unique<FilesystemSubscriptionTestHarness>(); @@ -54,12 +62,29 @@ class SubscriptionImplTest : public testing::TestWithParam<SubscriptionType> { test_harness_->deliverConfigUpdate(cluster_names, version, accept); } + void expectConfigUpdateFailed() { test_harness_->expectConfigUpdateFailed(); } + + void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) { + test_harness_->expectEnableInitFetchTimeoutTimer(timeout); + } + + void expectDisableInitFetchTimeoutTimer() { test_harness_->expectDisableInitFetchTimeoutTimer(); } + + void callInitFetchTimeoutCb() { test_harness_->callInitFetchTimeoutCb(); } + std::unique_ptr<SubscriptionTestHarness> test_harness_; }; -INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplTest, - testing::ValuesIn({SubscriptionType::Grpc, SubscriptionType::Http, - SubscriptionType::Filesystem})); +class SubscriptionImplInitFetchTimeoutTest : public SubscriptionImplTest { +public: + SubscriptionImplInitFetchTimeoutTest() : SubscriptionImplTest(std::chrono::milliseconds(1000)) {} +}; + +SubscriptionType types[] = {SubscriptionType::Grpc, SubscriptionType::DeltaGrpc, + SubscriptionType::Http, SubscriptionType::Filesystem}; +INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplTest, testing::ValuesIn(types)); +INSTANTIATE_TEST_SUITE_P(SubscriptionImplTest, SubscriptionImplInitFetchTimeoutTest, + testing::ValuesIn(types)); // Validate basic request-response succeeds. TEST_P(SubscriptionImplTest, InitialRequestResponse) { @@ -117,6 +142,37 @@ TEST_P(SubscriptionImplTest, UpdateResources) { verifyStats(3, 1, 0, 0, 7148434200721666028); } +// Validate that initial fetch timer is created and calls callback on timeout +TEST_P(SubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) { + InSequence s; + expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000)); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + expectConfigUpdateFailed(); + callInitFetchTimeoutCb(); + verifyStats(1, 0, 0, 0, 0); +} + +// Validate that initial fetch timer is disabled on config update +TEST_P(SubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnSuccess) { + InSequence s; + expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000)); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + expectDisableInitFetchTimeoutTimer(); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", true); +} + +// Validate that initial fetch timer is disabled on config update failed +TEST_P(SubscriptionImplInitFetchTimeoutTest, DisableInitTimeoutOnFail) { + InSequence s; + expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000)); + startSubscription({"cluster0", "cluster1"}); + verifyStats(1, 0, 0, 0, 0); + expectDisableInitFetchTimeoutTimer(); + deliverConfigUpdate({"cluster0", "cluster1"}, "0", false); +} + } // namespace } // namespace Config } // namespace Envoy diff --git a/test/common/config/subscription_test_harness.h b/test/common/config/subscription_test_harness.h index 0df305a00f58..5a62f6d861d2 100644 --- a/test/common/config/subscription_test_harness.h +++ b/test/common/config/subscription_test_harness.h @@ -62,6 +62,14 @@ class SubscriptionTestHarness { EXPECT_EQ(connected_state, stats_store_.gauge("control_plane.connected_state").value()); } + virtual void expectConfigUpdateFailed() PURE; + + virtual void expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds timeout) PURE; + + virtual void expectDisableInitFetchTimeoutTimer() PURE; + + virtual void callInitFetchTimeoutCb() PURE; + Stats::IsolatedStoreImpl stats_store_; SubscriptionStats stats_; }; diff --git a/test/common/config/utility_test.cc b/test/common/config/utility_test.cc index 9c4ab302f695..a2cd07509b6b 100644 --- a/test/common/config/utility_test.cc +++ b/test/common/config/utility_test.cc @@ -80,6 +80,18 @@ TEST(UtilityTest, ApiConfigSourceRequestTimeout) { EXPECT_EQ(1234, Utility::apiConfigSourceRequestTimeout(api_config_source).count()); } +TEST(UtilityTest, ConfigSourceDefaultInitFetchTimeout) { + envoy::api::v2::core::ConfigSource config_source; + EXPECT_EQ(0, Utility::configSourceInitialFetchTimeout(config_source).count()); +} + +TEST(UtilityTest, ConfigSourceInitFetchTimeout) { + envoy::api::v2::core::ConfigSource config_source; + config_source.mutable_initial_fetch_timeout()->CopyFrom( + Protobuf::util::TimeUtil::MillisecondsToDuration(654)); + EXPECT_EQ(654, Utility::configSourceInitialFetchTimeout(config_source).count()); +} + TEST(UtilityTest, TranslateApiConfigSource) { envoy::api::v2::core::ApiConfigSource api_config_source_rest_legacy; Utility::translateApiConfigSource("test_rest_legacy_cluster", 10000,