From 5a4a980cf53a316540f1cc697ac17b57dbeb432c Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Wed, 14 Feb 2024 12:46:12 +0100 Subject: [PATCH] [Transform] Make unattended transform create destination index explicitly --- .../integration/TransformChainIT.java | 33 +++++- .../TransformInsufficientPermissionsIT.java | 11 +- .../integration/TransformDestIndexIT.java | 106 +++++++++++++----- .../xpack/transform/Transform.java | 2 +- .../transforms/ClientTransformIndexer.java | 28 +++++ .../ClientTransformIndexerBuilder.java | 24 ++++ .../transforms/TransformIndexer.java | 15 ++- .../TransformPersistentTasksExecutor.java | 12 +- .../ClientTransformIndexerTests.java | 21 ++++ .../TransformIndexerFailureHandlingTests.java | 12 ++ ...IndexerFailureOnStatePersistenceTests.java | 18 +++ .../TransformIndexerStateTests.java | 10 ++ .../transforms/TransformIndexerTests.java | 5 + ...TransformPersistentTasksExecutorTests.java | 3 +- .../transforms/TransformTaskTests.java | 5 + 15 files changed, 261 insertions(+), 44 deletions(-) diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformChainIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformChainIT.java index 74ee8ea88e04f..600ceb3cd8202 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformChainIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformChainIT.java @@ -38,7 +38,16 @@ public class TransformChainIT extends TransformRestTestCase { }, "dest": { "index": "%s", - "pipeline": "%s" + "pipeline": "%s", + "aliases": [ + { + "alias": "%s" + }, + { + "alias": "%s", + "move_on_creation": true + } + ] }, "sync": { "time": { @@ -172,9 +181,14 @@ private void testChainedTransforms(final int numTransforms) throws Exception { // The number of documents is expected to be the same in all the indices. String sourceIndex = i == 0 ? reviewsIndexName : transformIds.get(i - 1) + "-dest"; String destIndex = transformId + "-dest"; + String destReadAlias = destIndex + "-read"; + String destWriteAlias = destIndex + "-write"; assertFalse(indexExists(destIndex)); + assertFalse(aliasExists(destReadAlias)); + assertFalse(aliasExists(destWriteAlias)); - assertAcknowledged(putTransform(transformId, createTransformConfig(sourceIndex, destIndex), true, RequestOptions.DEFAULT)); + String transformConfig = createTransformConfig(sourceIndex, destIndex, destReadAlias, destWriteAlias); + assertAcknowledged(putTransform(transformId, transformConfig, true, RequestOptions.DEFAULT)); } List transformIdsShuffled = new ArrayList<>(transformIds); @@ -198,6 +212,17 @@ private void testChainedTransforms(final int numTransforms) throws Exception { } }, 60, TimeUnit.SECONDS); + for (String transformId : transformIds) { + String destIndex = transformId + "-dest"; + String destReadAlias = destIndex + "-read"; + String destWriteAlias = destIndex + "-write"; + // Verify that the destination index is created. + assertTrue(indexExists(destIndex)); + // Verify that the destination index aliases are set up. + assertTrue(aliasExists(destReadAlias)); + assertTrue(aliasExists(destWriteAlias)); + } + // Stop all the transforms. for (String transformId : transformIds) { stopTransform(transformId); @@ -208,12 +233,14 @@ private void testChainedTransforms(final int numTransforms) throws Exception { } } - private static String createTransformConfig(String sourceIndex, String destIndex) { + private static String createTransformConfig(String sourceIndex, String destIndex, String destReadAlias, String destWriteAlias) { return Strings.format( TRANSFORM_CONFIG_TEMPLATE, sourceIndex, destIndex, SET_INGEST_TIME_PIPELINE, + destReadAlias, + destWriteAlias, "1s", "1s", randomBoolean(), diff --git a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java index e46d32295078f..dc48ceb7b309b 100644 --- a/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java +++ b/x-pack/plugin/transform/qa/multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformInsufficientPermissionsIT.java @@ -420,8 +420,14 @@ public void testTransformPermissionsDeferUnattendedNoDest() throws Exception { startTransform(config.getId(), RequestOptions.DEFAULT); - // transform is red due to lacking permissions - assertRed(transformId, authIssue); + // Give the transform indexer enough time to try creating destination index + Thread.sleep(5_000); + + String destIndexIssue = Strings.format("Could not create destination index [%s] for transform [%s]", destIndexName, transformId); + // transform's auth state status is still RED due to: + // - lacking permissions + // - and the inability to create destination index in the indexer (which is also a consequence of lacking permissions) + assertRed(transformId, authIssue, destIndexIssue); // update transform's credentials so that the transform has permission to access source/dest indices updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build()); @@ -576,6 +582,7 @@ private void assertGreen(String transformId) throws IOException { assertThat("Stats were: " + stats, extractValue(stats, "health", "issues"), is(nullValue())); } + // We expect exactly the issues passed as "expectedHealthIssueDetails". Not more, not less. @SuppressWarnings("unchecked") private void assertRed(String transformId, String... expectedHealthIssueDetails) throws IOException { Map stats = getBasicTransformStats(transformId); diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDestIndexIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDestIndexIT.java index 29576231d848c..4527fefde4b02 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDestIndexIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformDestIndexIT.java @@ -24,6 +24,7 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; public class TransformDestIndexIT extends TransformRestTestCase { @@ -114,32 +115,26 @@ public void testTransformDestIndexAliases() throws Exception { assertAliases(destIndex2, destAliasAll, destAliasLatest); } - public void testTransformDestIndexCreatedDuringUpdate_NoDeferValidation() throws Exception { - testTransformDestIndexCreatedDuringUpdate(false); + public void testUnattendedTransformDestIndexCreatedDuringUpdate_NoDeferValidation() throws Exception { + testUnattendedTransformDestIndexCreatedDuringUpdate(false); } - public void testTransformDestIndexCreatedDuringUpdate_DeferValidation() throws Exception { - testTransformDestIndexCreatedDuringUpdate(true); + public void testUnattendedTransformDestIndexCreatedDuringUpdate_DeferValidation() throws Exception { + testUnattendedTransformDestIndexCreatedDuringUpdate(true); } - private void testTransformDestIndexCreatedDuringUpdate(boolean deferValidation) throws Exception { + private void testUnattendedTransformDestIndexCreatedDuringUpdate(boolean deferValidation) throws Exception { String transformId = "test_dest_index_on_update" + (deferValidation ? "-defer" : ""); String destIndex = transformId + "-dest"; + String destAliasAll = transformId + ".all"; + String destAliasLatest = transformId + ".latest"; + List destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true)); + // Create and start the unattended transform + SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build(); + createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, REVIEWS_INDEX_NAME); assertFalse(indexExists(destIndex)); - // Create and start the unattended transform - createPivotReviewsTransform( - transformId, - destIndex, - null, - null, - null, - new SettingsConfig.Builder().setUnattended(true).build(), - null, - null, - REVIEWS_INDEX_NAME - ); startTransform(transformId); // Update the unattended transform. This will trigger destination index creation. @@ -151,6 +146,66 @@ private void testTransformDestIndexCreatedDuringUpdate(boolean deferValidation) // Verify that the destination index now exists assertTrue(indexExists(destIndex)); + // Verify that both aliases are configured on the dest index + assertAliases(destIndex, destAliasAll, destAliasLatest); + } + + public void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex_NoDeferValidation() throws Exception { + testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(false); + } + + public void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex_DeferValidation() throws Exception { + testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(true); + } + + private void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(boolean deferValidation) throws Exception { + String transformId = "test_dest_index_on_update-empty" + (deferValidation ? "-defer" : ""); + String sourceIndexIndex = transformId + "-src"; + String destIndex = transformId + "-dest"; + String destAliasAll = transformId + ".all"; + String destAliasLatest = transformId + ".latest"; + List destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true)); + + // We want to use an empty source index to make sure transform will not write to the destination index + putReviewsIndex(sourceIndexIndex, "date", false); + assertFalse(indexExists(destIndex)); + + // Create and start the unattended transform + SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build(); + createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, sourceIndexIndex); + startTransform(transformId); + + // Verify that the destination index creation got skipped + assertFalse(indexExists(destIndex)); + + // Update the unattended transform. This will trigger destination index creation. + // The update has to change something in the config (here, max_page_search_size). Otherwise it would have been optimized away. + updateTransform(transformId, """ + { "settings": { "max_page_search_size": 123 } }""", deferValidation); + + // Verify that the destination index now exists + assertTrue(indexExists(destIndex)); + // Verify that both aliases are configured on the dest index + assertAliases(destIndex, destAliasAll, destAliasLatest); + } + + public void testUnattendedTransformDestIndexCreatedByIndexer() throws Exception { + String transformId = "test_dest_index_in_indexer"; + String destIndex = transformId + "-dest"; + String destAliasAll = transformId + ".all"; + String destAliasLatest = transformId + ".latest"; + List destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true)); + + // Create and start the unattended transform + SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build(); + createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, REVIEWS_INDEX_NAME); + startTransform(transformId); + waitForTransformCheckpoint(transformId, 1); + + // Verify that the destination index exists + assertTrue(indexExists(destIndex)); + // Verify that both aliases are configured on the dest index + assertAliases(destIndex, destAliasAll, destAliasLatest); } public void testTransformDestIndexMappings_DeduceMappings() throws Exception { @@ -245,20 +300,9 @@ private void testTransformDestIndexMappings(String transformId, boolean deduceMa assertTrue(indexExists(destIndex)); assertThat( getIndexMappingAsMap(destIndex), - is( - equalTo( - Map.of( - "properties", - Map.of( - "avg_rating", - Map.of("type", "double"), - "reviewer", - Map.of("type", "keyword"), - "timestamp", - Map.of("type", "date") - ) - ) - ) + hasEntry( + "properties", + Map.of("avg_rating", Map.of("type", "double"), "reviewer", Map.of("type", "keyword"), "timestamp", Map.of("type", "date")) ) ); Map searchResult = getAsMap(destIndex + "/_search?q=reviewer:user_0"); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java index bde5576fb0c92..54a7c9ec733c2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/Transform.java @@ -283,7 +283,7 @@ public List> getPersistentTasksExecutor( threadPool, clusterService, settingsModule.getSettings(), - getTransformExtension().getTransformInternalIndexAdditionalSettings(), + getTransformExtension(), expressionResolver ) ); diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java index 1f9ec86ada8e2..1634f417924c0 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java @@ -27,7 +27,10 @@ import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.logging.LoggerMessageFormat; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -54,9 +57,11 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper; +import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; +import org.elasticsearch.xpack.transform.persistence.TransformIndex; import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil; import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder; @@ -75,6 +80,9 @@ class ClientTransformIndexer extends TransformIndexer { private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class); private final ParentTaskAssigningClient client; + private final ClusterService clusterService; + private final IndexNameExpressionResolver indexNameExpressionResolver; + private final Settings destIndexSettings; private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); private final AtomicReference seqNoPrimaryTermAndIndexHolder; @@ -84,6 +92,9 @@ class ClientTransformIndexer extends TransformIndexer { ClientTransformIndexer( ThreadPool threadPool, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformExtension transformExtension, TransformServices transformServices, CheckpointProvider checkpointProvider, AtomicReference initialState, @@ -112,6 +123,9 @@ class ClientTransformIndexer extends TransformIndexer { context ); this.client = ExceptionsHelper.requireNonNull(client, "client"); + this.clusterService = clusterService; + this.indexNameExpressionResolver = indexNameExpressionResolver; + this.destIndexSettings = transformExtension.getTransformDestinationIndexSettings(); this.seqNoPrimaryTermAndIndexHolder = new AtomicReference<>(seqNoPrimaryTermAndIndex); // TODO: move into context constructor @@ -288,6 +302,20 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener); } + @Override + void doMaybeCreateDestIndex(Map deducedDestIndexMappings, ActionListener listener) { + TransformIndex.createDestinationIndex( + client, + auditor, + indexNameExpressionResolver, + clusterService.state(), + transformConfig, + destIndexSettings, + deducedDestIndexMappings, + listener + ); + } + void validate(ActionListener listener) { ClientHelper.executeAsyncWithOrigin( client, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java index 402f2489698e2..815fc66694ea2 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerBuilder.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.transform.transforms; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.indexing.IndexerState; import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint; @@ -15,6 +17,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition; import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; +import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; @@ -23,6 +26,9 @@ class ClientTransformIndexerBuilder { private ParentTaskAssigningClient parentTaskClient; + private ClusterService clusterService; + private IndexNameExpressionResolver indexNameExpressionResolver; + private TransformExtension transformExtension; private TransformServices transformServices; private TransformConfig transformConfig; private TransformIndexerStats initialStats; @@ -44,6 +50,9 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) { return new ClientTransformIndexer( threadPool, + clusterService, + indexNameExpressionResolver, + transformExtension, transformServices, checkpointProvider, new AtomicReference<>(this.indexerState), @@ -73,6 +82,21 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie return this; } + ClientTransformIndexerBuilder setIndexNameExpressionResolver(IndexNameExpressionResolver indexNameExpressionResolver) { + this.indexNameExpressionResolver = indexNameExpressionResolver; + return this; + } + + ClientTransformIndexerBuilder setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + return this; + } + + ClientTransformIndexerBuilder setTransformExtension(TransformExtension transformExtension) { + this.transformExtension = transformExtension; + return this; + } + ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) { this.transformServices = transformServices; return this; diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java index e56a54a166b39..4b2da731351d7 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformIndexer.java @@ -171,6 +171,8 @@ public TransformIndexer( abstract void doGetFieldMappings(ActionListener> fieldMappingsListener); + abstract void doMaybeCreateDestIndex(Map deducedDestIndexMappings, ActionListener listener); + abstract void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener responseListener); abstract void refreshDestinationIndex(ActionListener responseListener); @@ -288,7 +290,7 @@ protected void onStart(long now, ActionListener listener) { // On each run, we need to get the total number of docs and reset the count of processed docs // Since multiple checkpoints can be executed in the task while it is running on the same node, we need to gather // the progress here, and not in the executor. - ActionListener configurationReadyListener = ActionListener.wrap(r -> { + ActionListener configurationReadyListener = ActionListener.wrap(unused -> { initializeFunction(); if (initialRun()) { @@ -339,7 +341,16 @@ protected void onStart(long now, ActionListener listener) { // ... otherwise we fall back to index mappings deduced based on source indices this.fieldMappings = deducedDestIndexMappings.get(); } - configurationReadyListener.onResponse(null); + // Since the unattended transform could not have created the destination index yet, we do it here. + // This is important to create the destination index explicitly before indexing first documents. Otherwise, the destination + // index aliases may be missing. + if (destIndexMappings.isEmpty() + && context.getCheckpoint() == 0 + && Boolean.TRUE.equals(transformConfig.getSettings().getUnattended())) { + doMaybeCreateDestIndex(deducedDestIndexMappings.get(), configurationReadyListener); + } else { + configurationReadyListener.onResponse(null); + } }, listener::onFailure); ActionListener reLoadFieldMappingsListener = ActionListener.wrap(updateConfigResponse -> { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java index ac690c625124f..abdafd3dceb9a 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutor.java @@ -46,6 +46,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.notifications.TransformAuditor; import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex; @@ -75,7 +76,7 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor stateHolder = new SetOnce<>(); @@ -348,7 +352,7 @@ protected void nodeOperation(AllocatedPersistentTask task, @Nullable TransformTa TransformInternalIndex.createLatestVersionedIndexIfRequired( clusterService, parentTaskClient, - transformInternalIndexAdditionalSettings, + transformExtension.getTransformInternalIndexAdditionalSettings(), templateCheckListener ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java index ba7c09ed35a6d..43a8f35cfeafe 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexerTests.java @@ -22,6 +22,8 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -49,6 +51,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats; import org.elasticsearch.xpack.core.transform.transforms.TransformProgress; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; @@ -129,6 +132,9 @@ public void testPitInjection() throws InterruptedException { final var client = new PitMockClient(threadPool, true); MockClientTransformIndexer indexer = new MockClientTransformIndexer( mock(ThreadPool.class), + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), new TransformServices( mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), @@ -223,6 +229,9 @@ public void testPitInjectionIfPitNotSupported() throws InterruptedException { final var client = new PitMockClient(threadPool, false); MockClientTransformIndexer indexer = new MockClientTransformIndexer( mock(ThreadPool.class), + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), new TransformServices( mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), @@ -306,6 +315,9 @@ public void testDisablePit() throws InterruptedException { final var client = new PitMockClient(threadPool, true); MockClientTransformIndexer indexer = new MockClientTransformIndexer( mock(ThreadPool.class), + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), new TransformServices( mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), @@ -393,6 +405,9 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { MockClientTransformIndexer( ThreadPool threadPool, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformExtension transformExtension, TransformServices transformServices, CheckpointProvider checkpointProvider, AtomicReference initialState, @@ -409,6 +424,9 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { ) { super( threadPool, + clusterService, + indexNameExpressionResolver, + transformExtension, transformServices, checkpointProvider, initialState, @@ -546,6 +564,9 @@ private ClientTransformIndexer createTestIndexer(ParentTaskAssigningClient clien return new ClientTransformIndexer( mock(ThreadPool.class), + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), new TransformServices( mock(IndexBasedTransformConfigManager.class), mock(TransformCheckpointService.class), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java index a18c926e21da6..079ab6afe2200 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureHandlingTests.java @@ -20,6 +20,8 @@ import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.internal.Client; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker.Durability; import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; @@ -48,6 +50,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.transform.Transform; +import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; @@ -109,6 +112,9 @@ static class MockedTransformIndexer extends ClientTransformIndexer { MockedTransformIndexer( ThreadPool threadPool, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformExtension transformExtension, String executorName, IndexBasedTransformConfigManager transformsConfigManager, CheckpointProvider checkpointProvider, @@ -124,6 +130,9 @@ static class MockedTransformIndexer extends ClientTransformIndexer { ) { super( threadPool, + clusterService, + indexNameExpressionResolver, + transformExtension, new TransformServices( transformsConfigManager, mock(TransformCheckpointService.class), @@ -1041,6 +1050,9 @@ private MockedTransformIndexer createMockIndexer( }).when(transformConfigManager).getTransformConfiguration(any(), any()); MockedTransformIndexer indexer = new MockedTransformIndexer( threadPool, + mock(ClusterService.class), + mock(IndexNameExpressionResolver.class), + mock(TransformExtension.class), executorName, transformConfigManager, mock(CheckpointProvider.class), diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java index 750e535c4330f..7d5c5a41e3154 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerFailureOnStatePersistenceTests.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.client.internal.ParentTaskAssigningClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; @@ -32,6 +34,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.TransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; @@ -61,6 +64,9 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { MockClientTransformIndexer( ThreadPool threadPool, + ClusterService clusterService, + IndexNameExpressionResolver indexNameExpressionResolver, + TransformExtension transformExtension, TransformServices transformServices, CheckpointProvider checkpointProvider, AtomicReference initialState, @@ -77,6 +83,9 @@ private static class MockClientTransformIndexer extends ClientTransformIndexer { ) { super( threadPool, + clusterService, + indexNameExpressionResolver, + transformExtension, transformServices, checkpointProvider, initialState, @@ -214,6 +223,9 @@ public void fail(Throwable exception, String failureMessage, ActionListener> fieldMappingsListene fieldMappingsListener.onResponse(Collections.emptyMap()); } + @Override + void doMaybeCreateDestIndex(Map deducedDestIndexMappings, ActionListener listener) { + listener.onResponse(null); + } + @Override void persistState(TransformState state, ActionListener listener) { persistedState = state; @@ -312,6 +317,11 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene fieldMappingsListener.onResponse(Collections.emptyMap()); } + @Override + void doMaybeCreateDestIndex(Map deducedDestIndexMappings, ActionListener listener) { + listener.onResponse(null); + } + @Override void doDeleteByQuery(DeleteByQueryRequest deleteByQueryRequest, ActionListener responseListener) { responseListener.onResponse( diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java index ee86f2ca6fcf4..279ce65be0be2 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformIndexerTests.java @@ -255,6 +255,11 @@ void doGetFieldMappings(ActionListener> fieldMappingsListene fieldMappingsListener.onResponse(Collections.emptyMap()); } + @Override + void doMaybeCreateDestIndex(Map deducedDestIndexMappings, ActionListener listener) { + listener.onResponse(null); + } + public boolean waitingForNextSearch() { return super.getScheduledNextSearch() != null; } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java index 69d81c85a62d3..b32ec235fcc6f 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformPersistentTasksExecutorTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.xpack.core.transform.TransformConfigVersion; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.persistence.TransformInternalIndexConstants; +import org.elasticsearch.xpack.transform.DefaultTransformExtension; import org.elasticsearch.xpack.transform.Transform; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; @@ -458,7 +459,7 @@ private TransformPersistentTasksExecutor buildTaskExecutor() { threadPool, clusterService, Settings.EMPTY, - Settings.EMPTY, + new DefaultTransformExtension(), TestIndexNameExpressionResolver.newInstance() ); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java index 0e0d88dee7333..a34d35e4d3cb5 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/TransformTaskTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.client.internal.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; @@ -43,6 +44,7 @@ import org.elasticsearch.xpack.core.transform.transforms.TransformState; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskParams; import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState; +import org.elasticsearch.xpack.transform.DefaultTransformExtension; import org.elasticsearch.xpack.transform.TransformServices; import org.elasticsearch.xpack.transform.checkpoint.TransformCheckpointService; import org.elasticsearch.xpack.transform.notifications.MockTransformAuditor; @@ -196,6 +198,9 @@ private TransformServices transformServices(Clock clock, TransformAuditor audito private ClientTransformIndexerBuilder indexerBuilder(TransformConfig transformConfig, TransformServices transformServices) { return new ClientTransformIndexerBuilder().setClient(new ParentTaskAssigningClient(client, TaskId.EMPTY_TASK_ID)) + .setClusterService(mock(ClusterService.class)) + .setIndexNameExpressionResolver(mock(IndexNameExpressionResolver.class)) + .setTransformExtension(new DefaultTransformExtension()) .setTransformConfig(transformConfig) .setTransformServices(transformServices); }