Skip to content

Commit

Permalink
Quota cache to calculate key from ReferencedAttributes. (envoyproxy#105)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiwzhang authored Sep 1, 2017
1 parent c74804a commit 5025ded
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 31 deletions.
115 changes: 86 additions & 29 deletions mixerclient/src/quota_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,32 @@ QuotaCache::CacheElem::CacheElem(const std::string& name) : name_(name) {
void QuotaCache::CacheElem::Alloc(int amount, QuotaPrefetch::DoneFunc fn) {
quota_->amount = amount;
quota_->best_effort = true;
quota_->response_func =
[fn](const CheckResponse::QuotaResult* result) -> bool {
quota_->response_func = [fn](
const Attributes&, const CheckResponse::QuotaResult* result) -> bool {
int amount = -1;
milliseconds expire;
milliseconds expire = duration_cast<milliseconds>(minutes(1));
if (result != nullptr) {
amount = result->granted_amount();
expire = ToMilliseonds(result->valid_duration());
if (result->has_valid_duration()) {
expire = ToMilliseonds(result->valid_duration());
}
}
fn(amount, expire, system_clock::now());
return true;
};
}

bool QuotaCache::CacheElem::Quota(int amount, CheckResult::Quota* quota) {
void QuotaCache::CacheElem::Quota(int amount, CheckResult::Quota* quota) {
quota_ = quota;
bool ret = prefetch_->Check(amount, system_clock::now());
if (prefetch_->Check(amount, system_clock::now())) {
quota->result = CheckResult::Quota::Passed;
} else {
quota->result = CheckResult::Quota::Rejected;
}

// A hack that requires prefetch code to call transport Alloc() function
// within Check() call.
quota_ = nullptr;
return ret;
}

QuotaCache::CheckResult::CheckResult() : status_(Code::UNAVAILABLE, "") {}
Expand Down Expand Up @@ -111,7 +117,7 @@ void QuotaCache::CheckResult::SetResponse(const Status& status,
<< quota.name;
}
}
if (!quota.response_func(result)) {
if (!quota.response_func(attributes, result)) {
if (!rejected_quota_names.empty()) {
rejected_quota_names += ",";
}
Expand Down Expand Up @@ -140,43 +146,94 @@ QuotaCache::~QuotaCache() {
FlushAll();
}

void QuotaCache::CheckCache(const Attributes& request, bool use_cache,
void QuotaCache::CheckCache(const Attributes& request, bool check_use_cache,
CheckResult::Quota* quota) {
if (!cache_ || !use_cache) {
// If check is not using cache, that check may be rejected.
// If quota cache is used, quota amount is already substracted from the cache.
// If the check is rejected, there is not easy way to add them back to cache.
// The workaround is not to use quota cache if check is not in the cache.
if (!cache_ || !check_use_cache) {
quota->best_effort = false;
quota->result = CheckResult::Quota::Pending;
quota->response_func =
[](const CheckResponse::QuotaResult* result) -> bool {
quota->response_func = [](
const Attributes&, const CheckResponse::QuotaResult* result) -> bool {
// nullptr means connection error, for quota, it is fail open for
// connection error.
return result == nullptr || result->granted_amount() > 0;
};
return;
}

// TODO: for now, quota cache key always is the quota name.
// Need to use ReferencedAttributes from Mixer server to calculate cache key.
std::string signature = quota->name;
std::lock_guard<std::mutex> lock(cache_mutex_);
PerQuotaReferenced& quota_ref = quota_referenced_map_[quota->name];
for (const auto& it : quota_ref.referenced_map) {
const Referenced& referenced = it.second;
std::string signature;
if (!referenced.Signature(request, &signature)) {
continue;
}
QuotaLRUCache::ScopedLookup lookup(cache_.get(), signature);
if (lookup.Found()) {
CacheElem* cache_elem = lookup.value();
cache_elem->Quota(quota->amount, quota);
return;
}
}

if (!quota_ref.pending_item) {
quota_ref.pending_item.reset(new CacheElem(quota->name));
}
quota_ref.pending_item->Quota(quota->amount, quota);

auto saved_func = quota->response_func;
std::string quota_name = quota->name;
quota->response_func = [saved_func, quota_name, this](
const Attributes& attributes,
const CheckResponse::QuotaResult* result) -> bool {
SetResponse(attributes, quota_name, result);
if (saved_func) {
return saved_func(attributes, result);
}
return true;
};
}

void QuotaCache::SetResponse(const Attributes& attributes,
const std::string& quota_name,
const CheckResponse::QuotaResult* result) {
if (result == nullptr) {
return;
}

Referenced referenced;
if (!referenced.Fill(result->referenced_attributes())) {
return;
}

std::string signature;
if (!referenced.Signature(attributes, &signature)) {
GOOGLE_LOG(ERROR) << "Quota response referenced mismatchs with request";
GOOGLE_LOG(ERROR) << "Request attributes: " << attributes.DebugString();
GOOGLE_LOG(ERROR) << "Referenced attributes: " << referenced.DebugString();
return;
}

std::lock_guard<std::mutex> lock(cache_mutex_);
QuotaLRUCache::ScopedLookup lookup(cache_.get(), signature);
CacheElem* cache_elem;
if (!lookup.Found()) {
cache_elem = new CacheElem(quota->name);
cache_->Insert(signature, cache_elem, 1);
} else {
cache_elem = lookup.value();
if (lookup.Found()) {
// Not to override the existing cache entry.
return;
}

if (cache_elem->Quota(quota->amount, quota)) {
quota->result = CheckResult::Quota::Passed;
} else {
// TODO: for multiple quota metrics in a request,
// if a metric is rejected, other metrics should not use any tokens.
// One way is for prefetch to implement TryQuota, we call TryQuota
// for all metrics first, only all passed, then deduce the tokens.
quota->result = CheckResult::Quota::Rejected;
PerQuotaReferenced& quota_ref = quota_referenced_map_[quota_name];
std::string hash = referenced.Hash();
if (quota_ref.referenced_map.find(hash) == quota_ref.referenced_map.end()) {
quota_ref.referenced_map[hash] = referenced;
GOOGLE_LOG(INFO) << "Add a new Referenced for quota cache: " << quota_name
<< ", reference: " << referenced.DebugString();
}

cache_->Insert(signature, quota_ref.pending_item.release(), 1);
}

void QuotaCache::Check(const Attributes& request, bool use_cache,
Expand Down
25 changes: 23 additions & 2 deletions mixerclient/src/quota_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

#include <mutex>
#include <string>
#include <unordered_map>

#include "include/client.h"
#include "prefetch/quota_prefetch.h"
#include "src/attribute_converter.h"
#include "src/referenced.h"
#include "utils/simple_lru_cache.h"
#include "utils/simple_lru_cache_inl.h"

Expand Down Expand Up @@ -79,6 +81,7 @@ class QuotaCache {

// The function to set the quota response from server.
using OnResponseFunc = std::function<bool(
const Attributes& attributes,
const ::istio::mixer::v1::CheckResponse::QuotaResult* result)>;
OnResponseFunc response_func;
};
Expand Down Expand Up @@ -111,7 +114,7 @@ class QuotaCache {
CacheElem(const std::string& name);

// Use the prefetch object to check the quota.
bool Quota(int amount, CheckResult::Quota* quota);
void Quota(int amount, CheckResult::Quota* quota);

// The quota name.
const std::string& quota_name() const { return name_; }
Expand All @@ -129,14 +132,32 @@ class QuotaCache {
std::unique_ptr<QuotaPrefetch> prefetch_;
};

// Per quota Referenced data.
struct PerQuotaReferenced {
// Pending CacheElem for all cache miss requests.
// This item will be added to the cache after response.
std::unique_ptr<CacheElem> pending_item;

// Referenced map keyed with their hashes
std::unordered_map<std::string, Referenced> referenced_map;
};

// Set a quota response.
void SetResponse(
const Attributes& attributes, const std::string& quota_name,
const ::istio::mixer::v1::CheckResponse::QuotaResult* result);

// A map from quota name to PerQuotaReferenced.
std::unordered_map<std::string, PerQuotaReferenced> quota_referenced_map_;

// Key is the signature of the Attributes. Value is the CacheElem.
// It is a LRU cache with MaxIdelTime as response_expiration_time.
using QuotaLRUCache = SimpleLRUCache<std::string, CacheElem>;

// The quota options.
QuotaOptions options_;

// Mutex guarding the access of cache_;
// Mutex guarding the access of cache_ and quota_referenced_map_
std::mutex cache_mutex_;

// The cache that maps from key to prefetch object
Expand Down
145 changes: 145 additions & 0 deletions mixerclient/src/quota_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

using ::istio::mixer::v1::CheckRequest;
using ::istio::mixer::v1::CheckResponse;
using ::istio::mixer::v1::ReferencedAttributes;
using ::google::protobuf::util::Status;
using ::google::protobuf::util::error::Code;
using ::testing::Invoke;
Expand All @@ -43,6 +44,24 @@ class QuotaCacheTest : public ::testing::Test {
Attributes::StringValue(kQuotaName);
}

void TestRequest(const Attributes& request, bool pass,
const CheckResponse& response) {
QuotaCache::CheckResult result;
cache_->Check(request, true, &result);

CheckRequest request_pb;
result.BuildRequest(&request_pb);

EXPECT_TRUE(result.IsCacheHit());
if (pass) {
EXPECT_TRUE(result.status().ok());
} else {
EXPECT_FALSE(result.status().ok());
}

result.SetResponse(Status::OK, request, response);
}

Attributes request_;
std::unique_ptr<QuotaCache> cache_;
};
Expand Down Expand Up @@ -158,6 +177,132 @@ TEST_F(QuotaCacheTest, TestUseCacheRejected) {
EXPECT_EQ(rejected, 9);
}

TEST_F(QuotaCacheTest, TestInvalidQuotaReferenced) {
// If quota result Referenced is invalid (wrong word index),
// its cache item stays in pending.
// Other cache miss requests with same quota name will use pending
// item.
CheckResponse response;
CheckResponse::QuotaResult quota_result;
// Not more quota.
quota_result.set_granted_amount(0);
auto match =
quota_result.mutable_referenced_attributes()->add_attribute_matches();
match->set_condition(ReferencedAttributes::ABSENCE);
match->set_name(10000); // global index is too big
(*response.mutable_quotas())[kQuotaName] = quota_result;

Attributes attr(request_);
attr.attributes["source.name"] = Attributes::StringValue("user1");
// response has invalid referenced, cache item still in pending.
TestRequest(attr, true, response);

attr.attributes["source.name"] = Attributes::StringValue("user2");
// it is a cache miss, use pending request.
// Previous request has used up token, this request will be rejected.
TestRequest(attr, false, response);
}

TEST_F(QuotaCacheTest, TestMismatchedReferenced) {
// If quota result Referenced is mismatched with request data.
// its cache item stays in pending.
// Other cache miss requests with same quota name will use pending
// item.
CheckResponse response;
CheckResponse::QuotaResult quota_result;
// Not more quota.
quota_result.set_granted_amount(0);
auto match =
quota_result.mutable_referenced_attributes()->add_attribute_matches();
match->set_condition(ReferencedAttributes::ABSENCE);
match->set_name(2); // "source.name" should be absence (mismatch)
(*response.mutable_quotas())[kQuotaName] = quota_result;

Attributes attr(request_);
attr.attributes["source.name"] = Attributes::StringValue("user1");
// Since respones has mismatched Referenced, cache item still in pending.
// Prefetch always allow the first call.
TestRequest(attr, true, response);

// second request with different users still use the pending request.
attr.attributes["source.name"] = Attributes::StringValue("user2");
// it is a cache miss, use pending request.
// Previous request has used up token, this request will be rejected.
TestRequest(attr, false, response);
}

TEST_F(QuotaCacheTest, TestOneReferencedWithTwoKeys) {
// Quota needs to use source.name as cache key.
// First source.name is exhaused, and second one is with quota.
CheckResponse response;
CheckResponse::QuotaResult quota_result;
// Not more quota.
quota_result.set_granted_amount(0);
auto match =
quota_result.mutable_referenced_attributes()->add_attribute_matches();
match->set_condition(ReferencedAttributes::EXACT);
match->set_name(2); // "source.name" should be used
(*response.mutable_quotas())[kQuotaName] = quota_result;

Attributes attr1(request_);
attr1.attributes["source.name"] = Attributes::StringValue("user1");
Attributes attr2(request_);
attr2.attributes["source.name"] = Attributes::StringValue("user2");

// cache item is updated with 0 token in the pool.
// it will be saved into cache key with user1.
TestRequest(attr1, true, response);

// user2 still have quota.
quota_result.set_granted_amount(10);
(*response.mutable_quotas())[kQuotaName] = quota_result;
TestRequest(attr2, true, response);

// user1 will not have quota
TestRequest(attr1, false, response);

// user2 will have quota
TestRequest(attr2, true, response);
}

TEST_F(QuotaCacheTest, TestTwoReferencedWith) {
CheckResponse::QuotaResult quota_result1;
// Not more quota.
quota_result1.set_granted_amount(0);
auto match =
quota_result1.mutable_referenced_attributes()->add_attribute_matches();
match->set_condition(ReferencedAttributes::EXACT);
match->set_name(2); // "source.name" should be used
CheckResponse response1;
(*response1.mutable_quotas())[kQuotaName] = quota_result1;

CheckResponse::QuotaResult quota_result2;
quota_result2.set_granted_amount(10);
match =
quota_result2.mutable_referenced_attributes()->add_attribute_matches();
match->set_condition(ReferencedAttributes::EXACT);
match->set_name(3); // "source.uid" should be used
CheckResponse response2;
(*response2.mutable_quotas())[kQuotaName] = quota_result2;

Attributes attr1(request_);
attr1.attributes["source.name"] = Attributes::StringValue("name");
Attributes attr2(request_);
attr2.attributes["source.uid"] = Attributes::StringValue("uid");

// name request with 0 granted response
TestRequest(attr1, true, response1);

// uid request with 10 granted response
TestRequest(attr2, true, response2);

// user1 will not have quota
TestRequest(attr1, false, response1);

// user2 will have quota
TestRequest(attr2, true, response2);
}

} // namespace
} // namespace mixer_client
} // namespace istio

0 comments on commit 5025ded

Please sign in to comment.