Skip to content

Commit

Permalink
Test fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Jun 24, 2022
1 parent 20772d8 commit dc1d0c7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void onTragicFailure(AlreadyClosedException ex) {
readLock,
() -> getLocalCheckpointTracker(),
translogUUID,
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener)),
new CompositeTranslogEventListener(Arrays.asList(internalTranslogEventListener, translogEventListener), shardId),
this::ensureOpen
);
this.translogManager = translogManagerRef;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.translog.TranslogException;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -27,9 +29,11 @@
public final class CompositeTranslogEventListener implements TranslogEventListener {

private final List<TranslogEventListener> listeners;
private final ShardId shardId;
private final Logger logger = LogManager.getLogger(CompositeTranslogEventListener.class);

public CompositeTranslogEventListener(Collection<TranslogEventListener> listeners) {
public CompositeTranslogEventListener(Collection<TranslogEventListener> listeners, ShardId shardId) {
this.shardId = shardId;
for (TranslogEventListener listener : listeners) {
if (listener == null) {
throw new IllegalArgumentException("listeners must be non-null");
Expand All @@ -49,7 +53,7 @@ public void onAfterTranslogSync() {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

@Override
Expand All @@ -63,7 +67,7 @@ public void onAfterTranslogRecovery() {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

@Override
Expand All @@ -77,7 +81,7 @@ public void onBeginTranslogRecovery() {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

@Override
Expand All @@ -91,7 +95,7 @@ public void onFailure(String reason, Exception e) {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

@Override
Expand All @@ -105,6 +109,16 @@ public void onTragicFailure(AlreadyClosedException e) {
exceptionList.add(ex);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptionList);
maybeThrowTranslogExceptionAndSuppress(exceptionList);
}

private <T extends Throwable> void maybeThrowTranslogExceptionAndSuppress(List<T> exceptions) {
T main = null;
for (T ex : exceptions) {
main = ExceptionsHelper.useOrSuppress(main, ex);
}
if (main != null) {
throw new TranslogException(shardId, "Error while executing translog event listener", main);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
package org.opensearch.index.translog.listener;

import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;

import java.lang.reflect.Proxy;
Expand Down Expand Up @@ -53,7 +55,10 @@ public void onTragicFailure(AlreadyClosedException ex) {

final List<TranslogEventListener> translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener));
Collections.shuffle(translogEventListeners, random());
TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners);
TranslogEventListener compositeListener = new CompositeTranslogEventListener(
translogEventListeners,
new ShardId(new Index("indexName", "indexUuid"), 123)
);
compositeListener.onAfterTranslogRecovery();
compositeListener.onAfterTranslogSync();
compositeListener.onBeginTranslogRecovery();
Expand Down Expand Up @@ -109,7 +114,10 @@ public void onTragicFailure(AlreadyClosedException ex) {

final List<TranslogEventListener> translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener));
Collections.shuffle(translogEventListeners, random());
TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners);
TranslogEventListener compositeListener = new CompositeTranslogEventListener(
translogEventListeners,
new ShardId(new Index("indexName", "indexUuid"), 123)
);
expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery());
expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync());
expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery());
Expand Down

0 comments on commit dc1d0c7

Please sign in to comment.