Skip to content

Commit

Permalink
update slot mapping with time interval
Browse files Browse the repository at this point in the history
  • Loading branch information
sewenew committed Sep 16, 2024
1 parent 8232fc5 commit 6ba5d8e
Show file tree
Hide file tree
Showing 11 changed files with 60 additions and 83 deletions.
2 changes: 0 additions & 2 deletions src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,6 @@ class ClusterEvent : public CommandEvent<Result, ResultParser> {
// i.e. ClosedError or IoError, we need to update node-slot mapping.
try {
std::rethrow_exception(err);
} catch (const SlotUncoveredError &) {
detail::update_shards(_key, _pool, AsyncEventUPtr(new UpdateShardsEvent));
} catch (const IoError &) {
detail::update_shards(_key, _pool, AsyncEventUPtr(new UpdateShardsEvent));
} catch (const ClosedError &) {
Expand Down
20 changes: 13 additions & 7 deletions src/sw/redis++/async_redis_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,24 @@ AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts,
_loop = std::make_shared<EventLoop>();
}

_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role);
_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role, ClusterOptions{});
}

AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts,
const ConnectionPoolOptions &pool_opts,
Role role,
const ClusterOptions &cluster_opts,
const EventLoopSPtr &loop) : _loop(loop) {
if (!_loop) {
_loop = std::make_shared<EventLoop>();
}

_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role, cluster_opts);
}

AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection) {
assert(_pool);

_pool->update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool.
Expand All @@ -52,8 +62,6 @@ AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connect
AsyncSubscriber AsyncRedisCluster::subscriber() {
assert(_pool);

_pool->update();

auto opts = _pool->connection_options();

auto connection = std::make_shared<AsyncConnection>(opts, _loop.get());
Expand All @@ -65,8 +73,6 @@ AsyncSubscriber AsyncRedisCluster::subscriber() {
AsyncSubscriber AsyncRedisCluster::subscriber(const StringView &hash_tag) {
assert(_pool);

_pool->update();

auto opts = _pool->connection_options(hash_tag);

auto connection = std::make_shared<AsyncConnection>(opts, _loop.get());
Expand Down
6 changes: 6 additions & 0 deletions src/sw/redis++/async_redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class AsyncRedisCluster {
Role role = Role::MASTER,
const EventLoopSPtr &loop = nullptr);

AsyncRedisCluster(const ConnectionOptions &opts,
const ConnectionPoolOptions &pool_opts,
Role role,
const ClusterOptions &cluster_opts,
const EventLoopSPtr &loop = nullptr);

explicit AsyncRedisCluster(const std::string &uri) : AsyncRedisCluster(Uri(uri)) {}

AsyncRedisCluster(const AsyncRedisCluster &) = delete;
Expand Down
14 changes: 10 additions & 4 deletions src/sw/redis++/async_shards_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ const std::size_t AsyncShardsPool::SHARDS;
AsyncShardsPool::AsyncShardsPool(const EventLoopSPtr &loop,
const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role) :
Role role,
const ClusterOptions &cluster_opts) :
_pool_opts(pool_opts),
_connection_opts(connection_opts),
_role(role),
_cluster_opts(cluster_opts),
_loop(loop) {
assert(loop);

Expand Down Expand Up @@ -208,8 +210,12 @@ auto AsyncShardsPool::_fetch_events() -> std::queue<RedeliverEvent> {
std::queue<RedeliverEvent> events;

std::unique_lock<std::mutex> lock(_mutex);
if (_events.empty()) {
_cv.wait(lock, [this]() { return !(this->_events).empty(); } );

if (!_cv.wait_for(lock,
_cluster_opts.slot_map_refresh_interval,
[this]() { return !(this->_events).empty(); })) {
// Reach timeout, but there's still no event, put an update event.
_events.push(RedeliverEvent{{}, AsyncEventUPtr(new UpdateShardsEvent)});
}

events.swap(_events);
Expand Down Expand Up @@ -238,7 +244,7 @@ Shards AsyncShardsPool::_get_shards(const std::string &host, int port) {
auto opts = _connection_opts;
opts.host = host;
opts.port = port;
ShardsPool pool(_pool_opts, opts, _role);
ShardsPool pool(_pool_opts, opts, _role, _cluster_opts);

return pool.shards();
}
Expand Down
5 changes: 4 additions & 1 deletion src/sw/redis++/async_shards_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class AsyncShardsPool {
AsyncShardsPool(const EventLoopSPtr &loop,
const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role);
Role role,
const ClusterOptions &cluster_opts);

AsyncConnectionPoolSPtr fetch(const StringView &key);

Expand Down Expand Up @@ -101,6 +102,8 @@ class AsyncShardsPool {

Role _role = Role::MASTER;

ClusterOptions _cluster_opts;

Shards _shards;

NodeMap _pools;
Expand Down
2 changes: 1 addition & 1 deletion src/sw/redis++/crc16.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* Output for "123456789" : 31C3
*/

#include "utils.h"
#include "sw/redis++/utils.h"
#include <cstdint>

namespace sw {
Expand Down
10 changes: 0 additions & 10 deletions src/sw/redis++/redis_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ RedisCluster::RedisCluster(const Uri &uri) :
Redis RedisCluster::redis(const StringView &hash_tag, bool new_connection) {
assert(_pool);

_pool->async_update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool
Expand All @@ -46,8 +44,6 @@ Redis RedisCluster::redis(const StringView &hash_tag, bool new_connection) {
Pipeline RedisCluster::pipeline(const StringView &hash_tag, bool new_connection) {
assert(_pool);

_pool->async_update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool
Expand All @@ -60,8 +56,6 @@ Pipeline RedisCluster::pipeline(const StringView &hash_tag, bool new_connection)
Transaction RedisCluster::transaction(const StringView &hash_tag, bool piped, bool new_connection) {
assert(_pool);

_pool->async_update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool
Expand All @@ -74,17 +68,13 @@ Transaction RedisCluster::transaction(const StringView &hash_tag, bool piped, bo
Subscriber RedisCluster::subscriber() {
assert(_pool);

_pool->async_update();

auto opts = _pool->connection_options();
return Subscriber(Connection(opts));
}

Subscriber RedisCluster::subscriber(const StringView &hash_tag) {
assert(_pool);

_pool->async_update();

auto opts = _pool->connection_options(hash_tag);
return Subscriber(Connection(opts));
}
Expand Down
3 changes: 2 additions & 1 deletion src/sw/redis++/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class RedisCluster {
public:
explicit RedisCluster(const ConnectionOptions &connection_opts,
const ConnectionPoolOptions &pool_opts = {},
Role role = Role::MASTER) : _pool(new ShardsPool(pool_opts, connection_opts, role)) {}
Role role = Role::MASTER,
const ClusterOptions &cluster_opts = {}) : _pool(new ShardsPool(pool_opts, connection_opts, role, cluster_opts)) {}

// Construct RedisCluster with URI:
// "tcp://127.0.0.1" or "tcp://127.0.0.1:6379"
Expand Down
5 changes: 0 additions & 5 deletions src/sw/redis++/redis_cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,11 +1351,6 @@ ReplyUPtr RedisCluster::_command(Cmd cmd, const StringView &key, Args &&...args)
SafeConnection safe_connection(*pool);

return _command(cmd, safe_connection.connection(), std::forward<Args>(args)...);
} catch (const SlotUncoveredError &) {
// Some slot is not covered, update asynchronously to see if new node added.
// Check https://github.com/sewenew/redis-plus-plus/issues/255 for detail.
// TODO: should we replace other 'update's with 'async_update's?
_pool->async_update();
} catch (const IoError &) {
// When master is down, one of its replicas will be promoted to be the new master.
// If we try to send command to the old master, we'll get an *IoError*.
Expand Down
54 changes: 13 additions & 41 deletions src/sw/redis++/shards_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const std::size_t ShardsPool::SHARDS;

ShardsPool::ShardsPool(const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role) :
Role role,
const ClusterOptions &cluster_opts) :
_pool_opts(pool_opts),
_connection_opts(connection_opts),
_role(role) {
_role(role),
_cluster_opts(cluster_opts) {
if (_connection_opts.type != ConnectionType::TCP) {
throw Error("Only support TCP connection for Redis Cluster");
}
Expand All @@ -47,7 +49,7 @@ ShardsPool::~ShardsPool() {
{
std::lock_guard<std::mutex> lock(_mutex);

_update_status = UpdateStatus::STOP;
_stop = true;
}

_cv.notify_one();
Expand Down Expand Up @@ -172,22 +174,6 @@ std::vector<ConnectionPoolSPtr> ShardsPool::pools() {
return nodes;
}

void ShardsPool::async_update() {
bool should_update = false;
{
std::lock_guard<std::mutex> lock(_mutex);

if (_update_status == UpdateStatus::UPDATED) {
should_update = true;
_update_status = UpdateStatus::STALE;
}
}

if (should_update) {
_cv.notify_one();
}
}

void ShardsPool::_init_pool(const Shards &shards) {
for (const auto &shard : shards) {
_add_node(shard.second);
Expand Down Expand Up @@ -385,34 +371,20 @@ auto ShardsPool::_add_node(const Node &node) -> NodeMap::iterator {
void ShardsPool::_run() {
while (true) {
std::unique_lock<std::mutex> lock(_mutex);
if (_update_status == UpdateStatus::UPDATED) {
_cv.wait(lock, [this]() { return this->_update_status != UpdateStatus::UPDATED; });
}

if (_update_status == UpdateStatus::STOP) {
if (_cv.wait_for(lock,
_cluster_opts.slot_map_refresh_interval,
[this]() { return this->_stop; })) {
break;
} else if (_update_status == UpdateStatus::STALE) {
lock.unlock();

_do_async_update();
} else {
assert("invalid UpdateStatus");
}
}
}

void ShardsPool::_do_async_update() {
try {
update();

std::lock_guard<std::mutex> lock(_mutex);
lock.unlock();

if (_update_status != UpdateStatus::STOP) {
_update_status = UpdateStatus::UPDATED;
try {
update();
} catch (...) {
// Ignore exceptions.
}
} catch (...) {
// Ignore exceptions.
// TODO: should we sleep a while?
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/sw/redis++/shards_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
Expand All @@ -34,10 +35,13 @@ namespace sw {

namespace redis {

struct ClusterOptions {
// Automatically update slot map every `slot_map_refresh_interval`.
std::chrono::milliseconds slot_map_refresh_interval = std::chrono::seconds(10);
};

class ShardsPool {
public:
ShardsPool() = default;

ShardsPool(const ShardsPool &that) = delete;
ShardsPool& operator=(const ShardsPool &that) = delete;

Expand All @@ -48,7 +52,8 @@ class ShardsPool {

ShardsPool(const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role);
Role role,
const ClusterOptions &cluster_opts);

// Fetch a connection by key.
ConnectionPoolSPtr fetch(const StringView &key);
Expand All @@ -69,8 +74,6 @@ class ShardsPool {

std::vector<ConnectionPoolSPtr> pools();

void async_update();

private:
void _init_pool(const Shards &shards);

Expand Down Expand Up @@ -117,12 +120,7 @@ class ShardsPool {

NodeMap _pools;

enum class UpdateStatus {
STALE = 0,
UPDATED,
STOP
};
UpdateStatus _update_status = UpdateStatus::UPDATED;
bool _stop = false;

std::thread _worker;

Expand All @@ -132,6 +130,8 @@ class ShardsPool {

Role _role = Role::MASTER;

ClusterOptions _cluster_opts;

static const std::size_t SHARDS = 16383;
};

Expand Down

0 comments on commit 6ba5d8e

Please sign in to comment.