From 811e58f4af13b5db7086fbc655f2fc95e026f050 Mon Sep 17 00:00:00 2001 From: Andrei Dan Date: Mon, 30 Oct 2023 15:31:01 +0000 Subject: [PATCH] [DSL] skip deleting indices that have in-progress downsampling operations (#101495) (#101529) A backing index for which we kicked off downsampling will have a target `downsample-{interval}-.ds...` index created outside the data stream. The source (backing) index will have a downsample status of `STARTED` at this point. When downsampling completes, the downsample status of the source index is changed to `SUCCESS` and DSL will atomically swap the downsample index into the data stream in place of the source index, and delete the source index. If retention time for the source index lapses before we finished the downsample operation, the target downsample index will remain in the system, outside the data stream, as the source index will be deleted. This makes the DSL retention wait for a potential in-progress downsampling operation to complete, before executing retention. 
--- docs/changelog/101495.yaml | 5 ++ .../lifecycle/DataStreamLifecycleService.java | 27 +++++-- .../DataStreamLifecycleServiceTests.java | 79 +++++++++++++++++++ 3 files changed, 106 insertions(+), 5 deletions(-) create mode 100644 docs/changelog/101495.yaml diff --git a/docs/changelog/101495.yaml b/docs/changelog/101495.yaml new file mode 100644 index 0000000000000..f61c9b824b77c --- /dev/null +++ b/docs/changelog/101495.yaml @@ -0,0 +1,5 @@ +pr: 101495 +summary: "[DSL] skip deleting indices that have in-progress downsampling operations" +area: Data streams +type: bug +issues: [] diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index ff6bf755734ac..5475193a28209 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -90,6 +90,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED; +import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.UNKNOWN; import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_DOWNSAMPLE_STATUS; import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; @@ -764,14 +765,30 @@ private Set maybeExecuteRetention(ClusterState state, DataStream dataStre for (Index index : backingIndicesOlderThanRetention) { if (indicesToExcludeForRemainingRun.contains(index) == false) { - indicesToBeRemoved.add(index); IndexMetadata backingIndex = metadata.index(index); assert backingIndex != null : "the data stream backing indices must exist"; - // there's an opportunity here to batch the delete requests 
(i.e. delete 100 indices / request) - // let's start simple and reevaluate - String indexName = backingIndex.getIndex().getName(); - deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period"); + IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); + // we don't want to delete the source index if they have an in-progress downsampling operation because the + // target downsample index will remain in the system as a standalone index + if (downsampleStatus.equals(UNKNOWN)) { + indicesToBeRemoved.add(index); + + // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) + // let's start simple and reevaluate + String indexName = backingIndex.getIndex().getName(); + deleteIndexOnce(indexName, "the lapsed [" + retention + "] retention period"); + } else { + // there's an opportunity here to cancel downsampling and delete the source index now + logger.trace( + "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " + + "because there's a downsampling operation currently in progress for this index. Current downsampling " + + "status is [{}]. 
When downsampling completes, DSL will delete this index.", + index.getName(), + retention, + downsampleStatus + ); + } } } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 1e14dbff3265f..a75071f27181c 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -98,6 +98,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock.WRITE; import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.STARTED; import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.SUCCESS; +import static org.elasticsearch.cluster.metadata.IndexMetadata.DownsampleTaskStatus.UNKNOWN; import static org.elasticsearch.datastreams.DataStreamsPlugin.LIFECYCLE_CUSTOM_INDEX_METADATA_KEY; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.createDataStream; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService.DATA_STREAM_MERGE_POLICY_TARGET_FACTOR_SETTING; @@ -293,6 +294,84 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() { assertThat(((DeleteIndexRequest) deleteIndexRequest).indices(), is(new String[] { dataStream.getIndices().get(0).getName() })); } + public void testRetentionSkippedWhilstDownsamplingInProgress() { + String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + int numBackingIndices = 3; + Metadata.Builder builder = Metadata.builder(); + DataStream dataStream = createDataStream( + builder, + dataStreamName, + numBackingIndices, + settings(IndexVersion.current()), + 
DataStreamLifecycle.newBuilder().dataRetention(TimeValue.timeValueMillis(0)).build(), + now + ); + builder.put(dataStream); + + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); + + { + Metadata metadata = state.metadata(); + Metadata.Builder metaBuilder = Metadata.builder(metadata); + + String firstBackingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + IndexMetadata indexMetadata = metadata.index(firstBackingIndex); + IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata); + indexMetaBuilder.settings( + Settings.builder() + .put(indexMetadata.getSettings()) + .put( + IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY, + randomValueOtherThan(UNKNOWN, () -> randomFrom(IndexMetadata.DownsampleTaskStatus.values())) + ) + ); + indexMetaBuilder.putCustom( + LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, + Map.of(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, String.valueOf(System.currentTimeMillis())) + ); + metaBuilder.put(indexMetaBuilder); + state = ClusterState.builder(ClusterName.DEFAULT).metadata(metaBuilder).build(); + + dataStreamLifecycleService.run(state); + assertThat(clientSeenRequests.size(), is(2)); // rollover the write index and delete the second generation + assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class)); + assertThat(clientSeenRequests.get(1), instanceOf(DeleteIndexRequest.class)); + assertThat( + ((DeleteIndexRequest) clientSeenRequests.get(1)).indices()[0], + is(DataStream.getDefaultBackingIndexName(dataStreamName, 2)) + ); + } + + { + // a lack of downsample status (i.e. 
the default `UNKNOWN`) must not prevent retention + Metadata metadata = state.metadata(); + Metadata.Builder metaBuilder = Metadata.builder(metadata); + + String firstBackingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1); + IndexMetadata indexMetadata = metadata.index(firstBackingIndex); + IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata); + indexMetaBuilder.settings( + Settings.builder().put(indexMetadata.getSettings()).putNull(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY) + ); + metaBuilder.put(indexMetaBuilder); + state = ClusterState.builder(ClusterName.DEFAULT).metadata(metaBuilder).build(); + + dataStreamLifecycleService.run(state); + assertThat(clientSeenRequests.size(), is(3)); // rollover the write index and delete the other two generations + assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class)); + assertThat(clientSeenRequests.get(1), instanceOf(DeleteIndexRequest.class)); + assertThat( + ((DeleteIndexRequest) clientSeenRequests.get(1)).indices()[0], + is(DataStream.getDefaultBackingIndexName(dataStreamName, 2)) + ); + assertThat(clientSeenRequests.get(2), instanceOf(DeleteIndexRequest.class)); + assertThat( + ((DeleteIndexRequest) clientSeenRequests.get(2)).indices()[0], + is(DataStream.getDefaultBackingIndexName(dataStreamName, 1)) + ); + } + } + public void testIlmManagedIndicesAreSkipped() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3;