Skip to content

Commit

Permalink
[Transform] Allow using PIT in the presence of remote clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
przemekwitek committed Feb 6, 2024
1 parent 4cf8942 commit 57f1264
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,7 @@ private void injectPointInTimeIfNeeded(
ActionListener<Tuple<String, SearchRequest>> listener
) {
SearchRequest searchRequest = namedSearchRequest.v2();
// We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems.
if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) {
if (disablePit || searchRequest.indices().length == 0) {
listener.onResponse(namedSearchRequest);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,14 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException {
}

public void testDisablePit() throws InterruptedException {
// TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */
TransformConfig config = TransformConfigTests.randomTransformConfig();
TransformConfig.Builder configBuilder = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig());
if (randomBoolean()) {
// TransformConfigTests.randomTransformConfig never produces remote indices in the source.
// We need to explicitly set the remote index here for coverage.
configBuilder.setSource(new SourceConfig("remote-cluster:remote-index"));
}
TransformConfig config = configBuilder.build();

boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();

try (var threadPool = createThreadPool()) {
Expand Down Expand Up @@ -354,66 +360,6 @@ public void testDisablePit() throws InterruptedException {
}
}

public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException {
TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig())
// Remote index is configured within source
.setSource(new SourceConfig("remote-cluster:remote-index"))
.build();
boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit();

try (var threadPool = createThreadPool()) {
final var client = new PitMockClient(threadPool, true);
MockClientTransformIndexer indexer = new MockClientTransformIndexer(
mock(ThreadPool.class),
new TransformServices(
mock(IndexBasedTransformConfigManager.class),
mock(TransformCheckpointService.class),
mock(TransformAuditor.class),
new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY, TimeValue.ZERO)
),
mock(CheckpointProvider.class),
new AtomicReference<>(IndexerState.STOPPED),
null,
new ParentTaskAssigningClient(client, new TaskId("dummy-node:123456")),
mock(TransformIndexerStats.class),
config,
null,
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
0L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new TransformCheckpoint(
"transform",
Instant.now().toEpochMilli(),
2L,
Collections.emptyMap(),
Instant.now().toEpochMilli()
),
new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME),
mock(TransformContext.class),
false
);

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);

// reverse the setting
indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build());

// Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings
this.<SearchResponse>assertAsync(
listener -> indexer.doNextSearch(0, listener),
response -> assertNull(response.pointInTimeId())
);
}
}

public void testHandlePitIndexNotFound() throws InterruptedException {
// simulate a deleted index due to ILM
try (var threadPool = createThreadPool()) {
Expand Down

0 comments on commit 57f1264

Please sign in to comment.