Skip to content

Commit

Permalink
[Transform] Do not log warning for ABORTING transform
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Feb 7, 2024
1 parent 9b584aa commit dc7b63a
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -444,11 +444,12 @@ public void triggered(TransformScheduler.Event event) {
return;
}

// ignore trigger if indexer is running or completely stopped
// ignore trigger if indexer is running, stopping, stopped or aborting
IndexerState indexerState = getIndexer().getState();
if (IndexerState.INDEXING.equals(indexerState)
|| IndexerState.STOPPING.equals(indexerState)
|| IndexerState.STOPPED.equals(indexerState)) {
|| IndexerState.STOPPED.equals(indexerState)
|| IndexerState.ABORTING.equals(indexerState)) {
logger.debug("[{}] indexer for transform has state [{}]. Ignoring trigger.", getTransformId(), indexerState);
return;
}
Expand Down Expand Up @@ -575,7 +576,12 @@ TransformTask setAuthState(AuthorizationState authState) {
}

void initializeIndexer(ClientTransformIndexerBuilder indexerBuilder) {
indexer.set(indexerBuilder.build(getThreadPool(), context));
initializeIndexer(indexerBuilder.build(getThreadPool(), context));
}

/** Visible for testing. */
void initializeIndexer(ClientTransformIndexer indexer) {
this.indexer.set(indexer);
}

ThreadPool getThreadPool() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.xpack.transform.transforms.scheduling.TransformScheduler;
import org.junit.After;
import org.junit.Before;
import org.mockito.verification.VerificationMode;

import java.time.Clock;
import java.util.Collections;
Expand All @@ -61,16 +62,20 @@
import static java.util.stream.Collectors.toList;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class TransformTaskTests extends ESTestCase {
Expand Down Expand Up @@ -523,6 +528,85 @@ public void testDeriveBasicCheckpointingInfoWithIndexer() {
assertThat(checkpointingInfo.getNext().getCheckpointProgress(), sameInstance(progress));
}

public void testInitializeIndexerWhenAlreadyInitialized() {
var transformTask = createTransformTask(
TransformConfigTests.randomTransformConfigWithoutHeaders(),
MockTransformAuditor.createMockAuditor()
);
transformTask.initializeIndexer(mock(ClientTransformIndexerBuilder.class));
IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> transformTask.initializeIndexer(mock(ClientTransformIndexerBuilder.class))
);
assertThat(e.getMessage(), containsString("The object cannot be set twice!"));
}

public void testTriggeredIsNoOpWhenTransformIdMismatch() {
var transformId = randomAlphaOfLengthBetween(1, 10);
var transformTask = createTransformTask(
TransformConfigTests.randomTransformConfigWithoutHeaders(transformId),
MockTransformAuditor.createMockAuditor()
);
transformTask.triggered(new TransformScheduler.Event("not-" + transformId, randomNonNegativeLong(), randomNonNegativeLong()));
}

public void testTriggeredIsNoOpWhenIndexerIsUninitialized() {
var transformId = randomAlphaOfLengthBetween(1, 10);
var transformTask = createTransformTask(
TransformConfigTests.randomTransformConfigWithoutHeaders(transformId),
MockTransformAuditor.createMockAuditor()
);
transformTask.triggered(new TransformScheduler.Event(transformId, randomNonNegativeLong(), randomNonNegativeLong()));
}

public void testTriggeredIsNoOpWhenStateIsWrong() {
testTriggered(TransformTaskState.STOPPED, IndexerState.INDEXING, never());
testTriggered(TransformTaskState.STOPPED, IndexerState.STOPPING, never());
testTriggered(TransformTaskState.STOPPED, IndexerState.STOPPED, never());
testTriggered(TransformTaskState.STOPPED, IndexerState.ABORTING, never());
testTriggered(TransformTaskState.STOPPED, IndexerState.STARTED, never());
testTriggered(TransformTaskState.FAILED, IndexerState.INDEXING, never());
testTriggered(TransformTaskState.FAILED, IndexerState.STOPPING, never());
testTriggered(TransformTaskState.FAILED, IndexerState.STOPPED, never());
testTriggered(TransformTaskState.FAILED, IndexerState.ABORTING, never());
testTriggered(TransformTaskState.FAILED, IndexerState.STARTED, never());
testTriggered(TransformTaskState.STARTED, IndexerState.INDEXING, never());
testTriggered(TransformTaskState.STARTED, IndexerState.STOPPING, never());
testTriggered(TransformTaskState.STARTED, IndexerState.STOPPED, never());
testTriggered(TransformTaskState.STARTED, IndexerState.ABORTING, never());
}

public void testTriggeredActuallyTriggersIndexer() {
testTriggered(TransformTaskState.STARTED, IndexerState.STARTED, times(1));
}

private void testTriggered(TransformTaskState taskState, IndexerState indexerState, VerificationMode indexerVerificationMode) {
String transformId = randomAlphaOfLengthBetween(1, 10);
TransformState transformState = new TransformState(taskState, indexerState, null, 0L, "because", null, null, false, null);
ThreadPool threadPool = mock(ThreadPool.class);
TransformAuditor auditor = mock(TransformAuditor.class);
TransformTask transformTask = new TransformTask(
42,
"some_type",
"some_action",
TaskId.EMPTY_TASK_ID,
createTransformTaskParams(transformId),
transformState,
new TransformScheduler(mock(Clock.class), threadPool, Settings.EMPTY, TimeValue.ZERO),
auditor,
threadPool,
Collections.emptyMap()
);

ClientTransformIndexer indexer = mock(ClientTransformIndexer.class);
when(indexer.getState()).thenReturn(indexerState);
transformTask.initializeIndexer(indexer);
transformTask.triggered(new TransformScheduler.Event(transformId, randomNonNegativeLong(), randomNonNegativeLong()));

verify(indexer, indexerVerificationMode).maybeTriggerAsyncJob(anyLong());
verifyNoInteractions(auditor, threadPool);
}

private static TransformTaskParams createTransformTaskParams(String transformId) {
return new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, TimeValue.timeValueSeconds(10), false);
}
Expand Down

0 comments on commit dc7b63a

Please sign in to comment.