[redis proxy]: add support for keys and select
Signed-off-by: duanhongyi <duanhongyi@doopai.com>
duanhongyi committed Dec 19, 2024
1 parent 857107b commit ea751e2
Showing 12 changed files with 450 additions and 36 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
@@ -214,6 +214,9 @@ removed_config_or_runtime:
Removed runtime flag ``envoy.reloadable_features.exclude_host_in_eds_status_draining``.
new_features:
- area: redis
change: |
Added support for the ``keys`` and ``select`` commands.
- area: wasm
change: |
Added the wasm vm reload support to reload the wasm vm when it fails with runtime errors. See
2 changes: 2 additions & 0 deletions docs/root/intro/arch_overview/other_protocols/redis.rst
@@ -157,11 +157,13 @@ For details on each command's usage see the official
EXISTS, Generic
EXPIRE, Generic
EXPIREAT, Generic
KEYS, Generic
PERSIST, Generic
PEXPIRE, Generic
PEXPIREAT, Generic
PTTL, Generic
RESTORE, Generic
SELECT, Generic
TOUCH, Generic
TTL, Generic
TYPE, Generic
21 changes: 19 additions & 2 deletions source/extensions/clusters/redis/redis_cluster_lb.cc
@@ -1,5 +1,7 @@
#include "source/extensions/clusters/redis/redis_cluster_lb.h"

#include <string>

namespace Envoy {
namespace Extensions {
namespace Clusters {
@@ -138,8 +140,17 @@ Upstream::HostConstSharedPtr RedisClusterLoadBalancerFactory::RedisClusterLoadBa
return nullptr;
}

auto shard = shard_vector_->at(
slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
RedisShardSharedPtr shard;
if (dynamic_cast<const RedisSpecifyShardContextImpl*>(context)) {
if (hash.value() < shard_vector_->size()) {
shard = shard_vector_->at(hash.value());
} else {
return nullptr;
}
} else {
shard = shard_vector_->at(
slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
}

auto redis_context = dynamic_cast<RedisLoadBalancerContext*>(context);
if (redis_context && redis_context->isReadCommand()) {
@@ -213,6 +224,12 @@ absl::string_view RedisLoadBalancerContextImpl::hashtag(absl::string_view v, boo

return v.substr(start + 1, end - start - 1);
}
RedisSpecifyShardContextImpl::RedisSpecifyShardContextImpl(
uint64_t shard_index, const NetworkFilters::Common::Redis::RespValue& request,
NetworkFilters::Common::Redis::Client::ReadPolicy read_policy)
: RedisLoadBalancerContextImpl(std::to_string(shard_index), true, true, request, read_policy),
shard_index_(shard_index) {}

} // namespace Redis
} // namespace Clusters
} // namespace Extensions
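
The branch added above has two paths: when the load balancer context is a RedisSpecifyShardContextImpl, the hash value is taken to be a shard index and used directly against the shard vector (returning no host if it is out of range); otherwise the slot-array lookup is unchanged. A minimal standalone sketch of that selection logic, using simplified stand-in types (Shard, selectShard) rather than the Envoy classes:

#include <array>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical, simplified stand-ins for the shard vector and slot array used above.
struct Shard {};
constexpr uint64_t MaxSlot = 16384; // Redis Cluster slot count.

// Mirrors the added branch: a specify-shard context supplies the shard index directly
// (and yields no host if the index is out of range); any other context hashes into the
// slot array as before.
std::optional<Shard> selectShard(bool is_specify_shard_context, uint64_t hash,
                                 const std::vector<Shard>& shards,
                                 const std::array<uint16_t, MaxSlot>& slot_to_shard) {
  if (is_specify_shard_context) {
    if (hash < shards.size()) {
      return shards[hash];
    }
    return std::nullopt;
  }
  return shards[slot_to_shard[hash % MaxSlot]];
}
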
20 changes: 20 additions & 0 deletions source/extensions/clusters/redis/redis_cluster_lb.h
@@ -113,6 +113,26 @@ class RedisLoadBalancerContextImpl : public RedisLoadBalancerContext,
const NetworkFilters::Common::Redis::Client::ReadPolicy read_policy_;
};

class RedisSpecifyShardContextImpl : public RedisLoadBalancerContextImpl {
public:
/**
* The Redis load balancer context for requests that target a specific shard.
* @param shard_index supplies the shard index for the Redis request.
* @param request supplies the Redis request.
* @param read_policy supplies the read policy.
*/
RedisSpecifyShardContextImpl(uint64_t shard_index,
const NetworkFilters::Common::Redis::RespValue& request,
NetworkFilters::Common::Redis::Client::ReadPolicy read_policy =
NetworkFilters::Common::Redis::Client::ReadPolicy::Primary);

// Upstream::LoadBalancerContextBase
absl::optional<uint64_t> computeHashKey() override { return shard_index_; }

private:
const absl::optional<uint64_t> shard_index_;
};

class ClusterSlotUpdateCallBack {
public:
virtual ~ClusterSlotUpdateCallBack() = default;
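
The subclass above only overrides computeHashKey() so the stored shard index reaches the load balancer verbatim instead of a key hash. A self-contained sketch of the same override pattern, with hypothetical class names (KeyHashContext, SpecifyShardContext) standing in for the Envoy types:

#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// Base context: derives the hash from a key (stand-in for the slot hash of the real context).
class KeyHashContext {
public:
  explicit KeyHashContext(std::string key) : key_(std::move(key)) {}
  virtual ~KeyHashContext() = default;
  virtual std::optional<uint64_t> computeHashKey() const {
    return std::hash<std::string>{}(key_);
  }

private:
  const std::string key_;
};

// Shard-specific context: bypasses hashing and returns the chosen shard index unchanged.
class SpecifyShardContext : public KeyHashContext {
public:
  explicit SpecifyShardContext(uint64_t shard_index)
      : KeyHashContext(std::to_string(shard_index)), shard_index_(shard_index) {}
  std::optional<uint64_t> computeHashKey() const override { return shard_index_; }

private:
  const uint64_t shard_index_;
};
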
source/extensions/filters/network/common/redis/supported_commands.h
@@ -79,6 +79,11 @@ struct SupportedCommands {
*/
static const std::string& mset() { CONSTRUCT_ON_FIRST_USE(std::string, "mset"); }

/**
* @return keys command
*/
static const std::string& keys() { CONSTRUCT_ON_FIRST_USE(std::string, "keys"); }

/**
* @return ping command
*/
Expand All @@ -94,6 +99,11 @@ struct SupportedCommands {
*/
static const std::string& quit() { CONSTRUCT_ON_FIRST_USE(std::string, "quit"); }

/**
* @return select command
*/
static const std::string& select() { CONSTRUCT_ON_FIRST_USE(std::string, "select"); }

/**
* @return commands which alter the state of redis
*/
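
Both new accessors follow the existing CONSTRUCT_ON_FIRST_USE pattern. For readers unfamiliar with that Envoy macro, it builds a function-local static on first call and never destroys it; a rough approximation (not the exact macro body):

#include <string>

// Approximation of what CONSTRUCT_ON_FIRST_USE(std::string, "keys") expands to: the string
// is heap-allocated once on first use and deliberately leaked, so the returned reference
// remains valid even during static destruction at shutdown.
static const std::string& keysCommand() {
  static const std::string* name = new std::string("keys");
  return *name;
}
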
source/extensions/filters/network/redis_proxy/command_splitter_impl.cc
@@ -1,5 +1,7 @@
#include "source/extensions/filters/network/redis_proxy/command_splitter_impl.h"

#include <cstdint>

#include "source/common/common/logger.h"
#include "source/extensions/filters/network/common/redis/supported_commands.h"

@@ -75,6 +77,35 @@ makeFragmentedRequest(const RouteSharedPtr& route, const std::string& command,
return handler;
}

/**
* Make a request to a specific shard and maybe mirror it based on the mirror policies of the route.
* @param route supplies the route matched with the request.
* @param command supplies the command of the request.
* @param shard_index supplies the index of the shard to send the request to.
* @param incoming_request supplies the request.
* @param callbacks supplies the request completion callbacks.
* @param transaction supplies the transaction info of the current connection.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
Common::Redis::Client::PoolRequest*
makeFragmentedRequestToShard(const RouteSharedPtr& route, const std::string& command,
uint16_t shard_index, const Common::Redis::RespValue& incoming_request,
ConnPool::PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) {
auto handler = route->upstream(command)->makeRequestToShard(
shard_index, ConnPool::RespVariant(incoming_request), callbacks, transaction);
if (handler) {
for (auto& mirror_policy : route->mirrorPolicies()) {
if (mirror_policy->shouldMirror(command)) {
mirror_policy->upstream()->makeRequestToShard(
shard_index, ConnPool::RespVariant(incoming_request), null_pool_callbacks, transaction);
}
}
}
return handler;
}

// Send a string response downstream.
void localResponse(SplitCallbacks& callbacks, std::string response) {
Common::Redis::RespValuePtr res(new Common::Redis::RespValue());
@@ -385,6 +416,80 @@ void MSETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
}
}

SplitRequestPtr KeysRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info) {
if (incoming_request->asArray().size() != 2) {
onWrongNumberOfArguments(callbacks, *incoming_request);
command_stats.error_.inc();
return nullptr;
}
const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info);
uint32_t shard_size =
route ? route->upstream(incoming_request->asArray()[0].asString())->shardSize() : 0;
if (shard_size == 0) {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
return nullptr;
}

std::unique_ptr<KeysRequest> request_ptr{
new KeysRequest(callbacks, command_stats, time_source, delay_command_latency)};
request_ptr->num_pending_responses_ = shard_size;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);

request_ptr->pending_response_ = std::make_unique<Common::Redis::RespValue>();
request_ptr->pending_response_->type(Common::Redis::RespType::Array);

Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
for (uint32_t shard_index = 0; shard_index < shard_size; shard_index++) {
request_ptr->pending_requests_.emplace_back(*request_ptr, shard_index);
PendingRequest& pending_request = request_ptr->pending_requests_.back();

ENVOY_LOG(debug, "keys request shard index {}: {}", shard_index, base_request->toString());
pending_request.handle_ =
makeFragmentedRequestToShard(route, base_request->asArray()[0].asString(), shard_index,
*base_request, pending_request, callbacks.transaction());

if (!pending_request.handle_) {
pending_request.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
}
}

if (request_ptr->num_pending_responses_ > 0) {
return request_ptr;
}

return nullptr;
}

void KeysRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) {
pending_requests_[index].handle_ = nullptr;
switch (value->type()) {
case Common::Redis::RespType::Array: {
pending_response_->asArray().insert(pending_response_->asArray().end(),
value->asArray().begin(), value->asArray().end());
break;
}
default: {
error_count_++;
break;
}
}

ASSERT(num_pending_responses_ > 0);
if (--num_pending_responses_ == 0) {
updateStats(error_count_ == 0);
if (error_count_ == 0) {
callbacks_.onResponse(std::move(pending_response_));
} else {
callbacks_.onResponse(Common::Redis::Utility::makeError(
fmt::format("finished with {} error(s)", error_count_)));
}
}
}
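
A standalone sketch of the aggregation performed by KeysRequest::onChildResponse above, under the simplifying assumption that each shard reply is either a list of key names or an error string: array replies are concatenated as they arrive, and any failed shard collapses the final response into a single error.

#include <cstdint>
#include <string>
#include <variant>
#include <vector>

// Hypothetical reply type: a list of keys on success, an error message otherwise.
using ShardReply = std::variant<std::vector<std::string>, std::string>;

// Mirrors KeysRequest::onChildResponse: merge array replies from every shard; if any shard
// failed, report "finished with N error(s)" instead of a partial result.
ShardReply mergeKeysReplies(const std::vector<ShardReply>& replies) {
  std::vector<std::string> merged;
  uint32_t error_count = 0;
  for (const ShardReply& reply : replies) {
    if (const auto* keys = std::get_if<std::vector<std::string>>(&reply)) {
      merged.insert(merged.end(), keys->begin(), keys->end());
    } else {
      ++error_count;
    }
  }
  if (error_count != 0) {
    return "finished with " + std::to_string(error_count) + " error(s)";
  }
  return merged;
}
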

SplitRequestPtr
SplitKeysSumResultRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
@@ -593,7 +698,7 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
Common::Redis::FaultManagerPtr&& fault_manager)
: router_(std::move(router)), simple_command_handler_(*router_),
eval_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_),
split_keys_sum_result_handler_(*router_),
keys_handler_(*router_), split_keys_sum_result_handler_(*router_),
transaction_handler_(*router_), stats_{ALL_COMMAND_SPLITTER_STATS(
POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
time_source_(time_source), fault_manager_(std::move(fault_manager)) {
@@ -616,6 +721,9 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
addHandler(scope, stat_prefix, Common::Redis::SupportedCommands::mset(), latency_in_micros,
mset_handler_);

addHandler(scope, stat_prefix, Common::Redis::SupportedCommands::keys(), latency_in_micros,
keys_handler_);

for (const std::string& command : Common::Redis::SupportedCommands::transactionCommands()) {
addHandler(scope, stat_prefix, command, latency_in_micros, transaction_handler_);
}
@@ -704,6 +812,16 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::select()) {
// Respond with OK locally.
if (request->asArray().size() != 2) {
onInvalidRequest(callbacks);
return nullptr;
}
localResponse(callbacks, "OK");
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::quit()) {
callbacks.onQuit();
return nullptr;
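
The SELECT branch added to makeRequest never reaches an upstream: after checking that the request is exactly ["SELECT", "<index>"], the proxy answers OK locally (presumably because Redis Cluster only supports database 0). A minimal sketch with a hypothetical callback type (LocalCallbacks) standing in for SplitCallbacks/localResponse:

#include <functional>
#include <string>
#include <vector>

// Hypothetical downstream callbacks standing in for SplitCallbacks/localResponse.
struct LocalCallbacks {
  std::function<void(const std::string&)> on_simple_string;
  std::function<void(const std::string&)> on_error;
};

// Mirrors the SELECT branch: validate that the request is exactly ["SELECT", "<index>"],
// then reply "OK" locally without contacting any upstream shard.
bool handleSelectLocally(const std::vector<std::string>& request, const LocalCallbacks& cb) {
  if (request.size() != 2) {
    cb.on_error("invalid request");
    return false;
  }
  cb.on_simple_string("OK");
  return true;
}
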
source/extensions/filters/network/redis_proxy/command_splitter_impl.h
@@ -284,6 +284,26 @@ class MGETRequest : public FragmentedRequest {
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
};

/**
* KeysRequest sends the command to every Redis shard. The response from each shard (which
* must be an array) is merged and returned to the user. If there is any error or failure in
* processing the fragmented commands, an error will be returned.
*/
class KeysRequest : public FragmentedRequest {
public:
static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info);

private:
KeysRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source,
bool delay_command_latency)
: FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency) {}
// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
};

/**
* SplitKeysSumResultRequest takes each key from the command and sends the same incoming command
* with each key to the appropriate Redis server. The response from each Redis (which must be an
@@ -390,6 +410,7 @@ class InstanceImpl : public Instance, Logger::Loggable<Logger::Id::redis> {
CommandHandlerFactory<EvalRequest> eval_command_handler_;
CommandHandlerFactory<MGETRequest> mget_handler_;
CommandHandlerFactory<MSETRequest> mset_handler_;
CommandHandlerFactory<KeysRequest> keys_handler_;
CommandHandlerFactory<SplitKeysSumResultRequest> split_keys_sum_result_handler_;
CommandHandlerFactory<TransactionRequest> transaction_handler_;
TrieLookupTable<HandlerDataPtr> handler_lookup_table_;
13 changes: 13 additions & 0 deletions source/extensions/filters/network/redis_proxy/conn_pool.h
@@ -52,6 +52,7 @@ class Instance {
public:
virtual ~Instance() = default;

/**
 * @return the number of shards in the upstream cluster.
 */
virtual uint16_t shardSize() PURE;

/**
* Makes a redis request.
* @param hash_key supplies the key to use for consistent hashing.
@@ -64,6 +65,18 @@
virtual Common::Redis::Client::PoolRequest*
makeRequest(const std::string& hash_key, RespVariant&& request, PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) PURE;
/**
* Makes a redis request to the specified shard.
* @param shard_index supplies the index of the shard to send the request to.
* @param request supplies the request to make.
* @param callbacks supplies the request completion callbacks.
* @param transaction supplies the transaction info of the current connection.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
virtual Common::Redis::Client::PoolRequest*
makeRequestToShard(uint16_t shard_index, RespVariant&& request, PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) PURE;
};

using InstanceSharedPtr = std::shared_ptr<Instance>;