diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java index 8ffbb7184b4b3..6d4014a1d7e4c 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/MlMigrationIT.java @@ -204,12 +204,15 @@ private void upgradedClusterTests() throws Exception { datafeedStarted = startMigratedDatafeed(OLD_CLUSTER_STOPPED_DATAFEED_ID); } - // wait for the closed and open jobs and datafeed to be migrated - waitForMigration(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID), - Arrays.asList(OLD_CLUSTER_STOPPED_DATAFEED_ID, OLD_CLUSTER_STARTED_DATAFEED_ID)); + // wait for the closed job and datafeed to be migrated + waitForMigration(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_STOPPED_DATAFEED_ID); - checkTaskParamsAreUpdated(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); - checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); + // The open job and datafeed may or may not be migrated depending on how they were allocated. + // Migration will only occur once all nodes in the cluster are v6.6.0 or higher + // open jobs will only be migrated once they become unallocated. The open job + // will only meet these conditions if it is running on the last node to be + // upgraded + waitForPossibleMigration(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); // the job and datafeed left open during upgrade should // be assigned to a node @@ -234,6 +237,11 @@ private void upgradedClusterTests() throws Exception { Request closeJob = new Request("POST", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID + "/_close"); client().performRequest(closeJob); + // if the open job wasn't migrated previously it should be now after it has been closed + waitForMigration(OLD_CLUSTER_OPEN_JOB_ID, OLD_CLUSTER_STARTED_DATAFEED_ID); + checkJobsMarkedAsMigrated(Arrays.asList(OLD_CLUSTER_CLOSED_JOB_ID, OLD_CLUSTER_OPEN_JOB_ID)); + + // and the job left open can be deleted Request deleteDatafeed = new Request("DELETE", "_xpack/ml/datafeeds/" + OLD_CLUSTER_STARTED_DATAFEED_ID); client().performRequest(deleteDatafeed); Request deleteJob = new Request("DELETE", "_xpack/ml/anomaly_detectors/" + OLD_CLUSTER_OPEN_JOB_ID); @@ -323,7 +331,7 @@ private void checkJobsMarkedAsMigrated(List jobIds) throws IOException { } @SuppressWarnings("unchecked") - private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws Exception { + private void checkTaskParams(String jobId, String datafeedId, boolean expectedUpdated) throws Exception { Request getClusterState = new Request("GET", "/_cluster/state/metadata"); Response response = client().performRequest(getClusterState); Map responseMap = entityAsMap(response); @@ -336,12 +344,21 @@ private void checkTaskParamsAreUpdated(String jobId, String datafeedId) throws E assertNotNull(id); if (id.equals(MlTasks.jobTaskId(jobId))) { Object jobParam = XContentMapValues.extractValue("task.xpack/ml/job.params.job", task); - assertNotNull(jobParam); + if (expectedUpdated) { + assertNotNull(jobParam); + } else { + assertNull(jobParam); + } } else if (id.equals(MlTasks.datafeedTaskId(datafeedId))) { Object jobIdParam = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.job_id", task); - assertNotNull(jobIdParam); Object indices = XContentMapValues.extractValue("task.xpack/ml/datafeed.params.indices", task); - assertNotNull(indices); + if (expectedUpdated) { + assertNotNull(jobIdParam); + assertNotNull(indices); + } else { + assertNull(jobIdParam); + assertNull(indices); + } } } } @@ -373,7 +390,7 @@ private void assertConfigInClusterState() throws IOException { } @SuppressWarnings("unchecked") - private void waitForMigration(List expectedMigratedJobs, List expectedMigratedDatafeeds) throws Exception { + private void waitForMigration(String expectedMigratedJobId, String expectedMigratedDatafeedId) throws Exception { assertBusy(() -> { // wait for the eligible configs to be moved from the clusterstate Request getClusterState = new Request("GET", "/_cluster/state/metadata"); @@ -384,23 +401,55 @@ private void waitForMigration(List expectedMigratedJobs, List ex (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); if (jobs != null) { - for (String jobId : expectedMigratedJobs) { - assertJobMigrated(jobId, jobs); - } + assertJobMigrated(expectedMigratedJobId, jobs); } List> datafeeds = (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); if (datafeeds != null) { - for (String datafeedId : expectedMigratedDatafeeds) { - assertDatafeedMigrated(datafeedId, datafeeds); - } + assertDatafeedMigrated(expectedMigratedDatafeedId, datafeeds); } }, 30, TimeUnit.SECONDS); } + @SuppressWarnings("unchecked") + private void waitForPossibleMigration(String perhapsMigratedJobId, String perhapsMigratedDatafeedId) throws Exception { + assertBusy(() -> { + Request getClusterState = new Request("GET", "/_cluster/state/metadata"); + Response response = client().performRequest(getClusterState); + Map responseMap = entityAsMap(response); + + List> jobs = + (List>) XContentMapValues.extractValue("metadata.ml.jobs", responseMap); + + boolean jobMigrated = true; + if (jobs != null) { + jobMigrated = jobs.stream().map(map -> map.get("job_id")) + .noneMatch(id -> id.equals(perhapsMigratedJobId)); + } + + List> datafeeds = + (List>) XContentMapValues.extractValue("metadata.ml.datafeeds", responseMap); + + boolean datafeedMigrated = true; + if (datafeeds != null) { + datafeedMigrated = datafeeds.stream().map(map -> map.get("datafeed_id")) + .noneMatch(id -> id.equals(perhapsMigratedDatafeedId)); + } + + if (jobMigrated) { + // if the job is migrated the datafeed should also be + assertTrue(datafeedMigrated); + checkJobsMarkedAsMigrated(Collections.singletonList(perhapsMigratedJobId)); + } + + // if migrated the persistent task params should have been updated + checkTaskParams(perhapsMigratedJobId, perhapsMigratedDatafeedId, jobMigrated); + }); + } + @SuppressWarnings("unchecked") private void waitForJobToBeAssigned(String jobId) throws Exception { assertBusy(() -> {