From ef81c1df57587d6e44dd750ad36926a5c8bce894 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 17 Jul 2018 09:41:34 -0400 Subject: [PATCH] Ensure to release translog snapshot in primary-replica resync (#32045) Previously we create a translog snapshot inside the resync method, and that snapshot will be closed by the resync listener. However, if the resync method throws an exception before the resync listener is initialized, the translog snapshot won't be released. Closes #32030 --- .../index/shard/PrimaryReplicaSyncer.java | 71 ++++++++++--------- .../shard/PrimaryReplicaSyncerTests.java | 28 +++++--- .../index/translog/TranslogTests.java | 18 +++++ 3 files changed, 75 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java index b39ebd51f2bc8..e66d78f2e1a05 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/elasticsearch/index/shard/PrimaryReplicaSyncer.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.tasks.Task; @@ -80,48 +81,25 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests } public void resync(final IndexShard indexShard, final ActionListener listener) { - ActionListener resyncListener = null; + Translog.Snapshot snapshot = null; try { final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; - Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); - resyncListener = new ActionListener() { - @Override - public void onResponse(final ResyncTask resyncTask) { - try { - snapshot.close(); - listener.onResponse(resyncTask); - } catch (final Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(final Exception e) { - try { - snapshot.close(); - } catch (final Exception inner) { - e.addSuppressed(inner); - } finally { - listener.onFailure(e); - } - } - }; - ShardId shardId = indexShard.shardId(); - + final ShardId shardId = indexShard.shardId(); // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { - + snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); + final Translog.Snapshot originalSnapshot = snapshot; + final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override public synchronized void close() throws IOException { - snapshot.close(); + originalSnapshot.close(); } @Override public synchronized int totalOperations() { - return snapshot.totalOperations(); + return originalSnapshot.totalOperations(); } @Override @@ -132,15 +110,40 @@ public synchronized Translog.Operation next() throws IOException { } else { assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; } - return snapshot.next(); + return originalSnapshot.next(); } }; + final ActionListener resyncListener = new ActionListener() { + @Override + public void onResponse(final ResyncTask resyncTask) { + try { + wrappedSnapshot.close(); + listener.onResponse(resyncTask); + } catch (final Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(final Exception e) { + try { + wrappedSnapshot.close(); + } catch (final Exception inner) { + e.addSuppressed(inner); + } finally { + listener.onFailure(e); + } + } + }; + resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, startingSeqNo, maxSeqNo, resyncListener); } catch (Exception e) { - if (resyncListener != null) { - resyncListener.onFailure(e); - } else { + try { + IOUtils.close(snapshot); + } catch (IOException inner) { + e.addSuppressed(inner); + } finally { listener.onFailure(e); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java index b290f4d45597b..4444f475329b3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/PrimaryReplicaSyncerTests.java @@ -123,12 +123,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { public void testSyncerOnClosingShard() throws Exception { IndexShard shard = newStartedShard(true); AtomicBoolean syncActionCalled = new AtomicBoolean(); - CountDownLatch syncCalledLatch = new CountDownLatch(1); PrimaryReplicaSyncer.SyncAction syncAction = (request, parentTask, allocationId, primaryTerm, listener) -> { logger.info("Sending off {} operations", request.getOperations().length); syncActionCalled.set(true); - syncCalledLatch.countDown(); threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); }; PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, @@ -147,13 +145,27 @@ public void testSyncerOnClosingShard() throws Exception { shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); - PlainActionFuture fut = new PlainActionFuture<>(); - threadPool.generic().execute(() -> { - try { - syncer.resync(shard, fut); - } catch (AlreadyClosedException ace) { - fut.onFailure(ace); + CountDownLatch syncCalledLatch = new CountDownLatch(1); + PlainActionFuture fut = new PlainActionFuture() { + @Override + public void onFailure(Exception e) { + try { + super.onFailure(e); + } finally { + syncCalledLatch.countDown(); + } + } + @Override + public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { + try { + super.onResponse(result); + } finally { + syncCalledLatch.countDown(); + } } + }; + threadPool.generic().execute(() -> { + syncer.resync(shard, fut); }); if (randomBoolean()) { syncCalledLatch.await(); diff --git a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java index cf6e753684676..dbbb38090bc3b 100644 --- a/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java @@ -2932,6 +2932,24 @@ public void testSnapshotDedupOperations() throws Exception { } } + /** Make sure that it's ok to close a translog snapshot multiple times */ + public void testCloseSnapshotTwice() throws Exception { + int numOps = between(0, 10); + for (int i = 0; i < numOps; i++) { + Translog.Index op = new Translog.Index("doc", randomAlphaOfLength(10), i, primaryTerm.get(), new byte[]{1}); + translog.add(op); + if (randomBoolean()) { + translog.rollGeneration(); + } + } + for (int i = 0; i < 5; i++) { + Translog.Snapshot snapshot = translog.newSnapshot(); + assertThat(snapshot, SnapshotMatchers.size(numOps)); + snapshot.close(); + snapshot.close(); + } + } + static class SortedSnapshot implements Translog.Snapshot { private final Translog.Snapshot snapshot; private List operations = null;