Skip to content

Commit

Permalink
Add quota cache. (envoyproxy#60)
Browse files Browse the repository at this point in the history
* Add quota cache.

* fix format

* free memroy in quota_cache_test.

* add README
  • Loading branch information
qiwzhang authored Apr 24, 2017
1 parent 154eb0d commit 3b6a587
Show file tree
Hide file tree
Showing 8 changed files with 517 additions and 12 deletions.
14 changes: 14 additions & 0 deletions mixerclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ cc_library(
"src/context_update.h",
"src/grpc_transport.cc",
"src/grpc_transport.h",
"src/quota_cache.cc",
"src/quota_cache.h",
"src/signature.cc",
"src/signature.h",
"src/stream_transport.h",
Expand All @@ -55,6 +57,7 @@ cc_library(
"//external:boringssl_crypto",
"//external:grpc++",
"//external:mixer_api_cc_proto",
"//prefetch:quota_prefetch_lib",
],
)

Expand Down Expand Up @@ -160,6 +163,17 @@ cc_test(
],
)

cc_test(
name = "quota_cache_test",
size = "small",
srcs = ["src/quota_cache_test.cc"],
linkstatic = 1,
deps = [
":mixer_client_lib",
"//external:googletest_main",
],
)

cc_test(
name = "grpc_transport_test",
size = "small",
Expand Down
31 changes: 31 additions & 0 deletions mixerclient/prefetch/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

## Introduction

A c++ library for a rate limiting prefetch algorithm.

The rate limiting feature is for a system wanted to limit request rate. For example, a proxy wants to limit request rate to protect the backend server. The exceeded requests will be rejected by the proxy and will not reach the backend server.

If a system has multiple proxy instances, rate limiting could be local or global. If local, each running instance enforces its own limit. If global, all running instances are subjected to a global limit. Global rate limiting is more useful than local. For global rate limiting, usually there is a rate limiting server to enforce the global limits, each proxy instance needs to call the server to check the limits.

If each proxy instance is calling the rate limiting server for each request it is processing, it will greatly increase the request latency by adding a remote call. It is a good idea for the proxy to prefetch some tokens so not every request needs to make a remote call to check rate limits.

Here presents a prefetch algorithm for that purpose.

This code presents a rate limiting prefetch algorithm. It can achieve:
* All rate limiting decisions are done at local, not need to wait for remote call.
* It works for both big rate limiting window, such as 1 minute, or small window, such as 1 second.


## Algorithm

Basic idea is:
* Use a predict window to count number of requests, use that to determine prefetch amount.
* There is a pool to store prefetch tokens from the rate limiting server.
* When the available tokens in the pool is less than half of desired amount, trigger a new prefetch.
* If a prefetch is negative (requested amount is not fully granted), need to wait for a period time before next prefetch.

There are three parameters in this algorithm:
* predictWindow: the time to count the requests, use that to determine prefetch amount
* minPrefetch: the minimum prefetch amount
* closeWaitWindow: the wait time for the next prefetch if last prefetch is negative.

13 changes: 3 additions & 10 deletions mixerclient/src/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ MixerClientImpl::MixerClientImpl(const MixerClientOptions &options)

check_cache_ =
std::unique_ptr<CheckCache>(new CheckCache(options.check_options));
quota_cache_ = std::unique_ptr<QuotaCache>(
new QuotaCache(options.quota_options, quota_transport_.get()));
}

MixerClientImpl::~MixerClientImpl() { check_cache_->FlushAll(); }
Expand Down Expand Up @@ -84,16 +86,7 @@ void MixerClientImpl::Report(const Attributes &attributes, DoneFunc on_done) {
}

void MixerClientImpl::Quota(const Attributes &attributes, DoneFunc on_done) {
auto response = new QuotaResponse;
quota_transport_->Send(attributes, response,
[response, on_done](const Status &status) {
if (status.ok()) {
on_done(ConvertRpcStatus(response->result()));
} else {
on_done(status);
}
delete response;
});
quota_cache_->Quota(attributes, on_done);
}

// Creates a MixerClient object.
Expand Down
2 changes: 2 additions & 0 deletions mixerclient/src/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "include/client.h"
#include "src/check_cache.h"
#include "src/grpc_transport.h"
#include "src/quota_cache.h"
#include "src/transport.h"

namespace istio {
Expand All @@ -44,6 +45,7 @@ class MixerClientImpl : public MixerClient {
std::unique_ptr<GrpcTransport> grpc_transport_;

std::shared_ptr<CheckCache> check_cache_;
std::shared_ptr<QuotaCache> quota_cache_;

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(MixerClientImpl);
};
Expand Down
163 changes: 163 additions & 0 deletions mixerclient/src/quota_cache.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/* Copyright 2017 Istio Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "src/quota_cache.h"
#include "src/signature.h"
#include "utils/protobuf.h"

using namespace std::chrono;
using ::istio::mixer::v1::QuotaResponse;
using ::google::protobuf::util::Status;
using ::google::protobuf::util::error::Code;

namespace istio {
namespace mixer_client {

QuotaCache::CacheElem::CacheElem(const Attributes& request,
QuotaTransport* transport)
: request_(request), transport_(transport) {
prefetch_ = QuotaPrefetch::Create(
[this](int amount, QuotaPrefetch::DoneFunc fn, QuotaPrefetch::Tick t) {
Alloc(amount, fn);
},
QuotaPrefetch::Options(), system_clock::now());

const auto& it = request.attributes.find(Attributes::kQuotaName);
if (it != request.attributes.end()) {
quota_name_ = it->second.str_v;
}
}

void QuotaCache::CacheElem::Alloc(int amount, QuotaPrefetch::DoneFunc fn) {
auto response = new QuotaResponse;
request_.attributes[Attributes::kQuotaAmount] =
Attributes::Int64Value(amount);
transport_->Send(request_, response, [response, fn](const Status& status) {
int amount = -1;
milliseconds expire;
if (status.ok()) {
amount = response->amount();
expire = ToMilliseonds(response->expiration());
}
delete response;
fn(amount, expire, system_clock::now());
});
}

bool QuotaCache::CacheElem::Quota(const Attributes& request) {
int amount = 1;
const auto& it = request.attributes.find(Attributes::kQuotaAmount);
if (it != request.attributes.end()) {
amount = it->second.value.int64_v;
}
return prefetch_->Check(amount, system_clock::now());
}

QuotaCache::QuotaCache(const QuotaOptions& options, QuotaTransport* transport)
: options_(options), transport_(transport) {
if (options.num_entries > 0) {
cache_.reset(new QuotaLRUCache(
options.num_entries, std::bind(&QuotaCache::OnCacheEntryDelete, this,
std::placeholders::_1)));
cache_->SetMaxIdleSeconds(options.expiration_ms / 1000.0);

// Excluse quota_amount in the key calculation.
cache_keys_ = CacheKeySet::CreateExclusive({Attributes::kQuotaAmount});
}
}

QuotaCache::~QuotaCache() {
// FlushAll() will remove all cache items.
FlushAll();
}

void QuotaCache::Quota(const Attributes& request, DoneFunc on_done) {
if (transport_ == nullptr) {
on_done(Status(Code::INVALID_ARGUMENT,
std::string("transport object is NOT provided: ")));
return;
}

// Makes sure quota_name is provided and with correct type.
const auto& it = request.attributes.find(Attributes::kQuotaName);
if (it == request.attributes.end() ||
it->second.type != Attributes::Value::STRING) {
on_done(Status(Code::INVALID_ARGUMENT,
std::string("A required attribute is missing: ") +
Attributes::kQuotaName));
return;
}

if (!cache_) {
auto response = new QuotaResponse;
transport_->Send(request, response,
[response, on_done](const Status& status) {
if (status.ok()) {
on_done(ConvertRpcStatus(response->result()));
} else {
on_done(status);
}
delete response;
});
return;
}

std::string signature = GenerateSignature(request, *cache_keys_);

std::lock_guard<std::mutex> lock(cache_mutex_);
QuotaLRUCache::ScopedLookup lookup(cache_.get(), signature);

CacheElem* cache_elem;
if (!lookup.Found()) {
cache_elem = new CacheElem(request, transport_);
cache_->Insert(signature, cache_elem, 1);
} else {
cache_elem = lookup.value();
}

if (cache_elem->Quota(request)) {
on_done(Status::OK);
} else {
on_done(Status(
Code::RESOURCE_EXHAUSTED,
std::string("Quota is exhausted for: ") + cache_elem->quota_name()));
}
}

// TODO: hookup with a timer object to call Flush() periodically.
Status QuotaCache::Flush() {
if (cache_) {
std::lock_guard<std::mutex> lock(cache_mutex_);
cache_->RemoveExpiredEntries();
}

return Status::OK;
}

void QuotaCache::OnCacheEntryDelete(CacheElem* elem) { delete elem; }

// Flush out aggregated check requests, clear all cache items.
// Usually called at destructor.
Status QuotaCache::FlushAll() {
if (cache_) {
std::lock_guard<std::mutex> lock(cache_mutex_);
cache_->RemoveAll();
}

return Status::OK;
}

} // namespace mixer_client
} // namespace istio
111 changes: 111 additions & 0 deletions mixerclient/src/quota_cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/* Copyright 2017 Istio Authors. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Caches quota calls.

#ifndef MIXERCLIENT_QUOTA_CACHE_H
#define MIXERCLIENT_QUOTA_CACHE_H

#include <mutex>
#include <string>

#include "include/client.h"
#include "prefetch/quota_prefetch.h"
#include "src/cache_key_set.h"
#include "src/transport.h"
#include "utils/simple_lru_cache.h"
#include "utils/simple_lru_cache_inl.h"

namespace istio {
namespace mixer_client {

// Cache Mixer Quota Attributes.
// This interface is thread safe.
class QuotaCache {
public:
QuotaCache(const QuotaOptions& options, QuotaTransport* transport);

virtual ~QuotaCache();

// Make cached Quota call.
void Quota(const Attributes& request, DoneFunc on_done);

// Invalidates expired check responses.
// Called at time specified by GetNextFlushInterval().
::google::protobuf::util::Status Flush();

// Flushes out all cached check responses; clears all cache items.
// Usually called at destructor.
::google::protobuf::util::Status FlushAll();

private:
// The cache element for each quota metric.
class CacheElem {
public:
CacheElem(const Attributes& request, QuotaTransport* transport);

// Use the prefetch object to check the quota.
bool Quota(const Attributes& request);

// The quota name.
const std::string& quota_name() const { return quota_name_; }

private:
// The quota allocation call.
void Alloc(int amount, QuotaPrefetch::DoneFunc fn);

// The original quota request.
Attributes request_;
// the quota name.
std::string quota_name_;
// quota transport.
QuotaTransport* transport_;
// The prefetch object.
std::unique_ptr<QuotaPrefetch> prefetch_;
};

using CacheDeleter = std::function<void(CacheElem*)>;
// Key is the signature of the Attributes. Value is the CacheElem.
// It is a LRU cache with MaxIdelTime as response_expiration_time.
using QuotaLRUCache =
SimpleLRUCacheWithDeleter<std::string, CacheElem, CacheDeleter>;

// Flushes the internal operation in the elem and delete the elem. The
// response from the server is NOT cached.
// Takes ownership of the elem.
void OnCacheEntryDelete(CacheElem* elem);

// The quota options.
QuotaOptions options_;

// The quota transport
QuotaTransport* transport_;

// The cache keys.
std::unique_ptr<CacheKeySet> cache_keys_;

// Mutex guarding the access of cache_;
std::mutex cache_mutex_;

// The cache that maps from key to prefetch object
std::unique_ptr<QuotaLRUCache> cache_;

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(QuotaCache);
};

} // namespace mixer_client
} // namespace istio

#endif // MIXERCLIENT_CHECK_CACHE_H
Loading

0 comments on commit 3b6a587

Please sign in to comment.