From 848fca2dbab6e104b5b0a576a3e65b7317ae70e2 Mon Sep 17 00:00:00 2001 From: Dmitry Spikhalskiy Date: Mon, 11 Jul 2016 12:51:20 +0100 Subject: [PATCH] Issue #1252: Fix creating a lot of new Jedis instances on unstable cluster, fix slots clearing without filling (#1253) * Issue #1252: Fix creating lot of new Jedis instances on unstable cluster, fix slots clearing without filling * Issue #1252: Acquire one long lock for trying all nodes when rediscover cluster Conflicts: src/main/java/redis/clients/jedis/JedisClusterInfoCache.java --- .../jedis/JedisClusterConnectionHandler.java | 34 +----- .../clients/jedis/JedisClusterInfoCache.java | 110 +++++++++++------- .../JedisSlotBasedConnectionHandler.java | 11 +- 3 files changed, 80 insertions(+), 75 deletions(-) diff --git a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java index 44689833cf..15c47c467e 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisClusterConnectionHandler.java @@ -1,9 +1,6 @@ package redis.clients.jedis; import java.io.Closeable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.Set; @@ -25,8 +22,7 @@ public JedisClusterConnectionHandler(Set nodes, abstract Jedis getConnectionFromSlot(int slot); public Jedis getConnectionFromNode(HostAndPort node) { - cache.setNodeIfNotExist(node); - return cache.getNode(JedisClusterInfoCache.getNodeKey(node)).getResource(); + return cache.setupNodeIfNotExist(node).getResource(); } public Map getNodes() { @@ -50,39 +46,15 @@ private void initializeSlotsCache(Set startNodes, GenericObjectPool } public void renewSlotCache() { - for (JedisPool jp : getShuffledNodesPool()) { - Jedis jedis = null; - try { - jedis = jp.getResource(); - cache.discoverClusterSlots(jedis); - break; - } catch (JedisConnectionException e) { - // try next nodes - } finally { - if (jedis != null) { - jedis.close(); - } - } - } + cache.renewClusterSlots(null); } public void renewSlotCache(Jedis jedis) { - try { - cache.discoverClusterSlots(jedis); - } catch (JedisConnectionException e) { - renewSlotCache(); - } + cache.renewClusterSlots(jedis); } @Override public void close() { cache.reset(); } - - protected List getShuffledNodesPool() { - List pools = new ArrayList(); - pools.addAll(cache.getNodes().values()); - Collections.shuffle(pools); - return pools; - } } diff --git a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java index b84c916b8e..759317a162 100644 --- a/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java +++ b/src/main/java/redis/clients/jedis/JedisClusterInfoCache.java @@ -3,6 +3,7 @@ import redis.clients.util.SafeEncoder; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -11,6 +12,8 @@ import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; public class JedisClusterInfoCache { private final Map nodes = new HashMap(); private final Map slots = new HashMap(); @@ -62,7 +65,7 @@ public void discoverClusterNodesAndSlots(Jedis jedis) { } HostAndPort targetNode = generateHostAndPort(hostInfos); - setNodeIfNotExist(targetNode); + setupNodeIfNotExist(targetNode); if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } @@ -73,37 +76,34 @@ public void discoverClusterNodesAndSlots(Jedis jedis) { } } - public void discoverClusterSlots(Jedis jedis) { + public void renewClusterSlots(Jedis jedis) { //If rediscovering is already in process - no need to start one more same rediscovering, just return if (!rediscovering) { - w.lock(); - rediscovering = true; - try { - this.slots.clear(); - - List slots = jedis.clusterSlots(); - - for (Object slotInfoObj : slots) { - List slotInfo = (List) slotInfoObj; - - if (slotInfo.size() <= 2) { - continue; + w.lock(); + rediscovering = true; + + if (jedis != null) { + try { + discoverClusterSlots(jedis); + return; + } catch (JedisException e) { + //try nodes from all pools } + } - List slotNums = getAssignedSlotArray(slotInfo); - - // hostInfos - List hostInfos = (List) slotInfo.get(2); - if (hostInfos.isEmpty()) { - continue; + for (JedisPool jp : getShuffledNodesPool()) { + try { + jedis = jp.getResource(); + discoverClusterSlots(jedis); + return; + } catch (JedisConnectionException e) { + // try next nodes + } finally { + if (jedis != null) { + jedis.close(); + } } - - // at this time, we just use master, discard slave information - HostAndPort targetNode = generateHostAndPort(hostInfos); - - setNodeIfNotExist(targetNode); - assignSlotsToNode(slotNums, targetNode); } } finally { rediscovering = false; @@ -112,20 +112,47 @@ public void discoverClusterSlots(Jedis jedis) { } } + private void discoverClusterSlots(Jedis jedis) { + List slots = jedis.clusterSlots(); + this.slots.clear(); + + for (Object slotInfoObj : slots) { + List slotInfo = (List) slotInfoObj; + + if (slotInfo.size() <= MASTER_NODE_INDEX) { + continue; + } + + List slotNums = getAssignedSlotArray(slotInfo); + + // hostInfos + List hostInfos = (List) slotInfo.get(MASTER_NODE_INDEX); + if (hostInfos.isEmpty()) { + continue; + } + + // at this time, we just use master, discard slave information + HostAndPort targetNode = generateHostAndPort(hostInfos); + assignSlotsToNode(slotNums, targetNode); + } + } + private HostAndPort generateHostAndPort(List hostInfos) { return new HostAndPort(SafeEncoder.encode((byte[]) hostInfos.get(0)), ((Long) hostInfos.get(1)).intValue()); } - public void setNodeIfNotExist(HostAndPort node) { + public JedisPool setupNodeIfNotExist(HostAndPort node) { w.lock(); try { String nodeKey = getNodeKey(node); - if (nodes.containsKey(nodeKey)) return; + JedisPool existingPool = nodes.get(nodeKey); + if (existingPool != null) return existingPool; JedisPool nodePool = new JedisPool(poolConfig, node.getHost(), node.getPort(), connectionTimeout, soTimeout, null, 0, null); nodes.put(nodeKey, nodePool); + return nodePool; } finally { w.unlock(); } @@ -134,12 +161,7 @@ public void setNodeIfNotExist(HostAndPort node) { public void assignSlotToNode(int slot, HostAndPort targetNode) { w.lock(); try { - JedisPool targetPool = nodes.get(getNodeKey(targetNode)); - - if (targetPool == null) { - setNodeIfNotExist(targetNode); - targetPool = nodes.get(getNodeKey(targetNode)); - } + JedisPool targetPool = setupNodeIfNotExist(targetNode); slots.put(slot, targetPool); } finally { w.unlock(); @@ -149,13 +171,7 @@ public void assignSlotToNode(int slot, HostAndPort targetNode) { public void assignSlotsToNode(List targetSlots, HostAndPort targetNode) { w.lock(); try { - JedisPool targetPool = nodes.get(getNodeKey(targetNode)); - - if (targetPool == null) { - setNodeIfNotExist(targetNode); - targetPool = nodes.get(getNodeKey(targetNode)); - } - + JedisPool targetPool = setupNodeIfNotExist(targetNode); for (Integer slot : targetSlots) { slots.put(slot, targetPool); } @@ -191,6 +207,17 @@ public Map getNodes() { } } + public List getShuffledNodesPool() { + r.lock(); + try { + List pools = new ArrayList(nodes.values()); + Collections.shuffle(pools); + return pools; + } finally { + r.unlock(); + } + } + /** * Clear discovered nodes collections and gently release allocated resources */ @@ -233,5 +260,4 @@ private List getAssignedSlotArray(List slotInfo) { } return slotNums; } - } diff --git a/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java index 87c93704c1..0f89fee8ae 100644 --- a/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java +++ b/src/main/java/redis/clients/jedis/JedisSlotBasedConnectionHandler.java @@ -27,7 +27,7 @@ public Jedis getConnection() { // ping-pong) // or exception if all connections are invalid - List pools = getShuffledNodesPool(); + List pools = cache.getShuffledNodesPool(); for (JedisPool pool : pools) { Jedis jedis = null; @@ -61,7 +61,14 @@ public Jedis getConnectionFromSlot(int slot) { // assignment return connectionPool.getResource(); } else { - return getConnection(); + renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state + connectionPool = cache.getSlotPool(slot); + if (connectionPool != null) { + return connectionPool.getResource(); + } else { + //no choice, fallback to new connection to random node + return getConnection(); + } } } }