Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[redis_proxy] Add support for SELECT and KEYS #37706

Merged
merged 3 commits into from
Dec 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ removed_config_or_runtime:
Removed runtime flag ``envoy.reloadable_features.exclude_host_in_eds_status_draining``.

new_features:
- area: redis
change: |
Added support for keys and select.
- area: wasm
change: |
Added the wasm vm reload support to reload wasm vm when the wasm vm is failed with runtime errors. See
Expand Down
4 changes: 3 additions & 1 deletion docs/root/intro/arch_overview/other_protocols/redis.rst
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,13 @@ For details on each command's usage see the official
EXISTS, Generic
EXPIRE, Generic
EXPIREAT, Generic
KEYS, String
PERSIST, Generic
PEXPIRE, Generic
PEXPIREAT, Generic
PTTL, Generic
RESTORE, Generic
SELECT, Generic
TOUCH, Generic
TTL, Generic
TYPE, Generic
Expand Down Expand Up @@ -300,7 +302,7 @@ Envoy can also generate its own errors in response to the client.
the connection."
invalid request, "Command was rejected by the first stage of the command splitter due to
datatype or length."
unsupported command, "The command was not recognized by Envoy and therefore cannot be serviced
ERR unknown command, "The command was not recognized by Envoy and therefore cannot be serviced
because it cannot be hashed to a backend server."
finished with n errors, "Fragmented commands which sum the response (e.g. DEL) will return the
total number of errors received if any were received."
Expand Down
21 changes: 19 additions & 2 deletions source/extensions/clusters/redis/redis_cluster_lb.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/extensions/clusters/redis/redis_cluster_lb.h"

#include <string>

namespace Envoy {
namespace Extensions {
namespace Clusters {
Expand Down Expand Up @@ -138,8 +140,17 @@ Upstream::HostConstSharedPtr RedisClusterLoadBalancerFactory::RedisClusterLoadBa
return nullptr;
}

auto shard = shard_vector_->at(
slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
RedisShardSharedPtr shard;
if (dynamic_cast<const RedisSpecifyShardContextImpl*>(context)) {
if (hash.value() < shard_vector_->size()) {
shard = shard_vector_->at(hash.value());
} else {
return nullptr;
}
} else {
shard = shard_vector_->at(
slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
}

auto redis_context = dynamic_cast<RedisLoadBalancerContext*>(context);
if (redis_context && redis_context->isReadCommand()) {
Expand Down Expand Up @@ -213,6 +224,12 @@ absl::string_view RedisLoadBalancerContextImpl::hashtag(absl::string_view v, boo

return v.substr(start + 1, end - start - 1);
}
RedisSpecifyShardContextImpl::RedisSpecifyShardContextImpl(
uint64_t shard_index, const NetworkFilters::Common::Redis::RespValue& request,
NetworkFilters::Common::Redis::Client::ReadPolicy read_policy)
: RedisLoadBalancerContextImpl(std::to_string(shard_index), true, true, request, read_policy),
shard_index_(shard_index) {}

} // namespace Redis
} // namespace Clusters
} // namespace Extensions
Expand Down
20 changes: 20 additions & 0 deletions source/extensions/clusters/redis/redis_cluster_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ class RedisLoadBalancerContextImpl : public RedisLoadBalancerContext,
const NetworkFilters::Common::Redis::Client::ReadPolicy read_policy_;
};

class RedisSpecifyShardContextImpl : public RedisLoadBalancerContextImpl {
public:
/**
* The redis specify Shard load balancer context for Redis requests.
* @param shard_index specify the shard index for the Redis request.
* @param request specify the Redis request.
* @param read_policy specify the read policy.
*/
RedisSpecifyShardContextImpl(uint64_t shard_index,
const NetworkFilters::Common::Redis::RespValue& request,
NetworkFilters::Common::Redis::Client::ReadPolicy read_policy =
NetworkFilters::Common::Redis::Client::ReadPolicy::Primary);

// Upstream::LoadBalancerContextBase
absl::optional<uint64_t> computeHashKey() override { return shard_index_; }

private:
const absl::optional<uint64_t> shard_index_;
};

class ClusterSlotUpdateCallBack {
public:
virtual ~ClusterSlotUpdateCallBack() = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ struct SupportedCommands {
*/
static const std::string& mset() { CONSTRUCT_ON_FIRST_USE(std::string, "mset"); }

/**
* @return keys command
*/
static const std::string& keys() { CONSTRUCT_ON_FIRST_USE(std::string, "keys"); }

/**
* @return ping command
*/
Expand All @@ -94,6 +99,11 @@ struct SupportedCommands {
*/
static const std::string& quit() { CONSTRUCT_ON_FIRST_USE(std::string, "quit"); }

/**
* @return select command
*/
static const std::string& select() { CONSTRUCT_ON_FIRST_USE(std::string, "select"); }

/**
* @return commands which alters the state of redis
*/
Expand All @@ -112,6 +122,14 @@ struct SupportedCommands {
static bool isReadCommand(const std::string& command) {
return !writeCommands().contains(command);
}

static bool isSupportedCommand(const std::string& command) {
return (simpleCommands().contains(command) || evalCommands().contains(command) ||
hashMultipleSumResultCommands().contains(command) ||
transactionCommands().contains(command) || auth() == command || echo() == command ||
mget() == command || mset() == command || keys() == command || ping() == command ||
time() == command || quit() == command || select() == command);
}
};

} // namespace Redis
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "source/extensions/filters/network/redis_proxy/command_splitter_impl.h"

#include <cstdint>

#include "source/common/common/logger.h"
#include "source/extensions/filters/network/common/redis/supported_commands.h"

Expand Down Expand Up @@ -75,6 +77,35 @@ makeFragmentedRequest(const RouteSharedPtr& route, const std::string& command,
return handler;
}

/**
* Make request and maybe mirror the request based on the mirror policies of the route.
* @param route supplies the route matched with the request.
* @param command supplies the command of the request.
* @param key supplies the key of the request.
* @param incoming_request supplies the request.
* @param callbacks supplies the request completion callbacks.
* @param transaction supplies the transaction info of the current connection.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
Common::Redis::Client::PoolRequest*
makeFragmentedRequestToShard(const RouteSharedPtr& route, const std::string& command,
uint16_t shard_index, const Common::Redis::RespValue& incoming_request,
ConnPool::PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) {
auto handler = route->upstream(command)->makeRequestToShard(
shard_index, ConnPool::RespVariant(incoming_request), callbacks, transaction);
if (handler) {
for (auto& mirror_policy : route->mirrorPolicies()) {
if (mirror_policy->shouldMirror(command)) {
mirror_policy->upstream()->makeRequestToShard(
shard_index, ConnPool::RespVariant(incoming_request), null_pool_callbacks, transaction);
}
}
}
return handler;
}

// Send a string response downstream.
void localResponse(SplitCallbacks& callbacks, std::string response) {
Common::Redis::RespValuePtr res(new Common::Redis::RespValue());
Expand Down Expand Up @@ -385,6 +416,80 @@ void MSETRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t
}
}

SplitRequestPtr KeysRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info) {
if (incoming_request->asArray().size() != 2) {
onWrongNumberOfArguments(callbacks, *incoming_request);
command_stats.error_.inc();
return nullptr;
}
const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info);
uint32_t shard_size =
route ? route->upstream(incoming_request->asArray()[0].asString())->shardSize() : 0;
if (shard_size == 0) {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
return nullptr;
}

std::unique_ptr<KeysRequest> request_ptr{
new KeysRequest(callbacks, command_stats, time_source, delay_command_latency)};
request_ptr->num_pending_responses_ = shard_size;
request_ptr->pending_requests_.reserve(request_ptr->num_pending_responses_);

request_ptr->pending_response_ = std::make_unique<Common::Redis::RespValue>();
request_ptr->pending_response_->type(Common::Redis::RespType::Array);

Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
for (uint32_t shard_index = 0; shard_index < shard_size; shard_index++) {
request_ptr->pending_requests_.emplace_back(*request_ptr, shard_index);
PendingRequest& pending_request = request_ptr->pending_requests_.back();

ENVOY_LOG(debug, "keys request shard index {}: {}", shard_index, base_request->toString());
pending_request.handle_ =
makeFragmentedRequestToShard(route, base_request->asArray()[0].asString(), shard_index,
*base_request, pending_request, callbacks.transaction());

if (!pending_request.handle_) {
pending_request.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
}
}

if (request_ptr->num_pending_responses_ > 0) {
return request_ptr;
}

return nullptr;
}

void KeysRequest::onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) {
pending_requests_[index].handle_ = nullptr;
switch (value->type()) {
case Common::Redis::RespType::Array: {
pending_response_->asArray().insert(pending_response_->asArray().end(),
value->asArray().begin(), value->asArray().end());
break;
}
default: {
error_count_++;
break;
}
}

ASSERT(num_pending_responses_ > 0);
if (--num_pending_responses_ == 0) {
updateStats(error_count_ == 0);
if (error_count_ == 0) {
callbacks_.onResponse(std::move(pending_response_));
} else {
callbacks_.onResponse(Common::Redis::Utility::makeError(
fmt::format("finished with {} error(s)", error_count_)));
}
}
}

SplitRequestPtr
SplitKeysSumResultRequest::create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
Expand Down Expand Up @@ -593,7 +698,7 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
Common::Redis::FaultManagerPtr&& fault_manager)
: router_(std::move(router)), simple_command_handler_(*router_),
eval_command_handler_(*router_), mget_handler_(*router_), mset_handler_(*router_),
split_keys_sum_result_handler_(*router_),
keys_handler_(*router_), split_keys_sum_result_handler_(*router_),
transaction_handler_(*router_), stats_{ALL_COMMAND_SPLITTER_STATS(
POOL_COUNTER_PREFIX(scope, stat_prefix + "splitter."))},
time_source_(time_source), fault_manager_(std::move(fault_manager)) {
Expand All @@ -616,6 +721,9 @@ InstanceImpl::InstanceImpl(RouterPtr&& router, Stats::Scope& scope, const std::s
addHandler(scope, stat_prefix, Common::Redis::SupportedCommands::mset(), latency_in_micros,
mset_handler_);

addHandler(scope, stat_prefix, Common::Redis::SupportedCommands::keys(), latency_in_micros,
keys_handler_);

for (const std::string& command : Common::Redis::SupportedCommands::transactionCommands()) {
addHandler(scope, stat_prefix, command, latency_in_micros, transaction_handler_);
}
Expand All @@ -637,6 +745,15 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
}

std::string command_name = absl::AsciiStrToLower(request->asArray()[0].asString());
// Compatible with redis behavior, if there is an unsupported command, return immediately,
// this action must be performed before verifying auth, some redis clients rely on this behavior.
if (!Common::Redis::SupportedCommands::isSupportedCommand(command_name)) {
stats_.unsupported_command_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(fmt::format(
"ERR unknown command '{}', with args beginning with: {}", request->asArray()[0].asString(),
request->asArray().size() > 1 ? request->asArray()[1].asString() : "")));
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::auth()) {
if (request->asArray().size() < 2) {
Expand Down Expand Up @@ -704,6 +821,16 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::select()) {
// Respond to OK locally.
if (request->asArray().size() != 2) {
onInvalidRequest(callbacks);
return nullptr;
}
localResponse(callbacks, "OK");
return nullptr;
}

if (command_name == Common::Redis::SupportedCommands::quit()) {
callbacks.onQuit();
return nullptr;
Expand All @@ -718,12 +845,7 @@ SplitRequestPtr InstanceImpl::makeRequest(Common::Redis::RespValuePtr&& request,

// Get the handler for the downstream request
auto handler = handler_lookup_table_.find(command_name.c_str());
if (handler == nullptr) {
stats_.unsupported_command_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(
fmt::format("unsupported command '{}'", request->asArray()[0].asString())));
return nullptr;
}
ASSERT(handler != nullptr);

// If we are within a transaction, forward all requests to the transaction handler (i.e. handler
// of "multi" command).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,26 @@ class MGETRequest : public FragmentedRequest {
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
};

/**
* KeysRequest sends the command to all Redis server. The response from each Redis (which
* must be an array) is merged and returned to the user. If there is any error or failure in
* processing the fragmented commands, an error will be returned.
*/
class KeysRequest : public FragmentedRequest {
public:
static SplitRequestPtr create(Router& router, Common::Redis::RespValuePtr&& incoming_request,
SplitCallbacks& callbacks, CommandStats& command_stats,
TimeSource& time_source, bool delay_command_latency,
const StreamInfo::StreamInfo& stream_info);

private:
KeysRequest(SplitCallbacks& callbacks, CommandStats& command_stats, TimeSource& time_source,
bool delay_command_latency)
: FragmentedRequest(callbacks, command_stats, time_source, delay_command_latency) {}
// RedisProxy::CommandSplitter::FragmentedRequest
void onChildResponse(Common::Redis::RespValuePtr&& value, uint32_t index) override;
};

/**
* SplitKeysSumResultRequest takes each key from the command and sends the same incoming command
* with each key to the appropriate Redis server. The response from each Redis (which must be an
Expand Down Expand Up @@ -390,6 +410,7 @@ class InstanceImpl : public Instance, Logger::Loggable<Logger::Id::redis> {
CommandHandlerFactory<EvalRequest> eval_command_handler_;
CommandHandlerFactory<MGETRequest> mget_handler_;
CommandHandlerFactory<MSETRequest> mset_handler_;
CommandHandlerFactory<KeysRequest> keys_handler_;
CommandHandlerFactory<SplitKeysSumResultRequest> split_keys_sum_result_handler_;
CommandHandlerFactory<TransactionRequest> transaction_handler_;
TrieLookupTable<HandlerDataPtr> handler_lookup_table_;
Expand Down
13 changes: 13 additions & 0 deletions source/extensions/filters/network/redis_proxy/conn_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class Instance {
public:
virtual ~Instance() = default;

virtual uint16_t shardSize() PURE;
/**
* Makes a redis request.
* @param hash_key supplies the key to use for consistent hashing.
Expand All @@ -64,6 +65,18 @@ class Instance {
virtual Common::Redis::Client::PoolRequest*
makeRequest(const std::string& hash_key, RespVariant&& request, PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) PURE;
/**
* Makes a redis request.
* @param shard_index supplies the key to use for consistent hashing.
* @param request supplies the request to make.
* @param callbacks supplies the request completion callbacks.
* @param transaction supplies the transaction info of the current connection.
* @return PoolRequest* a handle to the active request or nullptr if the request could not be made
* for some reason.
*/
virtual Common::Redis::Client::PoolRequest*
makeRequestToShard(uint16_t shard_index, RespVariant&& request, PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) PURE;
};

using InstanceSharedPtr = std::shared_ptr<Instance>;
Expand Down
Loading