From 700a2a42b8410cbc1d628bbf43577f3ad67eac1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Witek?= Date: Fri, 22 Sep 2023 13:41:37 +0200 Subject: [PATCH] [Transform] Do not use PIT in the presence of remote indices in source (#99803) (#99805) --- docs/changelog/99803.yaml | 5 ++ .../transforms/ClientTransformIndexer.java | 5 +- .../ClientTransformIndexerTests.java | 61 +++++++++++++++++++ 3 files changed, 69 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/99803.yaml diff --git a/docs/changelog/99803.yaml b/docs/changelog/99803.yaml new file mode 100644 index 0000000000000..ce0929eb20e07 --- /dev/null +++ b/docs/changelog/99803.yaml @@ -0,0 +1,5 @@ +pr: 99803 +summary: Do not use PIT in the presence of remote indices in source +area: Transform +type: bug +issues: [] diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index b3373eac02968..00fa7f200a3c3 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -146,7 +146,7 @@ protected void doNextSearch(long waitTimeInNanos, ActionListener injectPointInTimeIfNeeded( buildSearchRequest(), - ActionListener.wrap(pitSearchRequest -> doSearch(pitSearchRequest, nextPhase), nextPhase::onFailure) + ActionListener.wrap(searchRequest -> doSearch(searchRequest, nextPhase), nextPhase::onFailure) ); } @@ -435,7 +435,8 @@ private void injectPointInTimeIfNeeded( ActionListener> listener ) { SearchRequest searchRequest = namedSearchRequest.v2(); - if (disablePit || searchRequest.indices().length == 0) { + // We explicitly disable PIT in the presence of remote clusters in the source due to huge PIT handles causing performance problems. + if (disablePit || searchRequest.indices().length == 0 || transformConfig.getSource().requiresRemoteCluster()) { listener.onResponse(namedSearchRequest); return; } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index ad2976e9a3dc1..09b46c72c2ead 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -40,6 +40,7 @@ import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; +import org.elasticsearch.xpack.core.transform.transforms.SourceConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; import org.elasticsearch.xpack.core.transform.transforms.TransformConfig; import org.elasticsearch.xpack.core.transform.transforms.TransformConfigTests; @@ -290,6 +291,7 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { } public void testDisablePit() throws InterruptedException { + // TransformConfigTests.randomTransformConfig never produces remote indices in the source, hence we are safe here. */ TransformConfig config = TransformConfigTests.randomTransformConfig(); boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit(); @@ -349,6 +351,65 @@ public void testDisablePit() throws InterruptedException { } } + public void testDisablePitWhenThereIsRemoteIndexInSource() throws InterruptedException { + TransformConfig config = new TransformConfig.Builder(TransformConfigTests.randomTransformConfig()) + // Remote index is configured within source + .setSource(new SourceConfig("remote-cluster:remote-index")) + .build(); + boolean pitEnabled = config.getSettings().getUsePit() == null || config.getSettings().getUsePit(); + + try (PitMockClient client = new PitMockClient(getTestName(), true)) { + MockClientTransformIndexer indexer = new MockClientTransformIndexer( + mock(ThreadPool.class), + new TransformServices( + mock(IndexBasedTransformConfigManager.class), + mock(TransformCheckpointService.class), + mock(TransformAuditor.class), + new TransformScheduler(Clock.systemUTC(), mock(ThreadPool.class), Settings.EMPTY) + ), + mock(CheckpointProvider.class), + new AtomicReference<>(IndexerState.STOPPED), + null, + client, + mock(TransformIndexerStats.class), + config, + null, + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 0L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new TransformCheckpoint( + "transform", + Instant.now().toEpochMilli(), + 2L, + Collections.emptyMap(), + Instant.now().toEpochMilli() + ), + new SeqNoPrimaryTermAndIndex(1, 1, TransformInternalIndexConstants.LATEST_INDEX_NAME), + mock(TransformContext.class), + false + ); + + // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> assertNull(response.pointInTimeId()) + ); + + // reverse the setting + indexer.applyNewSettings(new SettingsConfig.Builder().setUsePit(pitEnabled == false).build()); + + // Because remote index is configured within source, we expect PIT *not* being used regardless the transform settings + this.assertAsync( + listener -> indexer.doNextSearch(0, listener), + response -> assertNull(response.pointInTimeId()) + ); + } + } + public void testHandlePitIndexNotFound() throws InterruptedException { // simulate a deleted index due to ILM try (PitMockClient client = new PitMockClient(getTestName(), true)) {