Skip to content

Commit

Permalink
xDS: gRPC connection failure shouldn't make Envoy continue startup (e…
Browse files Browse the repository at this point in the history
…nvoyproxy#8152)

Currently, if gRPC config stream disconnected while Envoy waiting for
initial xDS response, xDS implementations' onConfigUpdateFailed() will
allow Envoy startup to continue. This may cause Envoy begins taking
traffics while route/cluster/endpoint config are still missing and
return "404 NR" or "503 NR".

This change makes Envoy waiting for initial xDS response until
initial_fetch_timeout if specified.

Risk Level: Medium
Testing: existing test cases updated
Fixes envoyproxy#8046

Signed-off-by: lhuang8 <lhuang8@ebay.com>
  • Loading branch information
l8huang authored and danzh1989 committed Sep 24, 2019
1 parent 0ae0551 commit a68f896
Show file tree
Hide file tree
Showing 26 changed files with 110 additions and 64 deletions.
5 changes: 2 additions & 3 deletions source/common/config/delta_subscription_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,10 @@ void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAc
}

void DeltaSubscriptionState::handleEstablishmentFailure() {
disableInitFetchTimeoutTimer();
// New gRPC stream will be established and send requests again.
// If init_fetch_timeout is non-zero, server will continue startup after it timeout
stats_.update_failure_.inc();
stats_.update_attempt_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
nullptr);
}

envoy::api::v2::DeltaDiscoveryRequest DeltaSubscriptionState::getNextRequest() {
Expand Down
7 changes: 7 additions & 0 deletions source/common/config/grpc_mux_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ void GrpcMuxSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason rea
ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what());
break;
}

stats_.update_attempt_.inc();
if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
// New gRPC stream will be established and send requests again.
// If init_fetch_timeout is non-zero, server will continue startup after it timeout
return;
}

callbacks_.onConfigUpdateFailed(reason, e);
}

Expand Down
50 changes: 34 additions & 16 deletions source/common/config/http_subscription_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ HttpSubscriptionImpl::HttpSubscriptionImpl(
void HttpSubscriptionImpl::start(const std::set<std::string>& resource_names) {
if (init_fetch_timeout_.count() > 0) {
init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
ENVOY_LOG(warn, "REST config: initial fetch timed out for", path_);
stats_.init_fetch_timeout_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
nullptr);
handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
});
init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
}
Expand Down Expand Up @@ -77,8 +74,7 @@ void HttpSubscriptionImpl::parseResponse(const Http::Message& response) {
try {
MessageUtil::loadFromJson(response.bodyAsString(), message, validation_visitor_);
} catch (const EnvoyException& e) {
ENVOY_LOG(warn, "REST config JSON conversion error: {}", e.what());
handleFailure(nullptr);
handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
return;
}
try {
Expand All @@ -87,23 +83,45 @@ void HttpSubscriptionImpl::parseResponse(const Http::Message& response) {
stats_.version_.set(HashUtil::xxHash64(request_.version_info()));
stats_.update_success_.inc();
} catch (const EnvoyException& e) {
ENVOY_LOG(warn, "REST config update rejected: {}", e.what());
stats_.update_rejected_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
}
}

void HttpSubscriptionImpl::onFetchComplete() {}

void HttpSubscriptionImpl::onFetchFailure(const EnvoyException* e) {
disableInitFetchTimeoutTimer();
ENVOY_LOG(warn, "REST config update failed: {}", e != nullptr ? e->what() : "fetch failure");
handleFailure(e);
void HttpSubscriptionImpl::onFetchFailure(Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) {
handleFailure(reason, e);
}

void HttpSubscriptionImpl::handleFailure(const EnvoyException* e) {
stats_.update_failure_.inc();
callbacks_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, e);
void HttpSubscriptionImpl::handleFailure(Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) {

switch (reason) {
case Config::ConfigUpdateFailureReason::ConnectionFailure:
ENVOY_LOG(warn, "REST update for {} failed", path_);
stats_.update_failure_.inc();
break;
case Config::ConfigUpdateFailureReason::FetchTimedout:
ENVOY_LOG(warn, "REST config: initial fetch timeout for {}", path_);
stats_.init_fetch_timeout_.inc();
disableInitFetchTimeoutTimer();
break;
case Config::ConfigUpdateFailureReason::UpdateRejected:
ASSERT(e != nullptr);
ENVOY_LOG(warn, "REST config for {} rejected: {}", path_, e->what());
stats_.update_rejected_.inc();
disableInitFetchTimeoutTimer();
break;
}

if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
// New requests will be sent again.
// If init_fetch_timeout is non-zero, server will continue startup after it timeout
return;
}

callbacks_.onConfigUpdateFailed(reason, e);
}

void HttpSubscriptionImpl::disableInitFetchTimeoutTimer() {
Expand Down
4 changes: 2 additions & 2 deletions source/common/config/http_subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class HttpSubscriptionImpl : public Http::RestApiFetcher,
void createRequest(Http::Message& request) override;
void parseResponse(const Http::Message& response) override;
void onFetchComplete() override;
void onFetchFailure(const EnvoyException* e) override;
void onFetchFailure(Config::ConfigUpdateFailureReason reason, const EnvoyException* e) override;

private:
void handleFailure(const EnvoyException* e);
void handleFailure(Config::ConfigUpdateFailureReason reason, const EnvoyException* e);
void disableInitFetchTimeoutTimer();

std::string path_;
Expand Down
8 changes: 5 additions & 3 deletions source/common/http/rest_api_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ void RestApiFetcher::onSuccess(Http::MessagePtr&& response) {
try {
parseResponse(*response);
} catch (EnvoyException& e) {
onFetchFailure(&e);
onFetchFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e);
}

requestComplete();
}

void RestApiFetcher::onFailure(Http::AsyncClient::FailureReason) {
onFetchFailure(nullptr);
void RestApiFetcher::onFailure(Http::AsyncClient::FailureReason reason) {
// Currently Http::AsyncClient::FailureReason only has one value: "Reset".
ASSERT(reason == Http::AsyncClient::FailureReason::Reset);
onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
requestComplete();
}

Expand Down
5 changes: 4 additions & 1 deletion source/common/http/rest_api_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <chrono>
#include <string>

#include "envoy/config/subscription.h"
#include "envoy/event/dispatcher.h"
#include "envoy/runtime/runtime.h"
#include "envoy/upstream/cluster_manager.h"
Expand Down Expand Up @@ -46,9 +47,11 @@ class RestApiFetcher : public Http::AsyncClient::Callbacks {

/**
* This will be called if the fetch fails (either due to non-200 response, network error, etc.).
* @param reason supplies the fetch failure reason.
* @param e supplies any exception data on why the fetch failed. May be nullptr.
*/
virtual void onFetchFailure(const EnvoyException* e) PURE;
virtual void onFetchFailure(Config::ConfigUpdateFailureReason reason,
const EnvoyException* e) PURE;

protected:
const std::string remote_cluster_name_;
Expand Down
5 changes: 3 additions & 2 deletions source/common/router/rds_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ void RdsRouteConfigSubscription::onConfigUpdate(
}
}

void RdsRouteConfigSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
const EnvoyException*) {
void RdsRouteConfigSubscription::onConfigUpdateFailed(
Envoy::Config::ConfigUpdateFailureReason reason, const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/scoped_rds.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ class ScopedRdsConfigSubscription : public Envoy::Config::DeltaConfigSubscriptio
void onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& version_info) override;
void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) override {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
DeltaConfigSubscriptionInstance::onConfigUpdateFailed();
}
std::string resourceName(const ProtobufWkt::Any& resource) override {
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/vhds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info,
*scope_, *this);
}

void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
3 changes: 2 additions & 1 deletion source/common/runtime/runtime_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,8 +543,9 @@ void RtdsSubscription::onConfigUpdate(
onConfigUpdate(unwrapped_resource, resources[0].version());
}

void RtdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void RtdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
4 changes: 3 additions & 1 deletion source/common/secret/sds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ void SdsApi::onConfigUpdate(const Protobuf::RepeatedPtrField<envoy::api::v2::Res
onConfigUpdate(unwrapped_resource, resources[0].version());
}

void SdsApi::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason, const EnvoyException*) {
void SdsApi::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad config.
init_target_.ready();
}
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ void CdsApiImpl::onConfigUpdate(
}
}

void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
runInitializeCallbackIfAny();
Expand Down
6 changes: 1 addition & 5 deletions source/common/upstream/eds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,7 @@ bool EdsClusterImpl::updateHostsPerLocality(

void EdsClusterImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
// We should not call onPreInitComplete if this method is called because of stream disconnection.
// This might potentially hang the initialization forever, if init_fetch_timeout is disabled.
if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) {
return;
}
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad config.
onPreInitComplete();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ void ClientSslAuthConfig::parseResponse(const Http::Message& message) {
stats_.total_principals_.set(new_principals->size());
}

void ClientSslAuthConfig::onFetchFailure(const EnvoyException*) { stats_.update_failure_.inc(); }
void ClientSslAuthConfig::onFetchFailure(Config::ConfigUpdateFailureReason, const EnvoyException*) {
stats_.update_failure_.inc();
}

static const std::string Path = "/v1/certs/list/approved";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <unordered_set>

#include "envoy/config/filter/network/client_ssl_auth/v2/client_ssl_auth.pb.h"
#include "envoy/config/subscription.h"
#include "envoy/network/filter.h"
#include "envoy/runtime/runtime.h"
#include "envoy/stats/scope.h"
Expand Down Expand Up @@ -94,7 +95,7 @@ class ClientSslAuthConfig : public Http::RestApiFetcher {
void createRequest(Http::Message& request) override;
void parseResponse(const Http::Message& response) override;
void onFetchComplete() override {}
void onFetchFailure(const EnvoyException* e) override;
void onFetchFailure(Config::ConfigUpdateFailureReason reason, const EnvoyException* e) override;

ThreadLocal::SlotPtr tls_;
Network::Address::IpList ip_white_list_;
Expand Down
3 changes: 2 additions & 1 deletion source/server/lds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ void LdsApiImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::An
onConfigUpdate(to_add_repeated, to_remove_repeated, version_info);
}

void LdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason,
void LdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
const EnvoyException*) {
ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
// We need to allow server startup to continue, even if we have a bad
// config.
init_target_.ready();
Expand Down
8 changes: 8 additions & 0 deletions test/common/config/delta_subscription_state_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,14 @@ TEST_F(DeltaSubscriptionStateTest, AddedAndRemoved) {
ack.error_detail_.message());
}

TEST_F(DeltaSubscriptionStateTest, handleEstablishmentFailure) {
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)).Times(0);

state_.handleEstablishmentFailure();
EXPECT_EQ(stats_.update_failure_.value(), 1);
EXPECT_EQ(stats_.update_attempt_.value(), 1);
}

} // namespace
} // namespace Config
} // namespace Envoy
8 changes: 6 additions & 2 deletions test/common/config/grpc_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
InSequence s;
EXPECT_CALL(*async_client_, startRaw(_, _, _)).WillOnce(Return(nullptr));

// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(0);
EXPECT_CALL(random_, random());
EXPECT_CALL(*timer_, enableTimer(_, _));
subscription_->start({"cluster0", "cluster1"});
Expand All @@ -38,8 +40,10 @@ TEST_F(GrpcSubscriptionImplTest, StreamCreationFailure) {
TEST_F(GrpcSubscriptionImplTest, RemoteStreamClose) {
startSubscription({"cluster0", "cluster1"});
EXPECT_TRUE(statsAre(1, 0, 0, 0, 0, 0));
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(0);
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(random_, random());
subscription_->grpcMux().grpcStreamForTest().onRemoteClose(Grpc::Status::GrpcStatus::Canceled,
Expand Down
11 changes: 6 additions & 5 deletions test/common/config/http_subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ TEST_F(HttpSubscriptionImplTest, OnRequestReset) {
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _))
.Times(0);
http_callbacks_->onFailure(Http::AsyncClient::FailureReason::Reset);
EXPECT_TRUE(statsAre(1, 0, 0, 1, 0, 0));
timerTick();
Expand All @@ -34,14 +35,14 @@ TEST_F(HttpSubscriptionImplTest, BadJsonRecovery) {
EXPECT_CALL(random_gen_, random()).WillOnce(Return(0));
EXPECT_CALL(*timer_, enableTimer(_, _));
EXPECT_CALL(callbacks_,
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, _));
onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, _));
http_callbacks_->onSuccess(std::move(message));
EXPECT_TRUE(statsAre(1, 0, 0, 1, 0, 0));
EXPECT_TRUE(statsAre(1, 0, 1, 0, 0, 0));
request_in_progress_ = false;
timerTick();
EXPECT_TRUE(statsAre(2, 0, 0, 1, 0, 0));
EXPECT_TRUE(statsAre(2, 0, 1, 0, 0, 0));
deliverConfigUpdate({"cluster0", "cluster1"}, "0", true);
EXPECT_TRUE(statsAre(3, 1, 0, 1, 0, 7148434200721666028));
EXPECT_TRUE(statsAre(3, 1, 1, 0, 0, 7148434200721666028));
}

TEST_F(HttpSubscriptionImplTest, ConfigNotModified) {
Expand Down
3 changes: 2 additions & 1 deletion test/common/config/subscription_factory_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,8 @@ TEST_F(SubscriptionFactoryTest, GrpcSubscription) {
}));
EXPECT_CALL(random_, random());
EXPECT_CALL(dispatcher_, createTimer_(_)).Times(2);
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _));
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
EXPECT_CALL(callbacks_, onConfigUpdateFailed(_, _)).Times(0);
subscriptionFromConfigSource(config)->start({"static_cluster"});
}

Expand Down
4 changes: 4 additions & 0 deletions test/common/config/subscription_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ TEST_P(SubscriptionImplInitFetchTimeoutTest, InitialFetchTimeout) {
expectEnableInitFetchTimeoutTimer(std::chrono::milliseconds(1000));
startSubscription({"cluster0", "cluster1"});
statsAre(1, 0, 0, 0, 0, 0);
if (GetParam() == SubscriptionType::Http) {
expectDisableInitFetchTimeoutTimer();
}
expectConfigUpdateFailed();

callInitFetchTimeoutCb();
statsAre(1, 0, 0, 0, 1, 0);
}
Expand Down
4 changes: 2 additions & 2 deletions test/common/router/rds_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ TEST_F(RdsImplTest, FailureSubscription) {
setup();

EXPECT_CALL(init_watcher_, ready());
rds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
{});
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
rds_callbacks_->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, {});
}

class RouteConfigProviderManagerImplTest : public RdsTestBase {
Expand Down
5 changes: 3 additions & 2 deletions test/common/runtime/runtime_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -845,8 +845,9 @@ TEST_F(RtdsLoaderImplTest, FailureSubscription) {
setup();

EXPECT_CALL(init_watcher_, ready());
rtds_callbacks_[0]->onConfigUpdateFailed(
Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, {});
// onConfigUpdateFailed() should not be called for gRPC stream connection failure
rtds_callbacks_[0]->onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout,
{});

EXPECT_EQ(0, store_.counter("runtime.load_error").value());
EXPECT_EQ(1, store_.counter("runtime.load_success").value());
Expand Down
Loading

0 comments on commit a68f896

Please sign in to comment.