-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure to release translog snapshot in primary-replica resync #32045
Changes from 1 commit
2cd685a
d3135c6
753e836
2bcba24
b866fc1
1dbdaac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,6 +35,7 @@ | |
import org.elasticsearch.common.unit.ByteSizeValue; | ||
import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.core.internal.io.IOUtils; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.index.translog.Translog; | ||
import org.elasticsearch.tasks.Task; | ||
|
@@ -81,47 +82,27 @@ void setChunkSize(ByteSizeValue chunkSize) { // only settable for tests | |
|
||
public void resync(final IndexShard indexShard, final ActionListener<ResyncTask> listener) { | ||
ActionListener<ResyncTask> resyncListener = null; | ||
Translog.Snapshot snapshot = null; | ||
try { | ||
final long startingSeqNo = indexShard.getGlobalCheckpoint() + 1; | ||
Translog.Snapshot snapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); | ||
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); | ||
resyncListener = new ActionListener<ResyncTask>() { | ||
@Override | ||
public void onResponse(final ResyncTask resyncTask) { | ||
try { | ||
snapshot.close(); | ||
listener.onResponse(resyncTask); | ||
} catch (final Exception e) { | ||
onFailure(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(final Exception e) { | ||
try { | ||
snapshot.close(); | ||
} catch (final Exception inner) { | ||
e.addSuppressed(inner); | ||
} finally { | ||
listener.onFailure(e); | ||
} | ||
} | ||
}; | ||
ShardId shardId = indexShard.shardId(); | ||
|
||
final ShardId shardId = indexShard.shardId(); | ||
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. | ||
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible | ||
// Also fail the resync early if the shard is shutting down | ||
Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { | ||
|
||
snapshot = new Translog.Snapshot() { | ||
final AtomicBoolean closed = new AtomicBoolean(); // closed once | ||
final Translog.Snapshot originalSnapshot = indexShard.newTranslogSnapshotFromMinSeqNo(startingSeqNo); | ||
@Override | ||
public synchronized void close() throws IOException { | ||
snapshot.close(); | ||
if (closed.compareAndSet(false, true)) { | ||
originalSnapshot.close(); | ||
} | ||
} | ||
|
||
@Override | ||
public synchronized int totalOperations() { | ||
return snapshot.totalOperations(); | ||
return originalSnapshot.totalOperations(); | ||
} | ||
|
||
@Override | ||
|
@@ -132,12 +113,41 @@ public synchronized Translog.Operation next() throws IOException { | |
} else { | ||
assert state == IndexShardState.STARTED : "resync should only happen on a started shard, but state was: " + state; | ||
} | ||
return snapshot.next(); | ||
return originalSnapshot.next(); | ||
} | ||
}; | ||
Translog.Snapshot wrappedSnapshot = snapshot; | ||
resyncListener = new ActionListener<ResyncTask>() { | ||
@Override | ||
public void onResponse(final ResyncTask resyncTask) { | ||
try { | ||
wrappedSnapshot.close(); | ||
listener.onResponse(resyncTask); | ||
} catch (final Exception e) { | ||
onFailure(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(final Exception e) { | ||
try { | ||
wrappedSnapshot.close(); | ||
} catch (final Exception inner) { | ||
e.addSuppressed(inner); | ||
} finally { | ||
listener.onFailure(e); | ||
} | ||
} | ||
}; | ||
|
||
resync(shardId, indexShard.routingEntry().allocationId().getId(), indexShard.getPrimaryTerm(), wrappedSnapshot, | ||
startingSeqNo, maxSeqNo, resyncListener); | ||
} catch (Exception e) { | ||
try { | ||
IOUtils.close(snapshot); | ||
} catch (IOException inner) { | ||
e.addSuppressed(inner); | ||
} | ||
if (resyncListener != null) { | ||
resyncListener.onFailure(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not needed anymore now (i.e. the |
||
} else { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,12 +123,10 @@ public void testSyncerSendsOffCorrectDocuments() throws Exception { | |
public void testSyncerOnClosingShard() throws Exception { | ||
IndexShard shard = newStartedShard(true); | ||
AtomicBoolean syncActionCalled = new AtomicBoolean(); | ||
CountDownLatch syncCalledLatch = new CountDownLatch(1); | ||
PrimaryReplicaSyncer.SyncAction syncAction = | ||
(request, parentTask, allocationId, primaryTerm, listener) -> { | ||
logger.info("Sending off {} operations", request.getOperations().length); | ||
syncActionCalled.set(true); | ||
syncCalledLatch.countDown(); | ||
threadPool.generic().execute(() -> listener.onResponse(new ResyncReplicationResponse())); | ||
}; | ||
PrimaryReplicaSyncer syncer = new PrimaryReplicaSyncer(Settings.EMPTY, | ||
|
@@ -147,13 +145,27 @@ public void testSyncerOnClosingShard() throws Exception { | |
shard.updateShardState(shard.routingEntry(), shard.getPrimaryTerm(), null, 1000L, Collections.singleton(allocationId), | ||
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet()); | ||
|
||
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<>(); | ||
threadPool.generic().execute(() -> { | ||
try { | ||
syncer.resync(shard, fut); | ||
} catch (AlreadyClosedException ace) { | ||
fut.onFailure(ace); | ||
CountDownLatch syncCalledLatch = new CountDownLatch(1); | ||
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<PrimaryReplicaSyncer.ResyncTask>() { | ||
@Override | ||
public void onFailure(Exception e) { | ||
try { | ||
super.onFailure(e); | ||
}finally { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. space missing |
||
syncCalledLatch.countDown(); | ||
} | ||
} | ||
@Override | ||
public void onResponse(PrimaryReplicaSyncer.ResyncTask result) { | ||
try { | ||
super.onResponse(result); | ||
}finally { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. space missing |
||
syncCalledLatch.countDown(); | ||
} | ||
} | ||
}; | ||
threadPool.generic().execute(() -> { | ||
syncer.resync(shard, fut); | ||
}); | ||
if (randomBoolean()) { | ||
syncCalledLatch.await(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not protect against double closing in the snapshot it self? this is a common problem
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this boolean. It's ok to close a translog snapshot multiple times.