Skip to content

Commit

Permalink
[Transform] Make unattended transform create destination index explic…
Browse files Browse the repository at this point in the history
…itly
  • Loading branch information
przemekwitek committed Feb 14, 2024
1 parent a0ed9ba commit 5a4a980
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public class TransformChainIT extends TransformRestTestCase {
},
"dest": {
"index": "%s",
"pipeline": "%s"
"pipeline": "%s",
"aliases": [
{
"alias": "%s"
},
{
"alias": "%s",
"move_on_creation": true
}
]
},
"sync": {
"time": {
Expand Down Expand Up @@ -172,9 +181,14 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
// The number of documents is expected to be the same in all the indices.
String sourceIndex = i == 0 ? reviewsIndexName : transformIds.get(i - 1) + "-dest";
String destIndex = transformId + "-dest";
String destReadAlias = destIndex + "-read";
String destWriteAlias = destIndex + "-write";
assertFalse(indexExists(destIndex));
assertFalse(aliasExists(destReadAlias));
assertFalse(aliasExists(destWriteAlias));

assertAcknowledged(putTransform(transformId, createTransformConfig(sourceIndex, destIndex), true, RequestOptions.DEFAULT));
String transformConfig = createTransformConfig(sourceIndex, destIndex, destReadAlias, destWriteAlias);
assertAcknowledged(putTransform(transformId, transformConfig, true, RequestOptions.DEFAULT));
}

List<String> transformIdsShuffled = new ArrayList<>(transformIds);
Expand All @@ -198,6 +212,17 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
}
}, 60, TimeUnit.SECONDS);

for (String transformId : transformIds) {
String destIndex = transformId + "-dest";
String destReadAlias = destIndex + "-read";
String destWriteAlias = destIndex + "-write";
// Verify that the destination index is created.
assertTrue(indexExists(destIndex));
// Verify that the destination index aliases are set up.
assertTrue(aliasExists(destReadAlias));
assertTrue(aliasExists(destWriteAlias));
}

// Stop all the transforms.
for (String transformId : transformIds) {
stopTransform(transformId);
Expand All @@ -208,12 +233,14 @@ private void testChainedTransforms(final int numTransforms) throws Exception {
}
}

private static String createTransformConfig(String sourceIndex, String destIndex) {
private static String createTransformConfig(String sourceIndex, String destIndex, String destReadAlias, String destWriteAlias) {
return Strings.format(
TRANSFORM_CONFIG_TEMPLATE,
sourceIndex,
destIndex,
SET_INGEST_TIME_PIPELINE,
destReadAlias,
destWriteAlias,
"1s",
"1s",
randomBoolean(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,14 @@ public void testTransformPermissionsDeferUnattendedNoDest() throws Exception {

startTransform(config.getId(), RequestOptions.DEFAULT);

// transform is red due to lacking permissions
assertRed(transformId, authIssue);
// Give the transform indexer enough time to try creating destination index
Thread.sleep(5_000);

String destIndexIssue = Strings.format("Could not create destination index [%s] for transform [%s]", destIndexName, transformId);
// transform's auth state status is still RED due to:
// - lacking permissions
// - and the inability to create destination index in the indexer (which is also a consequence of lacking permissions)
assertRed(transformId, authIssue, destIndexIssue);

// update transform's credentials so that the transform has permission to access source/dest indices
updateConfig(transformId, "{}", RequestOptions.DEFAULT.toBuilder().addHeader(AUTH_KEY, Users.SENIOR.header).build());
Expand Down Expand Up @@ -576,6 +582,7 @@ private void assertGreen(String transformId) throws IOException {
assertThat("Stats were: " + stats, extractValue(stats, "health", "issues"), is(nullValue()));
}

// We expect exactly the issues passed as "expectedHealthIssueDetails". Not more, not less.
@SuppressWarnings("unchecked")
private void assertRed(String transformId, String... expectedHealthIssueDetails) throws IOException {
Map<String, Object> stats = getBasicTransformStats(transformId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;

public class TransformDestIndexIT extends TransformRestTestCase {
Expand Down Expand Up @@ -114,32 +115,26 @@ public void testTransformDestIndexAliases() throws Exception {
assertAliases(destIndex2, destAliasAll, destAliasLatest);
}

public void testTransformDestIndexCreatedDuringUpdate_NoDeferValidation() throws Exception {
testTransformDestIndexCreatedDuringUpdate(false);
public void testUnattendedTransformDestIndexCreatedDuringUpdate_NoDeferValidation() throws Exception {
testUnattendedTransformDestIndexCreatedDuringUpdate(false);
}

public void testTransformDestIndexCreatedDuringUpdate_DeferValidation() throws Exception {
testTransformDestIndexCreatedDuringUpdate(true);
public void testUnattendedTransformDestIndexCreatedDuringUpdate_DeferValidation() throws Exception {
testUnattendedTransformDestIndexCreatedDuringUpdate(true);
}

private void testTransformDestIndexCreatedDuringUpdate(boolean deferValidation) throws Exception {
private void testUnattendedTransformDestIndexCreatedDuringUpdate(boolean deferValidation) throws Exception {
String transformId = "test_dest_index_on_update" + (deferValidation ? "-defer" : "");
String destIndex = transformId + "-dest";
String destAliasAll = transformId + ".all";
String destAliasLatest = transformId + ".latest";
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));

// Create and start the unattended transform
SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build();
createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, REVIEWS_INDEX_NAME);
assertFalse(indexExists(destIndex));

// Create and start the unattended transform
createPivotReviewsTransform(
transformId,
destIndex,
null,
null,
null,
new SettingsConfig.Builder().setUnattended(true).build(),
null,
null,
REVIEWS_INDEX_NAME
);
startTransform(transformId);

// Update the unattended transform. This will trigger destination index creation.
Expand All @@ -151,6 +146,66 @@ private void testTransformDestIndexCreatedDuringUpdate(boolean deferValidation)

// Verify that the destination index now exists
assertTrue(indexExists(destIndex));
// Verify that both aliases are configured on the dest index
assertAliases(destIndex, destAliasAll, destAliasLatest);
}

public void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex_NoDeferValidation() throws Exception {
testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(false);
}

public void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex_DeferValidation() throws Exception {
testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(true);
}

private void testUnattendedTransformDestIndexCreatedDuringUpdate_EmptySourceIndex(boolean deferValidation) throws Exception {
String transformId = "test_dest_index_on_update-empty" + (deferValidation ? "-defer" : "");
String sourceIndexIndex = transformId + "-src";
String destIndex = transformId + "-dest";
String destAliasAll = transformId + ".all";
String destAliasLatest = transformId + ".latest";
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));

// We want to use an empty source index to make sure transform will not write to the destination index
putReviewsIndex(sourceIndexIndex, "date", false);
assertFalse(indexExists(destIndex));

// Create and start the unattended transform
SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build();
createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, sourceIndexIndex);
startTransform(transformId);

// Verify that the destination index creation got skipped
assertFalse(indexExists(destIndex));

// Update the unattended transform. This will trigger destination index creation.
// The update has to change something in the config (here, max_page_search_size). Otherwise it would have been optimized away.
updateTransform(transformId, """
{ "settings": { "max_page_search_size": 123 } }""", deferValidation);

// Verify that the destination index now exists
assertTrue(indexExists(destIndex));
// Verify that both aliases are configured on the dest index
assertAliases(destIndex, destAliasAll, destAliasLatest);
}

public void testUnattendedTransformDestIndexCreatedByIndexer() throws Exception {
String transformId = "test_dest_index_in_indexer";
String destIndex = transformId + "-dest";
String destAliasAll = transformId + ".all";
String destAliasLatest = transformId + ".latest";
List<DestAlias> destAliases = List.of(new DestAlias(destAliasAll, false), new DestAlias(destAliasLatest, true));

// Create and start the unattended transform
SettingsConfig settingsConfig = new SettingsConfig.Builder().setUnattended(true).build();
createPivotReviewsTransform(transformId, destIndex, null, null, destAliases, settingsConfig, null, null, REVIEWS_INDEX_NAME);
startTransform(transformId);
waitForTransformCheckpoint(transformId, 1);

// Verify that the destination index exists
assertTrue(indexExists(destIndex));
// Verify that both aliases are configured on the dest index
assertAliases(destIndex, destAliasAll, destAliasLatest);
}

public void testTransformDestIndexMappings_DeduceMappings() throws Exception {
Expand Down Expand Up @@ -245,20 +300,9 @@ private void testTransformDestIndexMappings(String transformId, boolean deduceMa
assertTrue(indexExists(destIndex));
assertThat(
getIndexMappingAsMap(destIndex),
is(
equalTo(
Map.of(
"properties",
Map.of(
"avg_rating",
Map.of("type", "double"),
"reviewer",
Map.of("type", "keyword"),
"timestamp",
Map.of("type", "date")
)
)
)
hasEntry(
"properties",
Map.of("avg_rating", Map.of("type", "double"), "reviewer", Map.of("type", "keyword"), "timestamp", Map.of("type", "date"))
)
);
Map<String, Object> searchResult = getAsMap(destIndex + "/_search?q=reviewer:user_0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
threadPool,
clusterService,
settingsModule.getSettings(),
getTransformExtension().getTransformInternalIndexAdditionalSettings(),
getTransformExtension(),
expressionResolver
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
Expand All @@ -54,9 +57,11 @@
import org.elasticsearch.xpack.core.transform.transforms.TransformStoredDoc;
import org.elasticsearch.xpack.core.transform.transforms.TransformTaskState;
import org.elasticsearch.xpack.core.transform.utils.ExceptionsHelper;
import org.elasticsearch.xpack.transform.TransformExtension;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
import org.elasticsearch.xpack.transform.persistence.TransformIndex;
import org.elasticsearch.xpack.transform.transforms.pivot.SchemaUtil;
import org.elasticsearch.xpack.transform.utils.ExceptionRootCauseFinder;

Expand All @@ -75,6 +80,9 @@ class ClientTransformIndexer extends TransformIndexer {
private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class);

private final ParentTaskAssigningClient client;
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final Settings destIndexSettings;
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false);

private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndexHolder;
Expand All @@ -84,6 +92,9 @@ class ClientTransformIndexer extends TransformIndexer {

ClientTransformIndexer(
ThreadPool threadPool,
ClusterService clusterService,
IndexNameExpressionResolver indexNameExpressionResolver,
TransformExtension transformExtension,
TransformServices transformServices,
CheckpointProvider checkpointProvider,
AtomicReference<IndexerState> initialState,
Expand Down Expand Up @@ -112,6 +123,9 @@ class ClientTransformIndexer extends TransformIndexer {
context
);
this.client = ExceptionsHelper.requireNonNull(client, "client");
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.destIndexSettings = transformExtension.getTransformDestinationIndexSettings();
this.seqNoPrimaryTermAndIndexHolder = new AtomicReference<>(seqNoPrimaryTermAndIndex);

// TODO: move into context constructor
Expand Down Expand Up @@ -288,6 +302,20 @@ void doGetFieldMappings(ActionListener<Map<String, String>> fieldMappingsListene
SchemaUtil.getDestinationFieldMappings(client, getConfig().getDestination().getIndex(), fieldMappingsListener);
}

@Override
void doMaybeCreateDestIndex(Map<String, String> deducedDestIndexMappings, ActionListener<Boolean> listener) {
TransformIndex.createDestinationIndex(
client,
auditor,
indexNameExpressionResolver,
clusterService.state(),
transformConfig,
destIndexSettings,
deducedDestIndexMappings,
listener
);
}

void validate(ActionListener<ValidateTransformAction.Response> listener) {
ClientHelper.executeAsyncWithOrigin(
client,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@
package org.elasticsearch.xpack.transform.transforms;

import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexing.IndexerState;
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats;
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
import org.elasticsearch.xpack.transform.TransformExtension;
import org.elasticsearch.xpack.transform.TransformServices;
import org.elasticsearch.xpack.transform.checkpoint.CheckpointProvider;
import org.elasticsearch.xpack.transform.persistence.SeqNoPrimaryTermAndIndex;
Expand All @@ -23,6 +26,9 @@

class ClientTransformIndexerBuilder {
private ParentTaskAssigningClient parentTaskClient;
private ClusterService clusterService;
private IndexNameExpressionResolver indexNameExpressionResolver;
private TransformExtension transformExtension;
private TransformServices transformServices;
private TransformConfig transformConfig;
private TransformIndexerStats initialStats;
Expand All @@ -44,6 +50,9 @@ ClientTransformIndexer build(ThreadPool threadPool, TransformContext context) {

return new ClientTransformIndexer(
threadPool,
clusterService,
indexNameExpressionResolver,
transformExtension,
transformServices,
checkpointProvider,
new AtomicReference<>(this.indexerState),
Expand Down Expand Up @@ -73,6 +82,21 @@ ClientTransformIndexerBuilder setClient(ParentTaskAssigningClient parentTaskClie
return this;
}

ClientTransformIndexerBuilder setIndexNameExpressionResolver(IndexNameExpressionResolver indexNameExpressionResolver) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
return this;
}

ClientTransformIndexerBuilder setClusterService(ClusterService clusterService) {
this.clusterService = clusterService;
return this;
}

ClientTransformIndexerBuilder setTransformExtension(TransformExtension transformExtension) {
this.transformExtension = transformExtension;
return this;
}

ClientTransformIndexerBuilder setTransformServices(TransformServices transformServices) {
this.transformServices = transformServices;
return this;
Expand Down
Loading

0 comments on commit 5a4a980

Please sign in to comment.