diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..5e9bc3926d7c2 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -117,7 +117,7 @@ * * @opensearch.internal */ -public abstract class Engine implements Closeable { +public abstract class Engine implements LifecycleAware, Closeable { public static final String SYNC_COMMIT_ID = "sync_id"; // TODO: remove sync_id in 3.0 public static final String HISTORY_UUID_KEY = "history_uuid"; @@ -847,7 +847,7 @@ protected final void ensureOpen(Exception suppressed) { } } - protected final void ensureOpen() { + public final void ensureOpen() { ensureOpen(null); } diff --git a/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java b/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java new file mode 100644 index 0000000000000..06cfb8e7e73a5 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/engine/LifecycleAware.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.engine; + +/** + * Interface that is aware of a component lifecycle. + */ +public interface LifecycleAware { + + /** + * Checks to ensure if the component is an open state + */ + void ensureOpen(); +} diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 7c4ddf389b803..22f72cc3d9acd 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.listener.TranslogEventListener; @@ -33,7 +34,7 @@ public class InternalTranslogManager implements TranslogManager { private final ReleasableLock readLock; - private final Runnable ensureEngineOpen; + private final LifecycleAware engineLifeCycleAware; private final ShardId shardId; private final Translog translog; private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false); @@ -50,11 +51,11 @@ public InternalTranslogManager( Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - Runnable ensureEngineOpen + LifecycleAware engineLifeCycleAware ) throws IOException { this.shardId = shardId; this.readLock = readLock; - this.ensureEngineOpen = ensureEngineOpen; + this.engineLifeCycleAware = engineLifeCycleAware; this.translogEventListener = translogEventListener; Translog translog = openTranslog(translogConfig, primaryTermSupplier, translogDeletionPolicy, globalCheckpointSupplier, seqNo -> { final LocalCheckpointTracker tracker = localCheckpointTrackerSupplier.get(); @@ -76,7 +77,7 @@ public InternalTranslogManager( @Override public void rollTranslogGeneration() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); translog.rollGeneration(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { @@ -105,7 +106,7 @@ public int recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, lo int opsRecovered = 0; translogEventListener.onBeginTranslogRecovery(); try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); if (pendingTranslogRecovery.get() == false) { throw new IllegalStateException("Engine has already been recovered"); } @@ -200,7 +201,7 @@ public Translog.Location getTranslogLastWriteLocation() { @Override public void trimUnreferencedTranslogFiles() throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); translog.trimUnreferencedReaders(); } catch (AlreadyClosedException e) { translogEventListener.onTragicFailure(e); @@ -233,7 +234,7 @@ public boolean shouldRollTranslogGeneration() { @Override public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws TranslogException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); translog.trimOperations(belowTerm, aboveSeqNo); } catch (AlreadyClosedException e) { translogEventListener.onTragicFailure(e); @@ -257,7 +258,7 @@ public void trimOperationsFromTranslog(long belowTerm, long aboveSeqNo) throws T @Override public int restoreLocalHistoryFromTranslog(long processedCheckpoint, TranslogRecoveryRunner translogRecoveryRunner) throws IOException { try (ReleasableLock ignored = readLock.acquire()) { - ensureEngineOpen.run(); + engineLifeCycleAware.ensureOpen(); try (Translog.Snapshot snapshot = getTranslog(true).newSnapshot(processedCheckpoint + 1, Long.MAX_VALUE)) { return translogRecoveryRunner.run(snapshot); } @@ -314,7 +315,7 @@ private Translog openTranslog( @Override public Translog getTranslog(boolean ensureOpen) { if (ensureOpen) { - this.ensureEngineOpen.run(); + this.engineLifeCycleAware.ensureOpen(); } return translog; } diff --git a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java index 366f70ac50ca2..09f5f38a9f6a9 100644 --- a/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/WriteOnlyTranslogManager.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.common.util.concurrent.ReleasableLock; +import org.opensearch.index.engine.LifecycleAware; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.listener.TranslogEventListener; @@ -34,7 +35,7 @@ public WriteOnlyTranslogManager( Supplier localCheckpointTrackerSupplier, String translogUUID, TranslogEventListener translogEventListener, - Runnable ensureEngineOpen + LifecycleAware engineLifecycleAware ) throws IOException { super( translogConfig, @@ -46,7 +47,7 @@ public WriteOnlyTranslogManager( localCheckpointTrackerSupplier, translogUUID, translogEventListener, - ensureEngineOpen + engineLifecycleAware ); } diff --git a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java index 4ca7069140811..4f9ca9d282caa 100644 --- a/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java +++ b/server/src/main/java/org/opensearch/index/translog/listener/CompositeTranslogEventListener.java @@ -44,6 +44,7 @@ public void onAfterTranslogSync() { listener.onAfterTranslogSync(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogSync listener"), ex); + throw ex; } } } @@ -55,6 +56,7 @@ public void onAfterTranslogRecovery() { listener.onAfterTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTranslogRecovery listener"), ex); + throw ex; } } } @@ -66,6 +68,7 @@ public void onBeginTranslogRecovery() { listener.onBeginTranslogRecovery(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onBeginTranslogRecovery listener"), ex); + throw ex; } } } @@ -77,6 +80,7 @@ public void onFailure(String reason, Exception e) { listener.onFailure(reason, e); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onFailure listener"), ex); + throw ex; } } } @@ -88,6 +92,7 @@ public void onTragicFailure(AlreadyClosedException e) { listener.onTragicFailure(e); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("failed to invoke onTragicFailure listener"), ex); + throw ex; } } } diff --git a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java index 713dabcbd16b0..3d78667a1ad26 100644 --- a/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/listener/TranslogListenerTests.java @@ -12,10 +12,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.lang.reflect.Proxy; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; public class TranslogListenerTests extends OpenSearchTestCase { @@ -53,19 +50,8 @@ public void onTragicFailure(AlreadyClosedException ex) { onTragicFailureInvoked.incrementAndGet(); } }; - TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( - TranslogEventListener.class.getClassLoader(), - new Class[] { TranslogEventListener.class }, - (a, b, c) -> { throw new RuntimeException(); } - ); final List translogEventListeners = new ArrayList<>(Arrays.asList(listener, listener)); - if (randomBoolean()) { - translogEventListeners.add(throwingListener); - if (randomBoolean()) { - translogEventListeners.add(throwingListener); - } - } Collections.shuffle(translogEventListeners, random()); TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); compositeListener.onAfterTranslogRecovery(); @@ -80,4 +66,60 @@ public void onTragicFailure(AlreadyClosedException ex) { assertEquals(2, onFailureInvoked.get()); assertEquals(2, onTragicFailureInvoked.get()); } + + public void testCompositeTranslogEventListenerOnExceptions() { + AtomicInteger onTranslogSyncInvoked = new AtomicInteger(); + AtomicInteger onTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onBeginTranslogRecoveryInvoked = new AtomicInteger(); + AtomicInteger onFailureInvoked = new AtomicInteger(); + AtomicInteger onTragicFailureInvoked = new AtomicInteger(); + + TranslogEventListener listener = new TranslogEventListener() { + @Override + public void onAfterTranslogSync() { + onTranslogSyncInvoked.incrementAndGet(); + } + + @Override + public void onAfterTranslogRecovery() { + onTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onBeginTranslogRecovery() { + onBeginTranslogRecoveryInvoked.incrementAndGet(); + } + + @Override + public void onFailure(String reason, Exception ex) { + onFailureInvoked.incrementAndGet(); + } + + @Override + public void onTragicFailure(AlreadyClosedException ex) { + onTragicFailureInvoked.incrementAndGet(); + } + }; + + TranslogEventListener throwingListener = (TranslogEventListener) Proxy.newProxyInstance( + TranslogEventListener.class.getClassLoader(), + new Class[] { TranslogEventListener.class }, + (a, b, c) -> { throw new RuntimeException(); } + ); + + final List translogEventListeners = new LinkedList<>(Arrays.asList(listener, throwingListener, listener)); + TranslogEventListener compositeListener = new CompositeTranslogEventListener(translogEventListeners); + expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogRecovery()); + expectThrows(RuntimeException.class, () -> compositeListener.onAfterTranslogSync()); + expectThrows(RuntimeException.class, () -> compositeListener.onBeginTranslogRecovery()); + expectThrows(RuntimeException.class, () -> compositeListener.onFailure("reason", new RuntimeException("reason"))); + expectThrows(RuntimeException.class, () -> compositeListener.onTragicFailure(new AlreadyClosedException("reason"))); + + assertEquals(1, onBeginTranslogRecoveryInvoked.get()); + assertEquals(1, onTranslogRecoveryInvoked.get()); + assertEquals(1, onTranslogSyncInvoked.get()); + assertEquals(1, onFailureInvoked.get()); + assertEquals(1, onTragicFailureInvoked.get()); + + } }