Skip to content

Commit

Permalink
[DSL] skip deleting indices that have in-progress downsampling operat…
Browse files Browse the repository at this point in the history
…ions (elastic#101495) (elastic#101529)

A backing index for which we kicked off downsampling will have a target
`downsample-{interval}-.ds...` index created outside the data stream.
The source (backing) index will have a downsample status of `STARTED` at
this point.
When downsampling completes the downsample status of the source index is
changed to `SUCCESS` and DSL will atomically swap the downsample index
in the data stream instead of the source index, and delete the source
index.

If retention time for the source index lapses before we finished the
downsample operation, the target downsample index will remain in the
system, outside the data stream as the source index will be deleted.

This makes the DSL retention wait for a potential in-progress downsampling
operation to complete, before executing retention.
  • Loading branch information
andreidan authored Oct 30, 2023
1 parent f9bf18a commit 811e58f
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 5 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/101495.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 101495
summary: "[DSL] skip deleting indices that have in-progress downsampling operations"
area: Data streams
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@

import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED;
import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.UNKNOWN;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS;
import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;

Expand Down Expand Up @@ -764,14 +765,30 @@ private Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStre

for (Index index : backingIndicesOlderThanRetention) {
if (indicesToExcludeForRemainingRun.contains(index) == false) {
indicesToBeRemoved.add(index);
IndexMetadata backingIndex = metadata.index(index);
assert backingIndex != null : "the data stream backing indices must exist";

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period");
IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
// we don't want to delete the source index if it has an in-progress downsampling operation because the
// target downsample index will remain in the system as a standalone index
if (downsampleStatus.equals(UNKNOWN)) {
indicesToBeRemoved.add(index);

// there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
// let's start simple and reevaluate
String indexName = backingIndex.getIndex().getName();
deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period");
} else {
// there's an opportunity here to cancel downsampling and delete the source index now
logger.trace(
"Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed "
+ "because there's a downsampling operation currently in progress for this index. Current downsampling "
+ "status is [{}]. When downsampling completes, DSL will delete this index.",
index.getName(),
retention,
downsampleStatus
);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED;
import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.SUCCESS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.UNKNOWN;
import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.createDataStream;
import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING;
Expand Down Expand Up @@ -293,6 +294,84 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() {
assertThat(((DeleteIndexRequest) deleteIndexRequest).indices(), is(new String[] { dataStream.getIndices().get(0).getName() }));
}

/**
 * Verifies the retention behavior introduced for in-progress downsampling:
 * a backing index whose downsample status is anything other than {@code UNKNOWN}
 * must be skipped by DSL retention (otherwise the target downsample index would
 * be orphaned outside the data stream), while an absent/default status
 * ({@code UNKNOWN}) must not prevent retention from deleting the index.
 */
public void testRetentionSkippedWhilstDownsamplingInProgress() {
    String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
    int numBackingIndices = 3;
    Metadata.Builder builder = Metadata.builder();
    // Zero-millisecond retention makes every non-write backing index
    // immediately eligible for deletion once the service runs.
    DataStream dataStream = createDataStream(
        builder,
        dataStreamName,
        numBackingIndices,
        settings(IndexVersion.current()),
        DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueMillis(0)).build(),
        now
    );
    builder.put(dataStream);

    ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();

    {
        // First scenario: give the first-generation index a non-UNKNOWN
        // downsample status (any of STARTED/SUCCESS/...), simulating a
        // downsampling operation that DSL must wait for.
        Metadata metadata = state.metadata();
        Metadata.Builder metaBuilder = Metadata.builder(metadata);

        String firstBackingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
        IndexMetadata indexMetadata = metadata.index(firstBackingIndex);
        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata);
        indexMetaBuilder.settings(
            Settings.builder()
                .put(indexMetadata.getSettings())
                .put(
                    IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY,
                    randomValueOtherThan(UNKNOWN, () -> randomFrom(IndexMetadata.DownsampleTaskStatus.values()))
                )
        );
        // Mark force-merge as already completed so retention is the only
        // lifecycle step left to consider for this index.
        indexMetaBuilder.putCustom(
            LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
            Map.of(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, String.valueOf(System.currentTimeMillis()))
        );
        metaBuilder.put(indexMetaBuilder);
        state = ClusterState.builder(ClusterName.DEFAULT).metadata(metaBuilder).build();

        dataStreamLifecycleService.run(state);
        // Only generation 2 is deleted; generation 1 is skipped because of
        // its in-progress downsample status.
        assertThat(clientSeenRequests.size(), is(2)); // rollover the write index and delete the second generation
        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
        assertThat(clientSeenRequests.get(1), instanceOf(DeleteIndexRequest.class));
        assertThat(
            ((DeleteIndexRequest) clientSeenRequests.get(1)).indices()[0],
            is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))
        );
    }

    {
        // a lack of downsample status (i.e. the default `UNKNOWN`) must not prevent retention
        Metadata metadata = state.metadata();
        Metadata.Builder metaBuilder = Metadata.builder(metadata);

        String firstBackingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
        IndexMetadata indexMetadata = metadata.index(firstBackingIndex);
        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata);
        // Clearing the setting reverts the status to the default UNKNOWN, so
        // the index becomes a regular retention candidate again.
        indexMetaBuilder.settings(
            Settings.builder().put(indexMetadata.getSettings()).putNull(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY)
        );
        metaBuilder.put(indexMetaBuilder);
        state = ClusterState.builder(ClusterName.DEFAULT).metadata(metaBuilder).build();

        dataStreamLifecycleService.run(state);
        // NOTE(review): the expected size of 3 implies clientSeenRequests
        // accumulates across run() invocations (2 from the block above plus
        // the generation-1 delete here) — confirm against the test fixture.
        assertThat(clientSeenRequests.size(), is(3)); // rollover the write index and delete the other two generations
        assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class));
        assertThat(clientSeenRequests.get(1), instanceOf(DeleteIndexRequest.class));
        assertThat(
            ((DeleteIndexRequest) clientSeenRequests.get(1)).indices()[0],
            is(DataStream.getDefaultBackingIndexName(dataStreamName, 2))
        );
        assertThat(clientSeenRequests.get(2), instanceOf(DeleteIndexRequest.class));
        assertThat(
            ((DeleteIndexRequest) clientSeenRequests.get(2)).indices()[0],
            is(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
        );
    }
}

public void testIlmManagedIndicesAreSkipped() {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
int numBackingIndices = 3;
Expand Down

0 comments on commit 811e58f

Please sign in to comment.