Skip to content

Commit

Permalink
Check cache to use cachability from response. (#72)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiwzhang authored Jun 2, 2017
1 parent a1882dc commit 9287180
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 181 deletions.
24 changes: 2 additions & 22 deletions mixerclient/include/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,17 @@ namespace mixer_client {
struct CheckOptions {
// Default constructor.
// Default options are chosen from experience.
CheckOptions()
: num_entries(10000), flush_interval_ms(500), expiration_ms(1000) {}
CheckOptions() : num_entries(10000) {}

// Constructor.
// cache_entries is the maximum number of cache entries that can be kept in
// the cache. Cache is disabled when cache_entries <= 0.
// flush_cache_entry_interval_ms is the maximum milliseconds before an
// check request needs to send to remote server again.
// response_expiration_ms is the maximum milliseconds before a cached check
// response is invalidated. We make sure that it is at least
// flush_cache_entry_interval_ms + 1.
CheckOptions(int cache_entries, int flush_cache_entry_interval_ms,
int response_expiration_ms)
: num_entries(cache_entries),
flush_interval_ms(flush_cache_entry_interval_ms),
expiration_ms(std::max(flush_cache_entry_interval_ms + 1,
response_expiration_ms)) {}
CheckOptions(int cache_entries) : num_entries(cache_entries) {}

// Maximum number of cache entries kept in the cache.
// Set to 0 will disable caching.
const int num_entries;

// Maximum milliseconds before check requests are flushed to the
// server. The flush is triggered by a check request.
const int flush_interval_ms;

// Maximum milliseconds before a cached check response should be deleted. The
// deletion is triggered by a timer. This value must be larger than
// flush_interval_ms.
const int expiration_ms;

// Only the attributes in this set are used to caclculate cache key.
// If empty, check cache is disabled.
std::vector<std::string> cache_keys;
Expand Down
108 changes: 53 additions & 55 deletions mixerclient/src/check_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,49 @@

#include "src/check_cache.h"
#include "src/signature.h"
#include "utils/protobuf.h"

#include "google/protobuf/stubs/logging.h"

using std::string;
using ::istio::mixer::v1::CheckRequest;
using namespace std::chrono;
using ::istio::mixer::v1::CheckResponse;
using ::google::protobuf::util::Status;
using ::google::protobuf::util::error::Code;

namespace istio {
namespace mixer_client {

CheckCache::CheckCache(const CheckOptions& options) : options_(options) {
// Converts flush_interval_ms to Cycle used by SimpleCycleTimer.
flush_interval_in_cycle_ =
options_.flush_interval_ms * SimpleCycleTimer::Frequency() / 1000;
void CheckCache::CacheElem::CacheElem::SetResponse(
const CheckResponse& response, Tick time_now) {
status_ = ConvertRpcStatus(response.status());
if (response.has_cachability()) {
if (response.cachability().has_duration()) {
expire_time_ =
time_now + ToMilliseonds(response.cachability().duration());
} else {
// never expired.
expire_time_ = time_point<system_clock>::max();
}
use_count_ = response.cachability().use_count();
} else {
// if no cachability specified, use it forever.
use_count_ = -1; // -1 for not checking use_count
expire_time_ = time_point<system_clock>::max();
}
}

// check if the item is expired.
bool CheckCache::CacheElem::CacheElem::IsExpired(Tick time_now) {
if (time_now > expire_time_ || use_count_ == 0) {
return true;
}
if (use_count_ > 0) {
--use_count_;
}
return false;
}

CheckCache::CheckCache(const CheckOptions& options) : options_(options) {
if (options.num_entries > 0 && !options_.cache_keys.empty()) {
cache_.reset(new CheckLRUCache(
options.num_entries, std::bind(&CheckCache::OnCacheEntryDelete, this,
std::placeholders::_1)));
cache_->SetMaxIdleSeconds(options.expiration_ms / 1000.0);
cache_.reset(new CheckLRUCache(options.num_entries));
cache_keys_ = CacheKeySet::CreateInclusive(options_.cache_keys);
}
}
Expand All @@ -46,81 +67,58 @@ CheckCache::~CheckCache() {
FlushAll();
}

bool CheckCache::ShouldFlush(const CacheElem& elem) {
int64_t age = SimpleCycleTimer::Now() - elem.last_check_time();
return age >= flush_interval_in_cycle_;
}

Status CheckCache::Check(const Attributes& attributes, std::string* signature) {
Status CheckCache::Check(const Attributes& attributes, Tick time_now,
std::string* ret_signature) {
if (!cache_) {
// By returning NOT_FOUND, caller will send request to server.
return Status(Code::NOT_FOUND, "");
}

std::string request_signature = GenerateSignature(attributes, *cache_keys_);
if (signature) {
*signature = request_signature;
std::string signature = GenerateSignature(attributes, *cache_keys_);
if (ret_signature) {
*ret_signature = signature;
}

std::lock_guard<std::mutex> lock(cache_mutex_);
CheckLRUCache::ScopedLookup lookup(cache_.get(), request_signature);
CheckLRUCache::ScopedLookup lookup(cache_.get(), signature);

if (!lookup.Found()) {
// By returning NO_FOUND, caller will send request to server.
return Status(Code::NOT_FOUND, "");
}

CacheElem* elem = lookup.value();

if (ShouldFlush(*elem)) {
// Setting last check to now to block more check requests to Mixer.
elem->set_last_check_time(SimpleCycleTimer::Now());
if (elem->IsExpired(time_now)) {
// By returning NO_FOUND, caller will send request to server.
cache_->Remove(signature);
return Status(Code::NOT_FOUND, "");
}

return elem->status();
}

Status CheckCache::CacheResponse(const std::string& request_signature,
::google::protobuf::util::Status status) {
if (cache_) {
std::lock_guard<std::mutex> lock(cache_mutex_);
CheckLRUCache::ScopedLookup lookup(cache_.get(), request_signature);

int64_t now = SimpleCycleTimer::Now();

if (lookup.Found()) {
lookup.value()->set_last_check_time(now);
lookup.value()->set_status(status);
} else {
CacheElem* cache_elem = new CacheElem(status, now);
cache_->Insert(request_signature, cache_elem, 1);
}
Status CheckCache::CacheResponse(const std::string& signature,
const CheckResponse& response, Tick time_now) {
if (!cache_) {
return ConvertRpcStatus(response.status());
}

return Status::OK;
}
std::lock_guard<std::mutex> lock(cache_mutex_);
CheckLRUCache::ScopedLookup lookup(cache_.get(), signature);

// TODO: need to hook up a timer to call Flush.
// Flush aggregated requests whom are longer than flush_interval.
// Called at time specified by GetNextFlushInterval().
Status CheckCache::Flush() {
if (cache_) {
std::lock_guard<std::mutex> lock(cache_mutex_);
cache_->RemoveExpiredEntries();
if (lookup.Found()) {
lookup.value()->SetResponse(response, time_now);
return lookup.value()->status();
}

return Status::OK;
CacheElem* cache_elem = new CacheElem(response, time_now);
cache_->Insert(signature, cache_elem, 1);
return cache_elem->status();
}

void CheckCache::OnCacheEntryDelete(CacheElem* elem) { delete elem; }

// Flush out aggregated check requests, clear all cache items.
// Usually called at destructor.
Status CheckCache::FlushAll() {
if (cache_) {
GOOGLE_LOG(INFO) << "Remove all entries of check cache.";
std::lock_guard<std::mutex> lock(cache_mutex_);
cache_->RemoveAll();
}
Expand Down
78 changes: 30 additions & 48 deletions mixerclient/src/check_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef MIXERCLIENT_CHECK_CACHE_H
#define MIXERCLIENT_CHECK_CACHE_H

#include <chrono>
#include <mutex>
#include <string>
#include <unordered_map>
Expand All @@ -33,27 +34,26 @@
namespace istio {
namespace mixer_client {

// Cache Mixer Check Attributes.
// Cache Mixer Check call result.
// This interface is thread safe.
class CheckCache {
public:
using Tick = std::chrono::time_point<std::chrono::system_clock>;

CheckCache(const CheckOptions& options);

virtual ~CheckCache();

// If the check could not be handled by the cache, returns NOT_FOUND,
// caller has to send the request to mixer.
// Otherwise, returns OK and cached response.
virtual ::google::protobuf::util::Status Check(const Attributes& request,
std::string* signature);
::google::protobuf::util::Status Check(const Attributes& request,
Tick time_now, std::string* signature);

// Caches a response from a remote mixer call.
virtual ::google::protobuf::util::Status CacheResponse(
const std::string& signature, ::google::protobuf::util::Status status);

// Invalidates expired check responses.
// Called at time specified by GetNextFlushInterval().
virtual ::google::protobuf::util::Status Flush();
// Return the converted status from response.
::google::protobuf::util::Status CacheResponse(
const std::string& signature,
const ::istio::mixer::v1::CheckResponse& response, Tick time_now);

// Flushes out all cached check responses; clears all cache items.
// Usually called at destructor.
Expand All @@ -62,50 +62,35 @@ class CheckCache {
private:
class CacheElem {
public:
CacheElem(::google::protobuf::util::Status status, const int64_t time)
: status_(status), last_check_time_(time) {}

// Setter for check status.
inline void set_status(::google::protobuf::util::Status status) {
status_ = status;
CacheElem(const ::istio::mixer::v1::CheckResponse& response, Tick time) {
SetResponse(response, time);
}
// Getter for check response.
inline ::google::protobuf::util::Status status() const { return status_; }

// Setter for last check time.
inline void set_last_check_time(const int64_t last_check_time) {
last_check_time_ = last_check_time;
}
// Getter for last check time.
inline const int64_t last_check_time() const { return last_check_time_; }
// Set the response
void SetResponse(const ::istio::mixer::v1::CheckResponse& response,
Tick time_now);

// Check if the item is expired.
bool IsExpired(Tick time_now);

// getter for converted status from response.
::google::protobuf::util::Status status() const { return status_; }

private:
// The check status for the last check request.
::google::protobuf::util::Status status_;
// In general, this is the last time a check response is updated.
//
// During flush, we set it to be the request start time to prevent a next
// check request from triggering another flush. Note that this prevention
// works only during the flush interval, which means for long RPC, there
// could be up to RPC_time/flush_interval ongoing check requests.
int64_t last_check_time_;
// Cache item should not be used after it is expired.
std::chrono::time_point<std::chrono::system_clock> expire_time_;
// if -1, not to check use_count.
// if 0, cache item should not be used.
// use_cound is decreased by 1 for each request,
int use_count_;
};

using CacheDeleter = std::function<void(CacheElem*)>;
// Key is the signature of the Attributes. Value is the CacheElem.
// It is a LRU cache with MaxIdelTime as response_expiration_time.
using CheckLRUCache =
SimpleLRUCacheWithDeleter<std::string, CacheElem, CacheDeleter>;

// Returns whether we should flush a cache entry.
// If the aggregated check request is less than flush interval, no need to
// flush.
bool ShouldFlush(const CacheElem& elem);

// Flushes the internal operation in the elem and delete the elem. The
// response from the server is NOT cached.
// Takes ownership of the elem.
void OnCacheEntryDelete(CacheElem* elem);
// It is a LRU cache with maximum size.
// When the maximum size is reached, oldest idle items will be removed.
using CheckLRUCache = SimpleLRUCache<std::string, CacheElem>;

// The check options.
CheckOptions options_;
Expand All @@ -122,9 +107,6 @@ class CheckCache {
// Guarded by mutex_, except when compare against NULL.
std::unique_ptr<CheckLRUCache> cache_;

// flush interval in cycles.
int64_t flush_interval_in_cycle_;

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(CheckCache);
};

Expand Down
Loading

0 comments on commit 9287180

Please sign in to comment.