Skip to content

Commit

Permalink
NodeId-bound cluster connections #104
Browse files Browse the repository at this point in the history
Motivation: Redis cluster nodes can be identified by three-and-a-half means:

* NodeId
* Host and Port
* Slot
** The half: Master/slave state for a certain slot

The identification details can be moved/changed at runtime. Most prominent examples are slots and master/slave state. A certain nodeId can be moved as well from one host/port to another one. The previous implementation did not care too much about that fact; all connections were identified by host and port.

While moving a certain nodeId from one host to another is quite unlikely, it still might happen. The connection pool, therefore, distinguishes now between host and port-bound and nodeId-bound connections. Host and port-bound connections stick to the particular host/port. NodeId-bound connections are reconfigured once the cluster topology changes.

Another effect of the change is, the connection management can double the number connections because connections are not shared amongst the identifier classes.
  • Loading branch information
mp911de committed Jul 27, 2015
1 parent 1818ee1 commit d0c6ee3
Show file tree
Hide file tree
Showing 8 changed files with 425 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.api.StatefulRedisConnection;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandArgs;
import com.lambdaworks.redis.protocol.CommandKeyword;
import com.lambdaworks.redis.protocol.RedisCommand;
Expand All @@ -31,10 +33,8 @@ class ClusterDistributionChannelWriter<K, V> implements RedisChannelWriter<K, V>
private boolean closed = false;
private int executionLimit = 5;

public ClusterDistributionChannelWriter(RedisChannelWriter<K, V> defaultWriter,
ClusterConnectionProvider clusterConnectionProvider) {
public ClusterDistributionChannelWriter(RedisChannelWriter<K, V> defaultWriter) {
this.defaultWriter = defaultWriter;
this.clusterConnectionProvider = clusterConnectionProvider;
}

@Override
Expand Down Expand Up @@ -174,6 +174,10 @@ public void reset() {
clusterConnectionProvider.reset();
}

public void setClusterConnectionProvider(ClusterConnectionProvider clusterConnectionProvider) {
this.clusterConnectionProvider = clusterConnectionProvider;
}

public void setPartitions(Partitions partitions) {
clusterConnectionProvider.setPartitions(partitions);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.lambdaworks.redis.cluster;

import java.util.Queue;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
import com.lambdaworks.redis.ClientOptions;
import com.lambdaworks.redis.RedisChannelWriter;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.protocol.CommandHandler;
import com.lambdaworks.redis.protocol.ConnectionWatchdog;
import com.lambdaworks.redis.protocol.RedisCommand;

import io.netty.channel.ChannelHandler;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

/**
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
*/
@ChannelHandler.Sharable
class ClusterNodeCommandHandler<K, V> extends CommandHandler<K, V> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ClusterNodeCommandHandler.class);
private static final Set<LifecycleState> CHANNEL_OPEN_STATES = ImmutableSet.of(LifecycleState.ACTIVATING,
LifecycleState.ACTIVE, LifecycleState.CONNECTED);

private final RedisChannelWriter<K, V> clusterChannelWriter;

/**
* Initialize a new instance that handles commands from the supplied queue.
*
* @param clientOptions client options for this connection
* @param queue The command queue
* @param clusterChannelWriter top-most channel writer.
*/
public ClusterNodeCommandHandler(ClientOptions clientOptions, Queue<RedisCommand<K, V, ?>> queue,
RedisChannelWriter<K, V> clusterChannelWriter) {
super(clientOptions, queue);
this.clusterChannelWriter = clusterChannelWriter;
}

/**
* Prepare the closing of the channel.
*/
public void prepareClose() {
if (channel != null) {
ConnectionWatchdog connectionWatchdog = channel.pipeline().get(ConnectionWatchdog.class);
if (connectionWatchdog != null) {
connectionWatchdog.setReconnectSuspended(true);
}
}
}

/**
* Move queued and buffered commands from the inactive connection to the master command writer. This is done only if the
* current connection is disconnected and auto-reconnect is enabled (command-retries). If the connection would be open, we
* could get into a race that the commands we're moving are right now in processing. Alive connections can handle redirects
* and retries on their own.
*/
@Override
public void close() {

logger.debug("{} close()", logPrefix());

if (clusterChannelWriter != null) {
if (isAutoReconnect() && !CHANNEL_OPEN_STATES.contains(getState())) {
for (RedisCommand<K, V, ?> queuedCommand : queue) {
try {
clusterChannelWriter.write(queuedCommand);
} catch (RedisException e) {
queuedCommand.completeExceptionally(e);
queuedCommand.complete();
}
}

queue.clear();
}

for (RedisCommand<K, V, ?> queuedCommand : commandBuffer) {
try {
clusterChannelWriter.write(queuedCommand);
} catch (RedisException e) {
queuedCommand.completeExceptionally(e);
}
}

commandBuffer.clear();
}

super.close();
}

public boolean isAutoReconnect() {
return clientOptions.isAutoReconnect();
}

public boolean isQueueEmpty() {
if (queue.isEmpty() && commandBuffer.isEmpty()) {
return true;
}

return false;
}

}
Loading

0 comments on commit d0c6ee3

Please sign in to comment.