diff --git a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java index 82bdd2c4d7..e0b855f8e2 100644 --- a/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java +++ b/src/main/java/redis/clients/jedis/MultiNodePipelineBase.java @@ -1,6 +1,7 @@ package redis.clients.jedis; import java.io.Closeable; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -8,6 +9,8 @@ import java.util.Queue; import java.util.Set; import org.json.JSONArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import redis.clients.jedis.args.*; import redis.clients.jedis.bloom.BFInsertParams; @@ -17,6 +20,7 @@ import redis.clients.jedis.commands.PipelineBinaryCommands; import redis.clients.jedis.commands.PipelineCommands; import redis.clients.jedis.commands.RedisModulePipelineCommands; +import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.graph.GraphCommandObjects; import redis.clients.jedis.graph.ResultSet; import redis.clients.jedis.json.JsonSetParams; @@ -30,11 +34,14 @@ import redis.clients.jedis.search.aggr.AggregationResult; import redis.clients.jedis.search.schemafields.SchemaField; import redis.clients.jedis.timeseries.*; +import redis.clients.jedis.util.IOUtils; import redis.clients.jedis.util.KeyValue; public abstract class MultiNodePipelineBase implements PipelineCommands, PipelineBinaryCommands, RedisModulePipelineCommands, Closeable { + private final Logger log = LoggerFactory.getLogger(getClass()); + private final Map>> pipelinedResponses; private final Map connections; private volatile boolean synced; @@ -83,9 +90,12 @@ protected final Response appendCommand(CommandObject commandObject) { @Override public void close() { - sync(); - for (Connection connection : connections.values()) { - connection.close(); + try { + sync(); + } finally { + for (Connection connection : connections.values()) { + IOUtils.closeQuietly(connection); + } } } @@ -93,12 +103,25 @@ public final void sync() { if (synced) { return; } - for (Map.Entry>> entry : pipelinedResponses.entrySet()) { + + Iterator>>> pipelinedResponsesIterator + = pipelinedResponses.entrySet().iterator(); + while (pipelinedResponsesIterator.hasNext()) { + Map.Entry>> entry = pipelinedResponsesIterator.next(); HostAndPort nodeKey = entry.getKey(); Queue> queue = entry.getValue(); - List unformatted = connections.get(nodeKey).getMany(queue.size()); - for (Object o : unformatted) { - queue.poll().set(o); + Connection connection = connections.get(nodeKey); + try { + List unformatted = connection.getMany(queue.size()); + for (Object o : unformatted) { + queue.poll().set(o); + } + } catch (JedisConnectionException jce) { + log.error("Error with connection to " + nodeKey, jce); + // cleanup the connection + pipelinedResponsesIterator.remove(); + connections.remove(nodeKey); + IOUtils.closeQuietly(connection); } } synced = true;