Skip to content

Commit

Permalink
router: implement RetryPriority (#4437)
Browse files Browse the repository at this point in the history
This wires up the necessary logic to allow registering a
RetryPriorityFactory that can be used to impact which priority is
selected during host selection for retry attempts.

Signed-off-by: Snow Pettersen snowp@squareup.com

Description:
Risk Level: Low, new optional feautre
Testing: Integration test
Docs Changes: n/a
Release Notes: n/a
Part of #3958

Signed-off-by: Snow Pettersen <snowp@squareup.com>
  • Loading branch information
snowp authored and htuch committed Sep 23, 2018
1 parent 087c1a1 commit e8da6f3
Show file tree
Hide file tree
Showing 16 changed files with 205 additions and 10 deletions.
16 changes: 15 additions & 1 deletion include/envoy/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,11 @@ class RetryPolicy {
*/
virtual std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const PURE;

/**
* @return the RetryPriority to use when determining priority load for retries.
*/
virtual Upstream::RetryPrioritySharedPtr retryPriority() const PURE;

/**
* Number of times host selection should be reattempted when selecting a host
* for a retry attempt.
Expand Down Expand Up @@ -233,7 +238,16 @@ class RetryState {
virtual bool shouldSelectAnotherHost(const Upstream::Host& host) PURE;

/**
* @return how many times host selection should be reattempted during host selection.
* Returns a reference to the PriorityLoad that should be used for the next retry.
* @param priority_set current priority set.
* @param priority_load original priority load.
* @return PriorityLoad that should be used to select a priority for the next retry.
*/
virtual const Upstream::PriorityLoad&
priorityLoadForRetry(const Upstream::PrioritySet& priority_set,
const Upstream::PriorityLoad& priority_load) PURE;
/**
* return how many times host selection should be reattempted during host selection.
*/
virtual uint32_t hostSelectionMaxAttempts() const PURE;
};
Expand Down
11 changes: 7 additions & 4 deletions include/envoy/upstream/retry.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ class RetryPriority {
/**
* Determines what PriorityLoad to use.
*
* @param priority_state current state of cluster.
* @param priority_set current priority set of cluster.
* @param original_priority the unmodified PriorityLoad.
* @return a reference to the PriorityLoad to use. Return original_priority if no changes should
* be made.
*/
virtual PriorityLoad& determinePriorityLoad(const PriorityState& priority_state,
const PriorityLoad& original_priority) PURE;
virtual const PriorityLoad& determinePriorityLoad(const PrioritySet& priority_set,
const PriorityLoad& original_priority) PURE;

/**
* Called after a host has been attempted but before host selection for the next attempt has
Expand Down Expand Up @@ -105,7 +105,10 @@ class RetryPriorityFactory {
public:
virtual ~RetryPriorityFactory() {}

virtual void createRetryPriority(RetryPriorityFactoryCallbacks& callbacks) PURE;
virtual void createRetryPriority(RetryPriorityFactoryCallbacks& callbacks,
const Protobuf::Message& config) PURE;

virtual std::string name() const PURE;
};

/**
Expand Down
2 changes: 2 additions & 0 deletions source/common/http/async_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ class AsyncStreamImpl : public AsyncClient::Stream,
std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const override {
return {};
}
Upstream::RetryPrioritySharedPtr retryPriority() const override { return {}; }

uint32_t hostSelectionMaxAttempts() const override { return 1; }
uint32_t numRetries() const override { return 0; }
uint32_t retryOn() const override { return 0; }
Expand Down
6 changes: 6 additions & 0 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ RetryPolicyImpl::RetryPolicyImpl(const envoy::api::v2::route::RouteAction& confi
->createHostPredicate(*this, host_predicate.config());
}

const auto retry_priority = config.retry_policy().retry_priority();
if (!retry_priority.name().empty()) {
Registry::FactoryRegistry<Upstream::RetryPriorityFactory>::getFactory(retry_priority.name())
->createRetryPriority(*this, retry_priority.config());
}

auto host_selection_attempts = config.retry_policy().host_selection_retry_max_attempts();
if (host_selection_attempts) {
host_selection_attempts_ = host_selection_attempts;
Expand Down
12 changes: 11 additions & 1 deletion source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ typedef std::shared_ptr<VirtualHostImpl> VirtualHostSharedPtr;
/**
* Implementation of RetryPolicy that reads from the proto route config.
*/
class RetryPolicyImpl : public RetryPolicy, Upstream::RetryHostPredicateFactoryCallbacks {
class RetryPolicyImpl : public RetryPolicy,
Upstream::RetryHostPredicateFactoryCallbacks,
Upstream::RetryPriorityFactoryCallbacks {
public:
RetryPolicyImpl(const envoy::api::v2::route::RouteAction& config);

Expand All @@ -194,18 +196,26 @@ class RetryPolicyImpl : public RetryPolicy, Upstream::RetryHostPredicateFactoryC
std::vector<Upstream::RetryHostPredicateSharedPtr> retryHostPredicates() const override {
return retry_host_predicates_;
}
Upstream::RetryPrioritySharedPtr retryPriority() const override { return retry_priority_; }
uint32_t hostSelectionMaxAttempts() const override { return host_selection_attempts_; }

// Upstream::RetryHostPredicateFactoryCallbacks
void addHostPredicate(Upstream::RetryHostPredicateSharedPtr predicate) override {
retry_host_predicates_.emplace_back(predicate);
}

// Upstream::RetryHostPredicateFactoryCallbacks
void addRetryPriority(Upstream::RetryPrioritySharedPtr retry_priority) override {
ASSERT(!retry_priority_);
retry_priority_ = retry_priority;
}

private:
std::chrono::milliseconds per_try_timeout_{0};
uint32_t num_retries_{};
uint32_t retry_on_{};
std::vector<Upstream::RetryHostPredicateSharedPtr> retry_host_predicates_;
Upstream::RetryPrioritySharedPtr retry_priority_;
uint32_t host_selection_attempts_{1};
};

Expand Down
3 changes: 2 additions & 1 deletion source/common/router/retry_state_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ RetryStateImpl::RetryStateImpl(const RetryPolicy& route_policy, Http::HeaderMap&
Runtime::RandomGenerator& random, Event::Dispatcher& dispatcher,
Upstream::ResourcePriority priority)
: cluster_(cluster), runtime_(runtime), random_(random), dispatcher_(dispatcher),
priority_(priority), retry_host_predicates_(route_policy.retryHostPredicates()) {
priority_(priority), retry_host_predicates_(route_policy.retryHostPredicates()),
retry_priority_(route_policy.retryPriority()) {

if (request_headers.EnvoyRetryOn()) {
retry_on_ = parseRetryOn(request_headers.EnvoyRetryOn()->value().c_str());
Expand Down
20 changes: 17 additions & 3 deletions source/common/router/retry_state_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,28 @@ class RetryStateImpl : public RetryState {
RetryStatus shouldRetry(const Http::HeaderMap* response_headers,
const absl::optional<Http::StreamResetReason>& reset_reason,
DoRetryCallback callback) override;

void onHostAttempted(Upstream::HostDescriptionConstSharedPtr host) override {
std::for_each(retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { predicate->onHostAttempted(host); });
if (retry_priority_) {
retry_priority_->onHostAttempted(host);
}
}

bool shouldSelectAnotherHost(const Upstream::Host& host) override {
return std::any_of(
retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { return predicate->shouldSelectAnotherHost(host); });
}

void onHostAttempted(Upstream::HostDescriptionConstSharedPtr host) override {
std::for_each(retry_host_predicates_.begin(), retry_host_predicates_.end(),
[&host](auto predicate) { predicate->onHostAttempted(host); });
const Upstream::PriorityLoad&
priorityLoadForRetry(const Upstream::PrioritySet& priority_set,
const Upstream::PriorityLoad& priority_load) override {
if (!retry_priority_) {
return priority_load;
}
return retry_priority_->determinePriorityLoad(priority_set, priority_load);
}

uint32_t hostSelectionMaxAttempts() const override { return host_selection_max_attempts_; }
Expand All @@ -74,6 +87,7 @@ class RetryStateImpl : public RetryState {
Upstream::ResourcePriority priority_;
BackOffStrategyPtr backoff_strategy_;
std::vector<Upstream::RetryHostPredicateSharedPtr> retry_host_predicates_;
Upstream::RetryPrioritySharedPtr retry_priority_;
uint32_t host_selection_max_attempts_;
};

Expand Down
11 changes: 11 additions & 0 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,17 @@ class Filter : Logger::Loggable<Logger::Id::router>,
return retry_state_->shouldSelectAnotherHost(host);
}

const Upstream::PriorityLoad&
determinePriorityLoad(const Upstream::PrioritySet& priority_set,
const Upstream::PriorityLoad& original_priority_load) override {
// We only modify the priority load on retries.
if (!is_retry_) {
return original_priority_load;
}

return retry_state_->priorityLoadForRetry(priority_set, original_priority_load);
}

uint32_t hostSelectionRetryCount() const override {
if (!is_retry_) {
return 1;
Expand Down
16 changes: 16 additions & 0 deletions test/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,23 @@ void ConfigHelper::finalize(const std::vector<uint32_t>& ports) {
host_socket_addr->set_port_value(ports[port_idx++]);
}
}

// Assign ports to statically defined load_assignment hosts.
for (int j = 0; j < cluster->load_assignment().endpoints_size(); ++j) {
auto locality_lb = cluster->mutable_load_assignment()->mutable_endpoints(j);
for (int k = 0; k < locality_lb->lb_endpoints_size(); ++k) {
auto lb_endpoint = locality_lb->mutable_lb_endpoints(k);
if (lb_endpoint->endpoint().address().has_socket_address()) {
RELEASE_ASSERT(ports.size() > port_idx, "");
lb_endpoint->mutable_endpoint()
->mutable_address()
->mutable_socket_address()
->set_port_value(ports[port_idx++]);
}
}
}
}

if (capture_path) {
const bool has_tls = cluster->has_tls_context();
absl::optional<ProtobufWkt::Struct> tls_config;
Expand Down
2 changes: 2 additions & 0 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ TEST_P(Http2IntegrationTest, GrpcRouterNotFound) { testGrpcRouterNotFound(); }

TEST_P(Http2IntegrationTest, RetryHostPredicateFilter) { testRetryHostPredicateFilter(); }

TEST_P(Http2IntegrationTest, RetryPriority) { testRetryPriority(); }

TEST_P(Http2IntegrationTest, GrpcRetry) { testGrpcRetry(); }

// Send a request with overly large headers, and ensure it results in stream reset.
Expand Down
78 changes: 78 additions & 0 deletions test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,84 @@ void HttpIntegrationTest::testGrpcRetry() {
}
}

// Verifies that a retry priority can be configured and affect the host selected during retries.
// The retry priority will always target P1, which would otherwise never be hit due to P0 being
// healthy.
void HttpIntegrationTest::testRetryPriority() {
const Upstream::PriorityLoad priority_load{0, 100};
Upstream::MockRetryPriorityFactory factory(
std::make_shared<NiceMock<Upstream::MockRetryPriority>>(priority_load));

Registry::InjectFactory<Upstream::RetryPriorityFactory> inject_factory(factory);

envoy::api::v2::route::RouteAction::RetryPolicy retry_policy;
retry_policy.mutable_retry_priority()->set_name(factory.name());

// Add route with custom retry policy
config_helper_.addRoute("host", "/test_retry", "cluster_0", false,
envoy::api::v2::route::RouteAction::NOT_FOUND,
envoy::api::v2::route::VirtualHost::NONE, retry_policy);

// Use load assignments instead of static hosts. Necessary in order to use priorities.
config_helper_.addConfigModifier([](envoy::config::bootstrap::v2::Bootstrap& bootstrap) {
auto cluster = bootstrap.mutable_static_resources()->mutable_clusters(0);
auto load_assignment = cluster->mutable_load_assignment();
load_assignment->set_cluster_name(cluster->name());
const auto& host_address = cluster->hosts(0).socket_address().address();

for (int i = 0; i < 2; ++i) {
auto locality = load_assignment->add_endpoints();
locality->set_priority(i);
locality->mutable_locality()->set_region("region");
locality->mutable_locality()->set_zone("zone");
locality->mutable_locality()->set_sub_zone("sub_zone" + std::to_string(i));
auto lb_endpoint = locality->add_lb_endpoints();
lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address()->set_address(
host_address);
lb_endpoint->mutable_endpoint()->mutable_address()->mutable_socket_address()->set_port_value(
0);
}

cluster->clear_hosts();
});

fake_upstreams_count_ = 2;
initialize();
codec_client_ = makeHttpConnection(lookupPort("http"));
auto response =
codec_client_->makeRequestWithBody(Http::TestHeaderMapImpl{{":method", "POST"},
{":path", "/test_retry"},
{":scheme", "http"},
{":authority", "host"},
{"x-forwarded-for", "10.0.0.1"},
{"x-envoy-retry-on", "5xx"}},
1024);

// Note how we're exepcting each upstream request to hit the same upstream.
waitForNextUpstreamRequest(0);
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "503"}}, false);

if (fake_upstreams_[0]->httpType() == FakeHttpConnection::Type::HTTP1) {
ASSERT_TRUE(fake_upstream_connection_->waitForDisconnect());
ASSERT_TRUE(fake_upstreams_[1]->waitForHttpConnection(*dispatcher_, fake_upstream_connection_));
} else {
ASSERT_TRUE(upstream_request_->waitForReset());
}

waitForNextUpstreamRequest(1);
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(512, true);

response->waitForEndStream();
EXPECT_TRUE(upstream_request_->complete());
EXPECT_EQ(1024U, upstream_request_->bodyLength());

EXPECT_TRUE(response->complete());
EXPECT_STREQ("200", response->headers().Status()->value().c_str());
EXPECT_EQ(512U, response->body().size());
}

//
// Verifies that a retry host filter can be configured and affect the host selected during retries.
// The predicate will keep track of the first host attempted, and attempt to route all requests to
// the same host. With a total of two upstream hosts, this should result in us continuously sending
Expand Down
1 change: 1 addition & 0 deletions test/integration/http_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class HttpIntegrationTest : public BaseIntegrationTest {
void testRetryHittingBufferLimit();
void testGrpcRouterNotFound();
void testGrpcRetry();
void testRetryPriority();
void testRetryHostPredicateFilter();
void testHittingDecoderFilterLimit();
void testHittingEncoderFilterLimit();
Expand Down
2 changes: 2 additions & 0 deletions test/integration/integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ TEST_P(IntegrationTest, Retry) { testRetry(); }

TEST_P(IntegrationTest, RetryHostPredicateFilter) { testRetryHostPredicateFilter(); }

TEST_P(IntegrationTest, RetryPriority) { testRetryPriority(); }

TEST_P(IntegrationTest, EnvoyHandling100Continue) { testEnvoyHandling100Continue(); }

TEST_P(IntegrationTest, EnvoyHandlingDuplicate100Continues) { testEnvoyHandling100Continue(true); }
Expand Down
3 changes: 3 additions & 0 deletions test/mocks/router/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class TestRetryPolicy : public RetryPolicy {
uint32_t numRetries() const override { return num_retries_; }
uint32_t retryOn() const override { return retry_on_; }
MOCK_CONST_METHOD0(retryHostPredicates, std::vector<Upstream::RetryHostPredicateSharedPtr>());
MOCK_CONST_METHOD0(retryPriority, Upstream::RetryPrioritySharedPtr());
uint32_t hostSelectionMaxAttempts() const override { return host_selection_max_attempts_; }

std::chrono::milliseconds per_try_timeout_{0};
Expand All @@ -91,6 +92,8 @@ class MockRetryState : public RetryState {
DoRetryCallback callback));
MOCK_METHOD1(onHostAttempted, void(Upstream::HostDescriptionConstSharedPtr));
MOCK_METHOD1(shouldSelectAnotherHost, bool(const Upstream::Host& host));
MOCK_METHOD2(priorityLoadForRetry, Upstream::PriorityLoad&(const Upstream::PrioritySet&,
const Upstream::PriorityLoad&));
MOCK_CONST_METHOD0(hostSelectionMaxAttempts, uint32_t());

DoRetryCallback callback_;
Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/mocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ void MockPrioritySet::runUpdateCallbacks(uint32_t priority, const HostVector& ho
member_update_cb_helper_.runCallbacks(priority, hosts_added, hosts_removed);
}

MockRetryPriority::~MockRetryPriority() {}

MockCluster::MockCluster() {
ON_CALL(*this, prioritySet()).WillByDefault(ReturnRef(priority_set_));
ON_CALL(testing::Const(*this), prioritySet()).WillByDefault(ReturnRef(priority_set_));
Expand Down
30 changes: 30 additions & 0 deletions test/mocks/upstream/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,36 @@ class MockPrioritySet : public PrioritySet {
Common::CallbackManager<uint32_t, const HostVector&, const HostVector&> member_update_cb_helper_;
};

class MockRetryPriority : public RetryPriority {
public:
MockRetryPriority(const PriorityLoad& priority_load) : priority_load_(priority_load) {}
~MockRetryPriority();

const PriorityLoad& determinePriorityLoad(const PrioritySet&, const PriorityLoad&) {
return priority_load_;
}

MOCK_METHOD1(onHostAttempted, void(HostDescriptionConstSharedPtr));

private:
const PriorityLoad& priority_load_;
};

class MockRetryPriorityFactory : public RetryPriorityFactory {
public:
MockRetryPriorityFactory(RetryPrioritySharedPtr retry_priority)
: retry_priority_(retry_priority) {}
void createRetryPriority(RetryPriorityFactoryCallbacks& callbacks,
const Protobuf::Message&) override {
callbacks.addRetryPriority(retry_priority_);
}

std::string name() const override { return "envoy.mock_retry_priority"; }

private:
RetryPrioritySharedPtr retry_priority_;
};

class MockCluster : public Cluster {
public:
MockCluster();
Expand Down

0 comments on commit e8da6f3

Please sign in to comment.