Skip to content

Commit

Permalink
[Transform] Do not use PIT in the presence of remote indices in source (
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek authored Sep 22, 2023
1 parent 9091b39 commit 700a2a4
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/99803.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 99803
summary: Do not use PIT in the presence of remote indices in source
area: Transform
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener<SearchResponse>

injectPointInTimeIfNeeded(
buildSearchRequest(),
ActionListener.wrap(pitSearchRequest -> doSearch(pitSearchRequest, nextPhase), nextPhase::onFailure)
ActionListener.wrap(searchRequest -> doSearch(searchRequest, nextPhase), nextPhase::onFailure)
);
}

Expand Down Expand Up @@ -435,7 +435,8 @@ private void injectPointInTimeIfNeeded(
ActionListener<Tuple<String, SearchRequest>> listener
) {
SearchRequest searchRequest = namedSearchRequest.v2();
if (disablePit || searchRequest.indices().length == 0) {
// We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems.
if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) {
listener.onResponse(namedSearchRequest);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
import org.elasticsearch.xpack.core.transform.transforms.SourceConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests;
Expand Down Expand Up @@ -290,6 +291,7 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
}

public void testDisablePit() throws InterruptedException {
// TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */
TransformConfig config = TransformConfigTests.randomTransformConfig();
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();

Expand Down Expand Up @@ -349,6 +351,65 @@ public void testDisablePit() throws InterruptedException {
}
}

public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException {
TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig())
// Remote index is configured within source
.setSource(new SourceConfig("remote-cluster:remote-index"))
.build();
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();

try (PitMockClient client = new PitMockClient(getTestName(), true)) {
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
mock(ThreadPool.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
client,
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
0L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);

// reverse the setting
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);
}
}

public void testHandlePitIndexNotFound() throws InterruptedException {
// simulate a deleted index due to ILM
try (PitMockClient client = new PitMockClient(getTestName(), true)) {
Expand Down

0 comments on commit 700a2a4

Please sign in to comment.