Skip to content

Commit

Permalink
Issue #1252: Fix creating a lot of new Jedis instances on unstable cl…
Browse files Browse the repository at this point in the history
…uster, fix slots clearing without filling (#1253)

* Issue #1252: Fix creating lot of new Jedis instances on unstable cluster, fix slots clearing without filling

* Issue #1252: Acquire one long lock for trying all nodes when rediscover cluster

Conflicts:
	src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
  • Loading branch information
Spikhalskiy authored and marcosnils committed Jul 19, 2016
1 parent 13cf301 commit 848fca2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 75 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package redis.clients.jedis;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

Expand All @@ -25,8 +22,7 @@ public JedisClusterConnectionHandler(Set<HostAndPort> nodes,
abstract Jedis getConnectionFromSlot(int slot);

public Jedis getConnectionFromNode(HostAndPort node) {
cache.setNodeIfNotExist(node);
return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource();
return cache.setupNodeIfNotExist(node).getResource();
}

public Map<String, JedisPool> getNodes() {
Expand All @@ -50,39 +46,15 @@ private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPool
}

public void renewSlotCache() {
for (JedisPool jp : getShuffledNodesPool()) {
Jedis jedis = null;
try {
jedis = jp.getResource();
cache.discoverClusterSlots(jedis);
break;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}
}
cache.renewClusterSlots(null);
}

public void renewSlotCache(Jedis jedis) {
try {
cache.discoverClusterSlots(jedis);
} catch (JedisConnectionException e) {
renewSlotCache();
}
cache.renewClusterSlots(jedis);
}

@Override
public void close() {
cache.reset();
}

protected List<JedisPool> getShuffledNodesPool() {
List<JedisPool> pools = new ArrayList<JedisPool>();
pools.addAll(cache.getNodes().values());
Collections.shuffle(pools);
return pools;
}
}
110 changes: 68 additions & 42 deletions src/main/java/redis/clients/jedis/JedisClusterInfoCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import redis.clients.util.SafeEncoder;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -11,6 +12,8 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
public class JedisClusterInfoCache {
private final Map<String, JedisPool> nodes = new HashMap<String, JedisPool>();
private final Map<Integer, JedisPool> slots = new HashMap<Integer, JedisPool>();
Expand Down Expand Up @@ -62,7 +65,7 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
}

HostAndPort targetNode = generateHostAndPort(hostInfos);
setNodeIfNotExist(targetNode);
setupNodeIfNotExist(targetNode);
if (i == MASTER_NODE_INDEX) {
assignSlotsToNode(slotNums, targetNode);
}
Expand All @@ -73,37 +76,34 @@ public void discoverClusterNodesAndSlots(Jedis jedis) {
}
}

public void discoverClusterSlots(Jedis jedis) {
public void renewClusterSlots(Jedis jedis) {
//If rediscovering is already in process - no need to start one more same rediscovering, just return
if (!rediscovering) {
w.lock();
rediscovering = true;

try {
this.slots.clear();

List<Object> slots = jedis.clusterSlots();

for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;

if (slotInfo.size() <= 2) {
continue;
w.lock();
rediscovering = true;

if (jedis != null) {
try {
discoverClusterSlots(jedis);
return;
} catch (JedisException e) {
//try nodes from all pools
}
}

List<Integer> slotNums = getAssignedSlotArray(slotInfo);

// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(2);
if (hostInfos.isEmpty()) {
continue;
for (JedisPool jp : getShuffledNodesPool()) {
try {
jedis = jp.getResource();
discoverClusterSlots(jedis);
return;
} catch (JedisConnectionException e) {
// try next nodes
} finally {
if (jedis != null) {
jedis.close();
}
}

// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);

setNodeIfNotExist(targetNode);
assignSlotsToNode(slotNums, targetNode);
}
} finally {
rediscovering = false;
Expand All @@ -112,20 +112,47 @@ public void discoverClusterSlots(Jedis jedis) {
}
}

private void discoverClusterSlots(Jedis jedis) {
List<Object> slots = jedis.clusterSlots();
this.slots.clear();

for (Object slotInfoObj : slots) {
List<Object> slotInfo = (List<Object>) slotInfoObj;

if (slotInfo.size() <= MASTER_NODE_INDEX) {
continue;
}

List<Integer> slotNums = getAssignedSlotArray(slotInfo);

// hostInfos
List<Object> hostInfos = (List<Object>) slotInfo.get(MASTER_NODE_INDEX);
if (hostInfos.isEmpty()) {
continue;
}

// at this time, we just use master, discard slave information
HostAndPort targetNode = generateHostAndPort(hostInfos);
assignSlotsToNode(slotNums, targetNode);
}
}

private HostAndPort generateHostAndPort(List<Object> hostInfos) {
return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)),
((Long) hostInfos.get(1)).intValue());
}

public void setNodeIfNotExist(HostAndPort node) {
public JedisPool setupNodeIfNotExist(HostAndPort node) {
w.lock();
try {
String nodeKey = getNodeKey(node);
if (nodes.containsKey(nodeKey)) return;
JedisPool existingPool = nodes.get(nodeKey);
if (existingPool != null) return existingPool;

JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(),
connectionTimeout, soTimeout, null, 0, null);
nodes.put(nodeKey, nodePool);
return nodePool;
} finally {
w.unlock();
}
Expand All @@ -134,12 +161,7 @@ public void setNodeIfNotExist(HostAndPort node) {
public void assignSlotToNode(int slot, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));

if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}
JedisPool targetPool = setupNodeIfNotExist(targetNode);
slots.put(slot, targetPool);
} finally {
w.unlock();
Expand All @@ -149,13 +171,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) {
public void assignSlotsToNode(List<Integer> targetSlots, HostAndPort targetNode) {
w.lock();
try {
JedisPool targetPool = nodes.get(getNodeKey(targetNode));

if (targetPool == null) {
setNodeIfNotExist(targetNode);
targetPool = nodes.get(getNodeKey(targetNode));
}

JedisPool targetPool = setupNodeIfNotExist(targetNode);
for (Integer slot : targetSlots) {
slots.put(slot, targetPool);
}
Expand Down Expand Up @@ -191,6 +207,17 @@ public Map<String, JedisPool> getNodes() {
}
}

public List<JedisPool> getShuffledNodesPool() {
r.lock();
try {
List<JedisPool> pools = new ArrayList<JedisPool>(nodes.values());
Collections.shuffle(pools);
return pools;
} finally {
r.unlock();
}
}

/**
* Clear discovered nodes collections and gently release allocated resources
*/
Expand Down Expand Up @@ -233,5 +260,4 @@ private List<Integer> getAssignedSlotArray(List<Object> slotInfo) {
}
return slotNums;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Jedis getConnection() {
// ping-pong)
// or exception if all connections are invalid

List<JedisPool> pools = getShuffledNodesPool();
List<JedisPool> pools = cache.getShuffledNodesPool();

for (JedisPool pool : pools) {
Jedis jedis = null;
Expand Down Expand Up @@ -61,7 +61,14 @@ public Jedis getConnectionFromSlot(int slot) {
// assignment
return connectionPool.getResource();
} else {
return getConnection();
renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
connectionPool = cache.getSlotPool(slot);
if (connectionPool != null) {
return connectionPool.getResource();
} else {
//no choice, fallback to new connection to random node
return getConnection();
}
}
}
}

0 comments on commit 848fca2

Please sign in to comment.