Skip to content

Commit

Permalink
[redis proxy]: add more test cases
Browse files Browse the repository at this point in the history
Signed-off-by: duanhongyi <duanhongyi@doopai.com>
  • Loading branch information
duanhongyi committed Dec 20, 2024
1 parent 6a157a6 commit 8af3e65
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 0 deletions.
43 changes: 43 additions & 0 deletions test/extensions/clusters/redis/redis_cluster_lb_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,49 @@ TEST_F(RedisClusterLoadBalancerTest, Basic) {
validateAssignment(hosts, expected_assignments);
}

TEST_F(RedisClusterLoadBalancerTest, Shard) {
Upstream::HostVector hosts{Upstream::makeTestHost(info_, "tcp://127.0.0.1:90", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:91", simTime()),
Upstream::makeTestHost(info_, "tcp://127.0.0.1:92", simTime())};

ClusterSlotsPtr slots = std::make_unique<std::vector<ClusterSlot>>(std::vector<ClusterSlot>{
ClusterSlot(0, 1000, hosts[0]->address()),
ClusterSlot(1001, 2000, hosts[1]->address()),
ClusterSlot(2001, 16383, hosts[2]->address()),
});
Upstream::HostMap all_hosts{
{hosts[0]->address()->asString(), hosts[0]},
{hosts[1]->address()->asString(), hosts[1]},
{hosts[2]->address()->asString(), hosts[2]},
};
init();
factory_->onClusterSlotUpdate(std::move(slots), all_hosts);

// A list of (hash: host_index) pair
// Simple read command
std::vector<NetworkFilters::Common::Redis::RespValue> get_foo(2);
get_foo[0].type(NetworkFilters::Common::Redis::RespType::BulkString);
get_foo[0].asString() = "get";
get_foo[1].type(NetworkFilters::Common::Redis::RespType::BulkString);
get_foo[1].asString() = "foo";

NetworkFilters::Common::Redis::RespValue get_request;
get_request.type(NetworkFilters::Common::Redis::RespType::Array);
get_request.asArray().swap(get_foo);

Upstream::LoadBalancerPtr lb = lb_->factory()->create(lb_params_);
for (uint16_t i = 0; i < 5; i++) {
RedisSpecifyShardContextImpl context(i, get_request);
auto host = lb->chooseHost(&context);
if (i < 3) {
EXPECT_FALSE(host == nullptr);
EXPECT_EQ(hosts[i]->address()->asString(), host->address()->asString());
} else {
EXPECT_TRUE(host == nullptr);
}
}
}

TEST_F(RedisClusterLoadBalancerTest, ReadStrategiesHealthy) {
Upstream::HostVector hosts{
Upstream::makeTestHost(info_, "tcp://127.0.0.1:90", simTime()),
Expand Down
85 changes: 85 additions & 0 deletions test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,91 @@ TEST_F(RedisConnPoolImplTest, Basic) {
tls_.shutdownThread();
};

TEST_F(RedisConnPoolImplTest, ShardSize) {
InSequence s;

setup();

Common::Redis::RespValueSharedPtr value = std::make_shared<Common::Redis::RespValue>();
MockPoolCallbacks callbacks;
Common::Redis::Client::MockClient* client = new NiceMock<Common::Redis::Client::MockClient>();

uint16_t shard_size = 3;
EXPECT_CALL(cm_.thread_local_cluster_.lb_, chooseHost(_))
.WillRepeatedly(
Invoke([&](Upstream::LoadBalancerContext* context) -> Upstream::HostConstSharedPtr {
EXPECT_EQ(context->metadataMatchCriteria(), nullptr);
EXPECT_EQ(context->downstreamConnection(), nullptr);
std::cout << (context->computeHashKey().value()) << std::endl;
if (context->computeHashKey() < shard_size) {
return cm_.thread_local_cluster_.lb_.host_;
}
return nullptr;
}));
EXPECT_CALL(*this, create_(_)).WillRepeatedly(Return(client));
EXPECT_CALL(*cm_.thread_local_cluster_.lb_.host_, address())
.WillRepeatedly(Return(test_address_));
EXPECT_EQ(conn_pool_->shardSize(), shard_size);

delete client;
tls_.shutdownThread();
};

TEST_F(RedisConnPoolImplTest, ShardHost) {
InSequence s;

setup();

Common::Redis::RespValueSharedPtr value = std::make_shared<Common::Redis::RespValue>();
Common::Redis::Client::MockPoolRequest active_request;
MockPoolCallbacks callbacks;
Common::Redis::Client::MockClient* client = new NiceMock<Common::Redis::Client::MockClient>();

EXPECT_CALL(cm_.thread_local_cluster_.lb_, chooseHost(_))
.WillOnce(Invoke([&](Upstream::LoadBalancerContext* context) -> Upstream::HostConstSharedPtr {
EXPECT_EQ(context->computeHashKey().value(), 0);
EXPECT_EQ(context->metadataMatchCriteria(), nullptr);
EXPECT_EQ(context->downstreamConnection(), nullptr);
return cm_.thread_local_cluster_.lb_.host_;
}));
EXPECT_CALL(*this, create_(_)).WillOnce(Return(client));
EXPECT_CALL(*cm_.thread_local_cluster_.lb_.host_, address())
.WillRepeatedly(Return(test_address_));
EXPECT_CALL(*client, makeRequest_(Ref(*value), _)).WillOnce(Return(&active_request));
Common::Redis::Client::PoolRequest* request =
conn_pool_->makeRequestToShard(0, value, callbacks, transaction_);
EXPECT_NE(nullptr, request);

EXPECT_CALL(active_request, cancel());
EXPECT_CALL(callbacks, onFailure_());
EXPECT_CALL(*client, close());
tls_.shutdownThread();
};

TEST_F(RedisConnPoolImplTest, ShardNoHost) {
InSequence s;

setup();

Common::Redis::RespValueSharedPtr value = std::make_shared<Common::Redis::RespValue>();
MockPoolCallbacks callbacks;

EXPECT_CALL(cm_.thread_local_cluster_.lb_, chooseHost(_))
.WillOnce(Invoke([&](Upstream::LoadBalancerContext* context) -> Upstream::HostConstSharedPtr {
EXPECT_EQ(context->computeHashKey().value(), 0);
EXPECT_EQ(context->metadataMatchCriteria(), nullptr);
EXPECT_EQ(context->downstreamConnection(), nullptr);
return nullptr;
}));
EXPECT_CALL(*cm_.thread_local_cluster_.lb_.host_, address())
.WillRepeatedly(Return(test_address_));
Common::Redis::Client::PoolRequest* request =
conn_pool_->makeRequestToShard(0, value, callbacks, transaction_);
EXPECT_EQ(nullptr, request);

tls_.shutdownThread();
};

TEST_F(RedisConnPoolImplTest, BasicRespVariant) {
InSequence s;

Expand Down

0 comments on commit 8af3e65

Please sign in to comment.