Skip to content

Commit

Permalink
Fix redis transaction when traffic is mirrored (new PR instead of #28149
Browse files Browse the repository at this point in the history
) (#28244)

Signed-off-by: asheryer <asheryer@amazon.com>
  • Loading branch information
asheryerm authored Jul 7, 2023
1 parent 00faa84 commit 224c0f9
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 20 deletions.
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ bug_fixes:
- area: maglev loadbalancer
change: |
Fixes maglev stability problem. Previously, maglev returns slightly different backend assignment from the same backends and keys.
- area: redis
change: |
Fixes a bug where redis transactions do not work properly when redis traffic is mirrored.
removed_config_or_runtime:
# *Normally occurs at the end of the* :ref:`deprecation period <deprecated>`
Expand Down
23 changes: 15 additions & 8 deletions source/extensions/filters/network/common/redis/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,22 +236,20 @@ struct EmptyArray : public Extensions::NetworkFilters::Common::Redis::RespValue
};

// A struct representing a Redis transaction.

struct Transaction {
Transaction(Network::ConnectionCallbacks* connection_cb) : connection_cb_(connection_cb) {}
~Transaction() {
if (connection_established_) {
client_->close();
connection_established_ = false;
}
}
~Transaction() { close(); }

void start() { active_ = true; }

void close() {
active_ = false;
key_.clear();
if (connection_established_) {
client_->close();
for (auto& client : clients_) {
client->close();
}
connection_established_ = false;
}
should_close_ = false;
Expand All @@ -260,9 +258,18 @@ struct Transaction {
bool active_{false};
bool connection_established_{false};
bool should_close_{false};

// The key which represents the transaction hash slot.
std::string key_;
ClientPtr client_;
// clients_[0] represents the main connection, clients_[1..n] are for
// the mirroring policies.
std::vector<ClientPtr> clients_;
Network::ConnectionCallbacks* connection_cb_;

// This index represents the current client on which traffic is being sent to.
// When sending to the main redis server it will be 0, and when sending to one of
// the mirror servers it will be 1..n.
uint32_t current_client_idx_{0};
};

class NoOpTransaction : public Transaction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,15 @@ struct SupportedCommands {
* @return commands which alters the state of redis
*/
static const absl::flat_hash_set<std::string>& writeCommands() {
CONSTRUCT_ON_FIRST_USE(
absl::flat_hash_set<std::string>, "append", "bitfield", "decr", "decrby", "del", "expire",
"expireat", "eval", "evalsha", "geoadd", "hdel", "hincrby", "hincrbyfloat", "hmset", "hset",
"hsetnx", "incr", "incrby", "incrbyfloat", "linsert", "lpop", "lpush", "lpushx", "lrem",
"lset", "ltrim", "mset", "persist", "pexpire", "pexpireat", "pfadd", "psetex", "restore",
"rpop", "rpush", "rpushx", "sadd", "set", "setbit", "setex", "setnx", "setrange", "spop",
"srem", "zadd", "zincrby", "touch", "zpopmin", "zpopmax", "zrem", "zremrangebylex",
"zremrangebyrank", "zremrangebyscore", "unlink");
CONSTRUCT_ON_FIRST_USE(absl::flat_hash_set<std::string>, "append", "bitfield", "decr", "decrby",
"del", "discard", "exec", "expire", "expireat", "eval", "evalsha",
"geoadd", "hdel", "hincrby", "hincrbyfloat", "hmset", "hset", "hsetnx",
"incr", "incrby", "incrbyfloat", "linsert", "lpop", "lpush", "lpushx",
"lrem", "lset", "ltrim", "mset", "multi", "persist", "pexpire",
"pexpireat", "pfadd", "psetex", "restore", "rpop", "rpush", "rpushx",
"sadd", "set", "setbit", "setex", "setnx", "setrange", "spop", "srem",
"zadd", "zincrby", "touch", "zpopmin", "zpopmax", "zrem",
"zremrangebylex", "zremrangebyrank", "zremrangebyscore", "unlink");
}

static bool isReadCommand(const std::string& command) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ Common::Redis::Client::PoolRequest* makeSingleServerRequest(
const RouteSharedPtr& route, const std::string& command, const std::string& key,
Common::Redis::RespValueConstSharedPtr incoming_request, ConnPool::PoolCallbacks& callbacks,
Common::Redis::Client::Transaction& transaction) {
// If a transaction is active, clients_[0] is the primary connection to the cluster.
// The subsequent clients in the array are used for mirroring.
transaction.current_client_idx_ = 0;
auto handler = route->upstream()->makeRequest(key, ConnPool::RespVariant(incoming_request),
callbacks, transaction);
if (handler) {
for (auto& mirror_policy : route->mirrorPolicies()) {
transaction.current_client_idx_++;
if (mirror_policy->shouldMirror(command)) {
mirror_policy->upstream()->makeRequest(key, ConnPool::RespVariant(incoming_request),
null_pool_callbacks, transaction);
Expand Down Expand Up @@ -504,22 +508,27 @@ SplitRequestPtr TransactionRequest::create(Router& router,
// key, and then send a MULTI command to the node that handles that key.
// The response for the MULTI command will be discarded since we pass the null_pool_callbacks
// to the handler.

RouteSharedPtr route;
if (transaction.key_.empty()) {
transaction.key_ = incoming_request->asArray()[1].asString();
route = router.upstreamPool(transaction.key_);
Common::Redis::RespValueSharedPtr multi_request =
std::make_shared<Common::Redis::Client::MultiRequest>();
if (route) {
// We reserve a client for the main connection and for each mirror connection.
transaction.clients_.resize(1 + route->mirrorPolicies().size());
makeSingleServerRequest(route, "MULTI", transaction.key_, multi_request, null_pool_callbacks,
callbacks.transaction());
transaction.connection_established_ = true;
}
} else {
route = router.upstreamPool(transaction.key_);
}

std::unique_ptr<TransactionRequest> request_ptr{
new TransactionRequest(callbacks, command_stats, time_source, delay_command_latency)};

if (route) {
Common::Redis::RespValueSharedPtr base_request = std::move(incoming_request);
request_ptr->handle_ =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,15 @@ InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, RespVariant&&
return nullptr;
}

uint32_t client_idx = transaction.current_client_idx_;
// If there is an active transaction, establish a new connection if necessary.
if (transaction.active_ && !transaction.connection_established_) {
transaction.client_ =
transaction.clients_[client_idx] =
client_factory_.create(host, dispatcher_, *config_, redis_command_stats_, *(stats_scope_),
auth_username_, auth_password_, true);
if (transaction.connection_cb_) {
transaction.client_->addConnectionCallbacks(*transaction.connection_cb_);
transaction.clients_[client_idx]->addConnectionCallbacks(*transaction.connection_cb_);
}
transaction.connection_established_ = true;
}

pending_requests_.emplace_back(*this, std::move(request), callbacks, host);
Expand All @@ -309,7 +309,7 @@ InstanceImpl::ThreadLocalPool::makeRequest(const std::string& key, RespVariant&&
pending_request.request_handler_ = client->redis_client_->makeRequest(
getRequest(pending_request.incoming_request_), pending_request);
} else {
pending_request.request_handler_ = transaction.client_->makeRequest(
pending_request.request_handler_ = transaction.clients_[client_idx]->makeRequest(
getRequest(pending_request.incoming_request_), pending_request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,39 @@ TEST_P(RedisProxyWithMirrorsIntegrationTest, MirroredCatchAllRequest) {
redis_client->close();
}

TEST_P(RedisProxyWithMirrorsIntegrationTest, MirroredTransaction) {
initialize();

std::array<FakeRawConnectionPtr, 3> fake_upstream_connection;

std::string request = makeBulkStringArray({"MULTI"}) +
makeBulkStringArray({"set", "foo", "bar"}) +
makeBulkStringArray({"get", "foo"}) + makeBulkStringArray({"exec"});
const std::string& response = "+OK\r\n+QUEUED\r\n+QUEUED\r\n*2\r\n+OK\r\n$3\r\nbar\r\n";

// roundtrip to cluster_0 (catch_all route)
IntegrationTcpClientPtr redis_client = makeTcpConnection(lookupPort("redis_proxy"));
ASSERT_TRUE(redis_client->write(request));

expectUpstreamRequestResponse(fake_upstreams_[0], request, response, fake_upstream_connection[0]);

// mirror to cluster_1 and cluster_2
expectUpstreamRequestResponse(fake_upstreams_[2], request, "$4\r\nbar1\r\n",
fake_upstream_connection[1]);
expectUpstreamRequestResponse(fake_upstreams_[4], request, "$4\r\nbar2\r\n",
fake_upstream_connection[2]);

redis_client->waitForData(response);
// The original response from the cluster_0 should be received by the fake Redis client and the
// response from mirrored requests are ignored.
EXPECT_EQ(response, redis_client->data());

EXPECT_TRUE(fake_upstream_connection[0]->close());
EXPECT_TRUE(fake_upstream_connection[1]->close());
EXPECT_TRUE(fake_upstream_connection[2]->close());
redis_client->close();
}

TEST_P(RedisProxyWithMirrorsIntegrationTest, MirroredWriteOnlyRequest) {
initialize();

Expand Down

0 comments on commit 224c0f9

Please sign in to comment.