From 5105df252466bfe1b343eff7f8bf13fb302b5da6 Mon Sep 17 00:00:00 2001 From: Dimitris Athanasiou Date: Mon, 17 Dec 2018 14:14:53 +0200 Subject: [PATCH] [FEATURE][ML] Split in batches and migrate all jobs and datafeeds Relates #32905 --- .../xpack/ml/MlConfigMigrator.java | 95 +++++++++++-------- .../ml/integration/MlConfigMigratorIT.java | 77 ++++++++++++++- 2 files changed, 132 insertions(+), 40 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java index e17c23da0686e..c3b9626ffd042 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MlConfigMigrator.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -38,6 +39,7 @@ import org.elasticsearch.xpack.core.ml.utils.ToXContentParams; import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider; import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider; +import org.elasticsearch.xpack.ml.utils.ChainTaskExecutor; import java.io.IOException; import java.util.ArrayList; @@ -96,14 +98,14 @@ public class MlConfigMigrator { private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck; private final AtomicBoolean migrationInProgress; - private final AtomicBoolean firstTime; + private final AtomicBoolean tookConfigSnapshot; public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) { this.client = Objects.requireNonNull(client); this.clusterService = Objects.requireNonNull(clusterService); this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService); this.migrationInProgress = new AtomicBoolean(false); - this.firstTime = new AtomicBoolean(true); + this.tookConfigSnapshot = new AtomicBoolean(false); } /** @@ -135,12 +137,7 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener return; } - Collection stoppedDatafeeds = stoppedDatafeedConfigs(clusterState); - Map eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream() - .map(MlConfigMigrator::updateJobForMigration) - .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a)); - - JobsAndDatafeeds jobsAndDatafeedsToMigrate = limitWrites(stoppedDatafeeds, eligibleJobs); + logger.debug("migrating ml configurations"); ActionListener unMarkMigrationInProgress = ActionListener.wrap( response -> { @@ -153,37 +150,36 @@ public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener } ); - if (firstTime.get()) { - snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( - response -> { - firstTime.set(false); - migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress); - }, - unMarkMigrationInProgress::onFailure - )); - return; - } + snapshotMlMeta(MlMetadata.getMlMetadata(clusterState), ActionListener.wrap( + response -> { + // We have successfully snapshotted the ML configs so we don't need to try again + tookConfigSnapshot.set(true); - migrate(jobsAndDatafeedsToMigrate, unMarkMigrationInProgress); + List batches = splitInBatches(clusterState); + if (batches.isEmpty()) { + unMarkMigrationInProgress.onResponse(Boolean.FALSE); + return; + } + migrateBatches(batches, unMarkMigrationInProgress); + }, + unMarkMigrationInProgress::onFailure + )); } - private void migrate(JobsAndDatafeeds jobsAndDatafeedsToMigrate, ActionListener listener) { - if (jobsAndDatafeedsToMigrate.totalCount() == 0) { - listener.onResponse(Boolean.FALSE); - return; - } - - logger.debug("migrating ml configurations"); - - writeConfigToIndex(jobsAndDatafeedsToMigrate.datafeedConfigs, jobsAndDatafeedsToMigrate.jobs, ActionListener.wrap( + private void migrateBatches(List batches, ActionListener listener) { + ChainTaskExecutor chainTaskExecutor = new ChainTaskExecutor(EsExecutors.newDirectExecutorService(), true); + for (JobsAndDatafeeds batch : batches) { + chainTaskExecutor.add(chainedListener -> writeConfigToIndex(batch.datafeedConfigs, batch.jobs, ActionListener.wrap( failedDocumentIds -> { - List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.jobs); + List successfulJobWrites = filterFailedJobConfigWrites(failedDocumentIds, batch.jobs); List successfulDatafeedWrites = - filterFailedDatafeedConfigWrites(failedDocumentIds, jobsAndDatafeedsToMigrate.datafeedConfigs); - removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, listener); + filterFailedDatafeedConfigWrites(failedDocumentIds, batch.datafeedConfigs); + removeFromClusterState(successfulJobWrites, successfulDatafeedWrites, chainedListener); }, - listener::onFailure - )); + chainedListener::onFailure + ))); + } + chainTaskExecutor.execute(ActionListener.wrap(aVoid -> listener.onResponse(true), listener::onFailure)); } // Exposed for testing @@ -208,9 +204,9 @@ public void writeConfigToIndex(Collection datafeedsToMigrate, } private void removeFromClusterState(List jobsToRemoveIds, List datafeedsToRemoveIds, - ActionListener listener) { + ActionListener listener) { if (jobsToRemoveIds.isEmpty() && datafeedsToRemoveIds.isEmpty()) { - listener.onResponse(Boolean.FALSE); + listener.onResponse(null); return; } @@ -244,7 +240,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS logger.info("ml datafeed configurations migrated: {}", removedConfigs.get().removedDatafeedIds); } } - listener.onResponse(Boolean.TRUE); + listener.onResponse(null); } }); } @@ -326,12 +322,17 @@ private IndexRequest indexRequest(ToXContentObject source, String documentId, To // public for testing public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listener) { + if (tookConfigSnapshot.get()) { + listener.onResponse(true); + return; + } + if (mlMetadata.getJobs().isEmpty() && mlMetadata.getDatafeeds().isEmpty()) { - listener.onResponse(Boolean.TRUE); + listener.onResponse(true); return; } - logger.debug("taking a snapshot of mlmetadata"); + logger.debug("taking a snapshot of ml_metadata"); String documentId = "ml-config"; IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.jobStateIndexName(), ElasticsearchMappings.DOC_TYPE, documentId) @@ -345,7 +346,7 @@ public void snapshotMlMeta(MlMetadata mlMetadata, ActionListener listen indexRequest.setSource(builder); } catch (IOException e) { - logger.error("failed to serialise mlmetadata", e); + logger.error("failed to serialise ml_metadata", e); listener.onFailure(e); return; } @@ -437,6 +438,22 @@ public int totalCount() { } } + public static List splitInBatches(ClusterState clusterState) { + Collection stoppedDatafeeds = stoppedDatafeedConfigs(clusterState); + Map eligibleJobs = nonDeletingJobs(closedJobConfigs(clusterState)).stream() + .map(MlConfigMigrator::updateJobForMigration) + .collect(Collectors.toMap(Job::getId, Function.identity(), (a, b) -> a)); + + List batches = new ArrayList<>(); + while (stoppedDatafeeds.isEmpty() == false || eligibleJobs.isEmpty() == false) { + JobsAndDatafeeds batch = limitWrites(stoppedDatafeeds, eligibleJobs); + batches.add(batch); + stoppedDatafeeds.removeAll(batch.datafeedConfigs); + batch.jobs.forEach(job -> eligibleJobs.remove(job.getId())); + } + return batches; + } + /** * Return at most {@link #MAX_BULK_WRITE_SIZE} configs favouring * datafeed and job pairs so if a datafeed is chosen so is its job. diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java index 1dc06e0e2aef6..d98abea55535c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/integration/MlConfigMigratorIT.java @@ -109,7 +109,6 @@ public void testWriteConfigToIndex() throws InterruptedException { } public void testMigrateConfigs() throws InterruptedException, IOException { - // and jobs and datafeeds clusterstate MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); mlMetadata.putJob(buildJobBuilder("job-foo").build(), false); @@ -166,6 +165,82 @@ public void testMigrateConfigs() throws InterruptedException, IOException { assertEquals("df-1", datafeedsHolder.get().get(0).getId()); } + public void testMigrateConfigs_GivenLargeNumberOfJobsAndDatafeeds() throws InterruptedException { + int jobCount = randomIntBetween(150, 201); + int datafeedCount = randomIntBetween(150, jobCount); + + // and jobs and datafeeds clusterstate + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + for (int i = 0; i < jobCount; i++) { + mlMetadata.putJob(buildJobBuilder("job-" + i).build(), false); + } + for (int i = 0; i < datafeedCount; i++) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("df-" + i, "job-" + i); + builder.setIndices(Collections.singletonList("beats*")); + mlMetadata.putDatafeed(builder.build(), Collections.emptyMap()); + } + + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + doAnswer(invocation -> { + ClusterStateUpdateTask listener = (ClusterStateUpdateTask) invocation.getArguments()[1]; + listener.clusterStateProcessed("source", mock(ClusterState.class), mock(ClusterState.class)); + return null; + }).when(clusterService).submitStateUpdateTask(eq("remove-migrated-ml-configs"), any()); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + + // do the migration + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertTrue(responseHolder.get()); + + // check the jobs have been migrated + AtomicReference> jobsHolder = new AtomicReference<>(); + JobConfigProvider jobConfigProvider = new JobConfigProvider(client()); + blockingCall(actionListener -> jobConfigProvider.expandJobs("*", true, true, actionListener), + jobsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(jobsHolder.get(), hasSize(jobCount)); + + // check datafeeds are migrated + DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client(), xContentRegistry()); + AtomicReference> datafeedsHolder = new AtomicReference<>(); + blockingCall(actionListener -> datafeedConfigProvider.expandDatafeedConfigs("*", true, actionListener), + datafeedsHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertThat(datafeedsHolder.get(), hasSize(datafeedCount)); + } + + public void testMigrateConfigs_GivenNoJobsOrDatafeeds() throws InterruptedException { + // Add empty ML metadata + MlMetadata.Builder mlMetadata = new MlMetadata.Builder(); + ClusterState clusterState = ClusterState.builder(new ClusterName("_name")) + .metaData(MetaData.builder() + .putCustom(MlMetadata.TYPE, mlMetadata.build())) + .build(); + + AtomicReference exceptionHolder = new AtomicReference<>(); + AtomicReference responseHolder = new AtomicReference<>(); + + // do the migration + MlConfigMigrator mlConfigMigrator = new MlConfigMigrator(nodeSettings(), client(), clusterService); + blockingCall(actionListener -> mlConfigMigrator.migrateConfigsWithoutTasks(clusterState, actionListener), + responseHolder, exceptionHolder); + + assertNull(exceptionHolder.get()); + assertFalse(responseHolder.get()); + } + public void testMigrateConfigsWithoutTasks_GivenMigrationIsDisabled() throws InterruptedException { Settings settings = Settings.builder().put(nodeSettings()) .put(MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION.getKey(), false)