Skip to content

Commit

Permalink
[redis proxy]: add keys test cases
Browse files Browse the repository at this point in the history
Signed-off-by: duanhongyi <duanhongyi@doopai.com>
  • Loading branch information
duanhongyi committed Dec 19, 2024
1 parent b8c3125 commit ef12e37
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,13 @@ SplitRequestPtr KeysRequest::create(Router& router, Common::Redis::RespValuePtr&
return nullptr;
}
const auto route = router.upstreamPool(incoming_request->asArray()[1].asString(), stream_info);
if (!route) {
uint32_t shard_size =
route ? route->upstream(incoming_request->asArray()[0].asString())->shardSize() : 0;
if (shard_size == 0) {
command_stats.error_.inc();
callbacks.onResponse(Common::Redis::Utility::makeError(Response::get().NoUpstreamHost));
return nullptr;
}
auto shard_size = route->upstream(incoming_request->asArray()[0].asString())->shardSize();

std::unique_ptr<KeysRequest> request_ptr{
new KeysRequest(callbacks, command_stats, time_source, delay_command_latency)};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,20 +286,19 @@ uint16_t InstanceImpl::ThreadLocalPool::shardSize() {
if (cluster_ == nullptr) {
ASSERT(client_map_.empty());
ASSERT(host_set_member_update_cb_handle_ == nullptr);
return 1;
return 0;
}

uint16_t size = 1;
Common::Redis::RespValue request;
for (;; size++) {
for (uint16_t size = 0;; size++) {
Clusters::Redis::RedisSpecifyShardContextImpl lb_context(
size, request, Common::Redis::Client::ReadPolicy::Primary);
Upstream::HostConstSharedPtr host = cluster_->loadBalancer().chooseHost(&lb_context);
if (!host) {
break;
return size;
}
}
return size;
return 0;
}

Common::Redis::Client::PoolRequest*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,10 +603,13 @@ class FragmentedRequestCommandHandlerTest : public RedisCommandSplitterImplTest
pool_requests_.swap(tmp_pool_requests);
std::vector<Common::Redis::Client::MockPoolRequest> tmp_mirrored_pool_requests(shard_size);
mirror_pool_requests_.swap(tmp_mirrored_pool_requests);

EXPECT_CALL(callbacks_, connectionAllowed()).WillOnce(Return(true));

std::vector<Common::Redis::Client::MockPoolRequest> dummy_requests(shard_size);

EXPECT_CALL(*conn_pool_, shardSize_()).WillRepeatedly(Return(shard_size));
if (mirrored) {
EXPECT_CALL(*mirror_conn_pool_, shardSize_()).WillRepeatedly(Return(shard_size));
}
ConnPool::RespVariant keys(*request);
for (uint32_t i = 0; i < shard_size; i++) {
Common::Redis::Client::PoolRequest* request_to_use = &pool_requests_[i];
Expand Down Expand Up @@ -971,26 +974,92 @@ class KeysHandlerTest : public FragmentedRequestCommandHandlerTest,
makeRequestToShard(shard_size, request_strings, mirrored);
}

Common::Redis::RespValuePtr response(std::vector<Common::Redis::RespValue>& value) {
Common::Redis::RespValuePtr response() {
Common::Redis::RespValuePtr response = std::make_unique<Common::Redis::RespValue>();
response->type(Common::Redis::RespType::Array);
response->asArray() = value;
return response;
}
};

void makeBulkStringArray(Common::Redis::RespValue& value,
const std::vector<std::string>& strings) {
std::vector<Common::Redis::RespValue> values(strings.size());
for (uint64_t i = 0; i < strings.size(); i++) {
values[i].type(Common::Redis::RespType::BulkString);
values[i].asString() = strings[i];
}
TEST_P(KeysHandlerTest, Normal) {
InSequence s;

value.type(Common::Redis::RespType::Array);
value.asArray().swap(values);
}
setup(2, {});
EXPECT_NE(nullptr, handle_);
Common::Redis::RespValue expected_response;
expected_response.type(Common::Redis::RespType::Array);
pool_callbacks_[1]->onResponse(response());
time_system_.setMonotonicTime(std::chrono::milliseconds(10));
EXPECT_CALL(
store_,
deliverHistogramToSinks(
Property(&Stats::Metric::name, "redis.foo.command." + GetParam() + ".latency"), 10));
EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response)));
pool_callbacks_[0]->onResponse(response());
EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value());
EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".success").value());
};

TEST_P(KeysHandlerTest, Mirrored) {
InSequence s;

setupMirrorPolicy();
setup(2, true);
EXPECT_NE(nullptr, handle_);

Common::Redis::RespValue expected_response;
expected_response.type(Common::Redis::RespType::Array);

pool_callbacks_[1]->onResponse(response());
mirror_pool_callbacks_[1]->onResponse(response());

time_system_.setMonotonicTime(std::chrono::milliseconds(10));
EXPECT_CALL(
store_,
deliverHistogramToSinks(
Property(&Stats::Metric::name, "redis.foo.command." + GetParam() + ".latency"), 10));
EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response)));
pool_callbacks_[0]->onResponse(response());
mirror_pool_callbacks_[0]->onResponse(response());

EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value());
EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".success").value());
};

TEST_P(KeysHandlerTest, NormalOneZero) {
InSequence s;

setup(2);
EXPECT_NE(nullptr, handle_);

Common::Redis::RespValue expected_response;
expected_response.type(Common::Redis::RespType::Array);

pool_callbacks_[1]->onResponse(response());

EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response)));
pool_callbacks_[0]->onResponse(response());

EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value());
EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".success").value());
};

TEST_P(KeysHandlerTest, NoUpstreamHostForAll) {
// No InSequence to avoid making setup() more complicated.

Common::Redis::RespValue expected_response;
expected_response.type(Common::Redis::RespType::Error);
expected_response.asString() = "no upstream host";

EXPECT_CALL(callbacks_, onResponse_(PointeesEq(&expected_response)));
setup(0);
EXPECT_EQ(nullptr, handle_);
EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".total").value());
EXPECT_EQ(1UL, store_.counter("redis.foo.command." + GetParam() + ".error").value());
};

INSTANTIATE_TEST_SUITE_P(KeysHandlerTest, KeysHandlerTest, testing::Values("keys"));

class RedisSplitKeysSumResultHandlerTest : public FragmentedRequestCommandHandlerTest,
public testing::WithParamInterface<std::string> {
public:
Expand Down
3 changes: 2 additions & 1 deletion test/extensions/filters/network/redis_proxy/mocks.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class MockInstance : public Instance {
MockInstance();
~MockInstance() override;

uint16_t shardSize() override { return 1; }
uint16_t shardSize() override { return shardSize_(); }

Common::Redis::Client::PoolRequest* makeRequest(const std::string& hash_key,
RespVariant&& request, PoolCallbacks& callbacks,
Expand All @@ -100,6 +100,7 @@ class MockInstance : public Instance {
return makeRequestToShard_(shard_index, request, callbacks);
}

MOCK_METHOD(uint16_t, shardSize_, ());
MOCK_METHOD(Common::Redis::Client::PoolRequest*, makeRequest_,
(const std::string& hash_key, RespVariant& request, PoolCallbacks& callbacks));
MOCK_METHOD(Common::Redis::Client::PoolRequest*, makeRequestToShard_,
Expand Down

0 comments on commit ef12e37

Please sign in to comment.