diff --git a/x-pack/plugin/rollup/build.gradle b/x-pack/plugin/rollup/build.gradle index 18ef7abee5c64..ff9c30ed9a934 100644 --- a/x-pack/plugin/rollup/build.gradle +++ b/x-pack/plugin/rollup/build.gradle @@ -1,6 +1,3 @@ -import com.carrotsearch.gradle.junit4.RandomizedTestingTask -import org.elasticsearch.gradle.BuildPlugin - evaluationDependsOn(xpackModule('core')) apply plugin: 'elasticsearch.esplugin' @@ -23,33 +20,8 @@ dependencies { testCompile project(path: xpackModule('core'), configuration: 'testArtifacts') } -dependencyLicenses { - ignoreSha 'x-pack-core' -} - run { plugin xpackModule('core') } integTest.enabled = false - - -// Instead we create a separate task to run the -// tests based on ESIntegTestCase -task internalClusterTest(type: RandomizedTestingTask, - group: JavaBasePlugin.VERIFICATION_GROUP, - description: 'Multi-node tests', - dependsOn: test.dependsOn) { - configure(BuildPlugin.commonTestConfig(project)) - classpath = project.test.classpath - testClassesDirs = project.test.testClassesDirs - include '**/*IT.class' - systemProperty 'es.set.netty.runtime.available.processors', 'false' -} -check.dependsOn internalClusterTest -internalClusterTest.mustRunAfter test - -// also add an "alias" task to make typing on the command line easier task icTest { -task icTest { - dependsOn internalClusterTest -} diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java deleted file mode 100644 index 157cd6a5b9d1a..0000000000000 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/RollupIT.java +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.rollup; - -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.indices.get.GetIndexResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.SearchAction; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.support.WriteRequest; -import org.elasticsearch.analysis.common.CommonAnalysisPlugin; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.query.MatchAllQueryBuilder; -import org.elasticsearch.license.LicenseService; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.search.aggregations.Aggregation; -import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; -import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.Netty4Plugin; -import org.elasticsearch.xpack.core.XPackSettings; -import org.elasticsearch.xpack.core.rollup.action.DeleteRollupJobAction; -import org.elasticsearch.xpack.core.rollup.action.GetRollupJobsAction; -import org.elasticsearch.xpack.core.rollup.action.PutRollupJobAction; -import org.elasticsearch.xpack.core.rollup.action.RollupSearchAction; -import org.elasticsearch.xpack.core.rollup.action.StartRollupJobAction; -import org.elasticsearch.xpack.core.rollup.action.StopRollupJobAction; -import org.elasticsearch.xpack.core.rollup.job.DateHistoGroupConfig; -import org.elasticsearch.xpack.core.rollup.job.GroupConfig; -import org.elasticsearch.xpack.core.rollup.job.IndexerState; -import org.elasticsearch.xpack.core.rollup.job.MetricConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig; -import org.elasticsearch.xpack.core.rollup.job.RollupJobStatus; -import org.hamcrest.Matchers; -import org.joda.time.DateTime; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import static org.elasticsearch.search.aggregations.AggregationBuilders.dateHistogram; -import static org.hamcrest.core.IsEqual.equalTo; - -@ThreadLeakScope(ThreadLeakScope.Scope.NONE) -public class RollupIT extends ESIntegTestCase { - - private String taskId = "test-bigID"; - - @Override - protected boolean ignoreExternalCluster() { - return true; - } - - @Override - protected Collection> nodePlugins() { - return Arrays.asList(LocalStateRollup.class, CommonAnalysisPlugin.class, Netty4Plugin.class); - } - - @Override - protected Collection> transportClientPlugins() { - return nodePlugins(); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder builder = Settings.builder(); - builder.put(XPackSettings.ROLLUP_ENABLED.getKey(), true); - builder.put(XPackSettings.SECURITY_ENABLED.getKey(), false); - builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial"); - return builder.build(); - } - - @Override - protected Settings externalClusterClientSettings() { - return nodeSettings(0); - } - - @Override - protected Settings transportClientSettings() { - return Settings.builder().put(super.transportClientSettings()) - .put(XPackSettings.ROLLUP_ENABLED.getKey(), true) - .put(XPackSettings.SECURITY_ENABLED.getKey(), false) - .build(); - } - - @Before - public void createIndex() { - client().admin().indices().prepareCreate("test-1").addMapping("doc", "{\"doc\": {\"properties\": {" + - "\"date_histo\": {\"type\": \"date\"}, " + - "\"histo\": {\"type\": \"integer\"}, " + - "\"terms\": {\"type\": \"keyword\"}}}}", XContentType.JSON).get(); - client().admin().cluster().prepareHealth("test-1").setWaitForYellowStatus().get(); - - BulkRequestBuilder bulk = client().prepareBulk(); - Map source = new HashMap<>(3); - for (int i = 0; i < 20; i++) { - for (int j = 0; j < 20; j++) { - for (int k = 0; k < 20; k++) { - source.put("date_histo", new DateTime().minusDays(i).toString()); - source.put("histo", Integer.toString(j * 100)); - source.put("terms", Integer.toString(k * 100)); - source.put("foo", k); - bulk.add(new IndexRequest("test-1", "doc").source(source)); - source.clear(); - } - } - } - bulk.get(); - client().admin().indices().prepareRefresh("test-1").get(); - } - - public void testGetJob() throws ExecutionException, InterruptedException { - MetricConfig metricConfig = new MetricConfig.Builder() - .setField("foo") - .setMetrics(Arrays.asList("sum", "min", "max", "avg")) - .build(); - - DateHistoGroupConfig.Builder datehistoGroupConfig = new DateHistoGroupConfig.Builder(); - datehistoGroupConfig.setField("date_histo"); - datehistoGroupConfig.setInterval(new DateHistogramInterval("1d")); - - GroupConfig.Builder groupConfig = new GroupConfig.Builder(); - groupConfig.setDateHisto(datehistoGroupConfig.build()); - - - RollupJobConfig.Builder config = new RollupJobConfig.Builder(); - config.setIndexPattern("test-1"); - config.setRollupIndex("rolled"); - config.setId("testGet"); - config.setGroupConfig(groupConfig.build()); - config.setMetricsConfig(Collections.singletonList(metricConfig)); - config.setCron("* * * * * ? *"); - config.setPageSize(10); - - PutRollupJobAction.Request request = new PutRollupJobAction.Request(); - request.setConfig(config.build()); - client().execute(PutRollupJobAction.INSTANCE, request).get(); - - GetRollupJobsAction.Request getRequest = new GetRollupJobsAction.Request("testGet"); - GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, getRequest).get(); - assertThat(response.getJobs().size(), equalTo(1)); - assertThat(response.getJobs().get(0).getJob().getId(), equalTo("testGet")); - } - - public void testIndexPattern() throws Exception { - MetricConfig metricConfig = new MetricConfig.Builder() - .setField("foo") - .setMetrics(Arrays.asList("sum", "min", "max", "avg")) - .build(); - - DateHistoGroupConfig.Builder datehistoGroupConfig = new DateHistoGroupConfig.Builder(); - datehistoGroupConfig.setField("date_histo"); - datehistoGroupConfig.setInterval(new DateHistogramInterval("1d")); - - GroupConfig.Builder groupConfig = new GroupConfig.Builder(); - groupConfig.setDateHisto(datehistoGroupConfig.build()); - - - RollupJobConfig.Builder config = new RollupJobConfig.Builder(); - config.setIndexPattern("test-*"); - config.setId("testIndexPattern"); - config.setRollupIndex("rolled"); - config.setGroupConfig(groupConfig.build()); - config.setMetricsConfig(Collections.singletonList(metricConfig)); - config.setCron("* * * * * ? *"); - config.setPageSize(10); - - PutRollupJobAction.Request request = new PutRollupJobAction.Request(); - request.setConfig(config.build()); - client().execute(PutRollupJobAction.INSTANCE, request).get(); - - StartRollupJobAction.Request startRequest = new StartRollupJobAction.Request("testIndexPattern"); - StartRollupJobAction.Response startResponse = client().execute(StartRollupJobAction.INSTANCE, startRequest).get(); - Assert.assertThat(startResponse.isStarted(), equalTo(true)); - - // Make sure it started - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern"); - if (rollupJobStatus == null) { - fail("null"); - } - - IndexerState state = rollupJobStatus.getIndexerState(); - assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING)); - }, 60, TimeUnit.SECONDS); - - // And wait for it to finish - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus("testIndexPattern"); - if (rollupJobStatus == null) { - fail("null"); - } - - IndexerState state = rollupJobStatus.getIndexerState(); - assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null); - }, 60, TimeUnit.SECONDS); - - GetRollupJobsAction.Request getRequest = new GetRollupJobsAction.Request("testIndexPattern"); - GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, getRequest).get(); - Assert.assertThat(response.getJobs().size(), equalTo(1)); - Assert.assertThat(response.getJobs().get(0).getJob().getId(), equalTo("testIndexPattern")); - - GetIndexResponse getIndexResponse = client().admin().indices().prepareGetIndex().addIndices("rolled").get(); - Assert.assertThat(getIndexResponse.indices().length, Matchers.greaterThan(0)); - } - - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/30290") - public void testTwoJobsStartStopDeleteOne() throws Exception { - MetricConfig metricConfig = new MetricConfig.Builder() - .setField("foo") - .setMetrics(Arrays.asList("sum", "min", "max", "avg")) - .build(); - - DateHistoGroupConfig.Builder datehistoGroupConfig = new DateHistoGroupConfig.Builder(); - datehistoGroupConfig.setField("date_histo"); - datehistoGroupConfig.setInterval(new DateHistogramInterval("1d")); - - GroupConfig.Builder groupConfig = new GroupConfig.Builder(); - groupConfig.setDateHisto(datehistoGroupConfig.build()); - - - RollupJobConfig.Builder config = new RollupJobConfig.Builder(); - config.setIndexPattern("test-1"); - config.setRollupIndex("rolled"); - config.setId("job1"); - config.setGroupConfig(groupConfig.build()); - config.setMetricsConfig(Collections.singletonList(metricConfig)); - config.setCron("* * * * * ? *"); - config.setPageSize(10); - - PutRollupJobAction.Request request = new PutRollupJobAction.Request(); - request.setConfig(config.build()); - client().execute(PutRollupJobAction.INSTANCE, request).get(); - - RollupJobConfig.Builder config2 = new RollupJobConfig.Builder(); - config2.setIndexPattern("test-1"); - config2.setRollupIndex("rolled"); - config2.setId("job2"); - config2.setGroupConfig(groupConfig.build()); - config2.setMetricsConfig(Collections.singletonList(metricConfig)); - config2.setCron("* * * * * ? *"); - config2.setPageSize(10); - - PutRollupJobAction.Request request2 = new PutRollupJobAction.Request(); - request2.setConfig(config2.build()); - client().execute(PutRollupJobAction.INSTANCE, request2).get(); - - StartRollupJobAction.Request startRequest = new StartRollupJobAction.Request("job1"); - StartRollupJobAction.Response response = client().execute(StartRollupJobAction.INSTANCE, startRequest).get(); - Assert.assertThat(response.isStarted(), equalTo(true)); - - // Make sure it started - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus("job1"); - if (rollupJobStatus == null) { - fail("null"); - } - - IndexerState state = rollupJobStatus.getIndexerState(); - assertTrue(state.equals(IndexerState.STARTED) || state.equals(IndexerState.INDEXING)); - }, 60, TimeUnit.SECONDS); - - //but not the other task - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus("job2"); - - IndexerState state = rollupJobStatus.getIndexerState(); - assertTrue(state.equals(IndexerState.STOPPED)); - }, 60, TimeUnit.SECONDS); - - // Delete the task - DeleteRollupJobAction.Request deleteRequest = new DeleteRollupJobAction.Request("job1"); - DeleteRollupJobAction.Response deleteResponse = client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get(); - Assert.assertTrue(deleteResponse.isAcknowledged()); - - // Make sure the first job's task is gone - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus("job1"); - assertTrue(rollupJobStatus == null); - }, 60, TimeUnit.SECONDS); - - // And that we don't see it in the GetJobs API - GetRollupJobsAction.Request getRequest = new GetRollupJobsAction.Request("job1"); - GetRollupJobsAction.Response getResponse = client().execute(GetRollupJobsAction.INSTANCE, getRequest).get(); - Assert.assertThat(getResponse.getJobs().size(), equalTo(0)); - - // But make sure the other job is still there - getRequest = new GetRollupJobsAction.Request("job2"); - getResponse = client().execute(GetRollupJobsAction.INSTANCE, getRequest).get(); - Assert.assertThat(getResponse.getJobs().size(), equalTo(1)); - Assert.assertThat(getResponse.getJobs().get(0).getJob().getId(), equalTo("job2")); - - // and still STOPPED - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus("job2"); - - IndexerState state = rollupJobStatus.getIndexerState(); - assertTrue(state.equals(IndexerState.STOPPED)); - }, 60, TimeUnit.SECONDS); - } - - public void testBig() throws Exception { - - client().admin().indices().prepareCreate("test-big") - .addMapping("test-big", "{\"test-big\": {\"properties\": {\"timestamp\": {\"type\": \"date\"}, " + - "\"thefield\": {\"type\": \"integer\"}}}}", XContentType.JSON) - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); - client().admin().cluster().prepareHealth("test-big").setWaitForYellowStatus().get(); - - client().admin().indices().prepareCreate("test-verify") - .addMapping("test-big", "{\"test-big\": {\"properties\": {\"timestamp\": {\"type\": \"date\"}, " + - "\"thefield\": {\"type\": \"integer\"}}}}", XContentType.JSON) - .setSettings(Settings.builder() - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).get(); - client().admin().cluster().prepareHealth("test-verify").setWaitForYellowStatus().get(); - - BulkRequestBuilder bulk = client().prepareBulk(); - Map source = new HashMap<>(3); - - int numDays = 90; - int numDocsPerDay = 100; - - for (int i = 0; i < numDays; i++) { - DateTime ts = new DateTime().minusDays(i); - for (int j = 0; j < numDocsPerDay; j++) { - - int value = ESTestCase.randomIntBetween(0,100); - source.put("timestamp", ts.toString()); - source.put("thefield", value); - bulk.add(new IndexRequest("test-big", "test-big").source(source)); - bulk.add(new IndexRequest("test-verify", "test-big").source(source)); - source.clear(); - } - - bulk.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - bulk.get(); - bulk = client().prepareBulk(); - logger.info("Day: [" + i + "]: " + ts.toString() + " [" + ts.getMillis() + "]" ); - } - - - client().admin().indices().prepareRefresh("test-big").get(); - client().admin().indices().prepareRefresh("test-verify").get(); - - MetricConfig metricConfig = new MetricConfig.Builder() - .setField("thefield") - .setMetrics(Arrays.asList("sum", "min", "max", "avg")) - .build(); - - DateHistoGroupConfig.Builder datehistoGroupConfig = new DateHistoGroupConfig.Builder(); - datehistoGroupConfig.setField("timestamp"); - datehistoGroupConfig.setInterval(new DateHistogramInterval("1d")); - - GroupConfig.Builder groupConfig = new GroupConfig.Builder(); - groupConfig.setDateHisto(datehistoGroupConfig.build()); - - RollupJobConfig.Builder config = new RollupJobConfig.Builder(); - config.setIndexPattern("test-big"); - config.setRollupIndex("rolled"); - config.setId(taskId); - config.setGroupConfig(groupConfig.build()); - config.setMetricsConfig(Collections.singletonList(metricConfig)); - config.setCron("* * * * * ? *"); - config.setPageSize(1000); - - PutRollupJobAction.Request request = new PutRollupJobAction.Request(); - request.setConfig(config.build()); - client().execute(PutRollupJobAction.INSTANCE, request).get(); - - StartRollupJobAction.Request startRequest = new StartRollupJobAction.Request(taskId); - StartRollupJobAction.Response response = client().execute(StartRollupJobAction.INSTANCE, startRequest).get(); - Assert.assertThat(response.isStarted(), equalTo(true)); - - ESTestCase.assertBusy(() -> { - RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId); - if (rollupJobStatus == null) { - fail("null"); - } - - IndexerState state = rollupJobStatus.getIndexerState(); - logger.error("state: [" + state + "]"); - assertTrue(state.equals(IndexerState.STARTED) && rollupJobStatus.getPosition() != null); - }, 60, TimeUnit.SECONDS); - - RollupJobStatus rollupJobStatus = getRollupJobStatus(taskId); - if (rollupJobStatus == null) { - Assert.fail("rollup job status should not be null"); - } - - client().admin().indices().prepareRefresh("rolled").get(); - - SearchResponse count = client().prepareSearch("rolled").setSize(10).get(); - // total document is numDays minus 1 because we don't build rollup for - // buckets that are not full (bucket for the current day). - Assert.assertThat(count.getHits().totalHits, equalTo(Long.valueOf(numDays-1))); - - if (ESTestCase.randomBoolean()) { - client().admin().indices().prepareDelete("test-big").get(); - client().admin().indices().prepareRefresh().get(); - } - - // Execute the rollup search - SearchRequest rollupRequest = new SearchRequest("rolled") - .source(new SearchSourceBuilder() - .aggregation(dateHistogram("timestamp") - .interval(1000*86400) - .field("timestamp")) - .size(0)); - SearchResponse searchResponse = client().execute(RollupSearchAction.INSTANCE, rollupRequest).get(); - Assert.assertNotNull(searchResponse); - - // And a regular search against the verification index - SearchRequest verifyRequest = new SearchRequest("test-verify") - .source(new SearchSourceBuilder() - .aggregation(dateHistogram("timestamp") - .interval(1000*86400) - .field("timestamp")) - .size(0)); - SearchResponse verifyResponse = client().execute(SearchAction.INSTANCE, verifyRequest).get(); - - Map rollupAggs = searchResponse.getAggregations().asMap(); - - for (Aggregation agg : verifyResponse.getAggregations().asList()) { - Aggregation rollupAgg = rollupAggs.get(agg.getName()); - - Assert.assertNotNull(rollupAgg); - Assert.assertThat(rollupAgg.getType(), equalTo(agg.getType())); - verifyAgg((InternalDateHistogram)agg, (InternalDateHistogram)rollupAgg); - } - - // And a quick sanity check for doc type - SearchRequest rollupRawRequest = new SearchRequest("rolled") - .source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()) - .size(1)); - SearchResponse searchRawResponse = client().execute(SearchAction.INSTANCE, rollupRawRequest).get(); - Assert.assertNotNull(searchRawResponse); - assertThat(searchRawResponse.getHits().getAt(0).getType(), equalTo("_doc")); - } - - private void verifyAgg(InternalDateHistogram verify, InternalDateHistogram rollup) { - for (int i = 0; i < rollup.getBuckets().size(); i++) { - InternalDateHistogram.Bucket verifyBucket = verify.getBuckets().get(i); - InternalDateHistogram.Bucket rollupBucket = rollup.getBuckets().get(i); - Assert.assertThat(rollupBucket.getDocCount(), equalTo(verifyBucket.getDocCount())); - Assert.assertThat(((DateTime)rollupBucket.getKey()).getMillis(), equalTo(((DateTime)verifyBucket.getKey()).getMillis())); - Assert.assertTrue(rollupBucket.getAggregations().equals(verifyBucket.getAggregations())); - } - } - - private RollupJobStatus getRollupJobStatus(final String taskId) { - final GetRollupJobsAction.Request request = new GetRollupJobsAction.Request(taskId); - final GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, request).actionGet(); - - if (response.getJobs() != null && response.getJobs().isEmpty() == false) { - assertThat("Expect 1 rollup job with id " + taskId, response.getJobs().size(), equalTo(1)); - return response.getJobs().iterator().next().getStatus(); - } - return null; - } - - @After - public void cleanup() throws ExecutionException, InterruptedException { - GetRollupJobsAction.Request getRequest = new GetRollupJobsAction.Request("_all"); - GetRollupJobsAction.Response response = client().execute(GetRollupJobsAction.INSTANCE, getRequest).get(); - - for (GetRollupJobsAction.JobWrapper job : response.getJobs()) { - StopRollupJobAction.Request stopRequest = new StopRollupJobAction.Request(job.getJob().getId()); - try { - client().execute(StopRollupJobAction.INSTANCE, stopRequest).get(); - } catch (ElasticsearchException e) { - // - } - - DeleteRollupJobAction.Request deleteRequest = new DeleteRollupJobAction.Request(job.getJob().getId()); - client().execute(DeleteRollupJobAction.INSTANCE, deleteRequest).get(); - } - } -} diff --git a/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java new file mode 100644 index 0000000000000..b0142ae141853 --- /dev/null +++ b/x-pack/qa/multi-node/src/test/java/org/elasticsearch/multi_node/RollupIT.java @@ -0,0 +1,326 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.multi_node; + +import org.apache.http.HttpStatus; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; +import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.xpack.core.rollup.job.RollupJob; +import org.elasticsearch.xpack.core.watcher.support.xcontent.ObjectPath; +import org.junit.After; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isOneOf; + +public class RollupIT extends ESRestTestCase { + + @Override + protected Settings restClientSettings() { + return getClientSettings("super-user", "x-pack-super-password"); + } + + @Override + protected Settings restAdminSettings() { + return getClientSettings("super-user", "x-pack-super-password"); + } + + private Settings getClientSettings(final String username, final String password) { + final String token = basicAuthHeaderValue(username, new SecureString(password.toCharArray())); + return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); + } + + static Map toMap(Response response) throws IOException { + return toMap(EntityUtils.toString(response.getEntity())); + } + + static Map toMap(String response) throws IOException { + return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false); + } + + @After + public void clearRollupMetadata() throws Exception { + deleteAllJobs(); + waitForPendingTasks(); + // indices will be deleted by the ESRestTestCase class + } + + public void testBigRollup() throws Exception { + final int numDocs = 200; + + // index documents for the rollup job + final StringBuilder bulk = new StringBuilder(); + for (int i = 0; i < numDocs; i++) { + bulk.append("{\"index\":{\"_index\":\"rollup-docs\",\"_type\":\"_doc\"}}\n"); + ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochSecond(1531221196 + (60*i)), ZoneId.of("UTC")); + String date = zdt.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); + bulk.append("{\"timestamp\":\"").append(date).append("\",\"value\":").append(i).append("}\n"); + } + bulk.append("\r\n"); + + final Request bulkRequest = new Request("POST", "/_bulk"); + bulkRequest.addParameter("refresh", "true"); + bulkRequest.setJsonEntity(bulk.toString()); + client().performRequest(bulkRequest); + // create the rollup job + final Request createRollupJobRequest = new Request("PUT", "/_xpack/rollup/job/rollup-job-test"); + createRollupJobRequest.setJsonEntity("{" + + "\"index_pattern\":\"rollup-*\"," + + "\"rollup_index\":\"results-rollup\"," + + "\"cron\":\"*/1 * * * * ?\"," // fast cron and big page size so test runs quickly + + "\"page_size\":20," + + "\"groups\":{" + + " \"date_histogram\":{" + + " \"field\":\"timestamp\"," + + " \"interval\":\"5m\"" + + " }" + + "}," + + "\"metrics\":[" + + " {\"field\":\"value\",\"metrics\":[\"min\",\"max\",\"sum\"]}" + + "]" + + "}"); + + Map createRollupJobResponse = toMap(client().performRequest(createRollupJobRequest)); + assertThat(createRollupJobResponse.get("acknowledged"), equalTo(Boolean.TRUE)); + + // start the rollup job + final Request startRollupJobRequest = new Request("POST", "_xpack/rollup/job/rollup-job-test/_start"); + Map startRollupJobResponse = toMap(client().performRequest(startRollupJobRequest)); + assertThat(startRollupJobResponse.get("started"), equalTo(Boolean.TRUE)); + + assertRollUpJob("rollup-job-test"); + + // Wait for the job to finish, by watching how many rollup docs we've indexed + assertBusy(() -> { + final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/rollup-job-test"); + Response getRollupJobResponse = client().performRequest(getRollupJobRequest); + assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + + Map job = getJob(getRollupJobResponse, "rollup-job-test"); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), equalTo("started")); + assertThat(ObjectPath.eval("stats.rollups_indexed", job), equalTo(41)); + } + }, 30L, TimeUnit.SECONDS); + + // Refresh the rollup index to make sure all newly indexed docs are searchable + final Request refreshRollupIndex = new Request("POST", "results-rollup/_refresh"); + toMap(client().performRequest(refreshRollupIndex)); + + String jsonRequestBody = "{\n" + + " \"size\": 0,\n" + + " \"query\": {\n" + + " \"match_all\": {}\n" + + " },\n" + + " \"aggs\": {\n" + + " \"date_histo\": {\n" + + " \"date_histogram\": {\n" + + " \"field\": \"timestamp\",\n" + + " \"interval\": \"1h\"\n" + + " },\n" + + " \"aggs\": {\n" + + " \"the_max\": {\n" + + " \"max\": {\n" + + " \"field\": \"value\"\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + " }\n" + + "}"; + + Request request = new Request("GET", "rollup-docs/_search"); + request.setJsonEntity(jsonRequestBody); + Response liveResponse = client().performRequest(request); + Map liveBody = toMap(liveResponse); + + request = new Request("GET", "results-rollup/_rollup_search"); + request.setJsonEntity(jsonRequestBody); + Response rollupResponse = client().performRequest(request); + Map rollupBody = toMap(rollupResponse); + + // Do the live agg results match the rollup agg results? + assertThat(ObjectPath.eval("aggregations.date_histo.buckets", liveBody), + equalTo(ObjectPath.eval("aggregations.date_histo.buckets", rollupBody))); + + request = new Request("GET", "rollup-docs/_rollup_search"); + request.setJsonEntity(jsonRequestBody); + Response liveRollupResponse = client().performRequest(request); + Map liveRollupBody = toMap(liveRollupResponse); + + // Does searching the live index via rollup_search work match the live search? + assertThat(ObjectPath.eval("aggregations.date_histo.buckets", liveBody), + equalTo(ObjectPath.eval("aggregations.date_histo.buckets", liveRollupBody))); + + } + + @SuppressWarnings("unchecked") + private void assertRollUpJob(final String rollupJob) throws Exception { + String[] states = new String[]{"indexing", "started"}; + waitForRollUpJob(rollupJob, states); + + // check that the rollup job is started using the RollUp API + final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); + Map getRollupJobResponse = toMap(client().performRequest(getRollupJobRequest)); + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), isOneOf(states)); + } + + // check that the rollup job is started using the Tasks API + final Request taskRequest = new Request("GET", "_tasks"); + taskRequest.addParameter("detailed", "true"); + taskRequest.addParameter("actions", "xpack/rollup/*"); + Map taskResponse = toMap(client().performRequest(taskRequest)); + Map taskResponseNodes = (Map) taskResponse.get("nodes"); + Map taskResponseNode = (Map) taskResponseNodes.values().iterator().next(); + Map taskResponseTasks = (Map) taskResponseNode.get("tasks"); + Map taskResponseStatus = (Map) taskResponseTasks.values().iterator().next(); + assertThat(ObjectPath.eval("status.job_state", taskResponseStatus), isOneOf(states)); + + // check that the rollup job is started using the Cluster State API + final Request clusterStateRequest = new Request("GET", "_cluster/state/metadata"); + Map clusterStateResponse = toMap(client().performRequest(clusterStateRequest)); + List> rollupJobTasks = ObjectPath.eval("metadata.persistent_tasks.tasks", clusterStateResponse); + + boolean hasRollupTask = false; + for (Map task : rollupJobTasks) { + if (ObjectPath.eval("id", task).equals(rollupJob)) { + hasRollupTask = true; + + final String jobStateField = "task.xpack/rollup/job.state.job_state"; + assertThat("Expected field [" + jobStateField + "] to be started or indexing in " + task.get("id"), + ObjectPath.eval(jobStateField, task), isOneOf(states)); + break; + } + } + if (hasRollupTask == false) { + fail("Expected persistent task for [" + rollupJob + "] but none found."); + } + + } + + private void waitForRollUpJob(final String rollupJob,String[] expectedStates) throws Exception { + assertBusy(() -> { + final Request getRollupJobRequest = new Request("GET", "_xpack/rollup/job/" + rollupJob); + Response getRollupJobResponse = client().performRequest(getRollupJobRequest); + assertThat(getRollupJobResponse.getStatusLine().getStatusCode(), equalTo(RestStatus.OK.getStatus())); + + Map job = getJob(getRollupJobResponse, rollupJob); + if (job != null) { + assertThat(ObjectPath.eval("status.job_state", job), isOneOf(expectedStates)); + } + }, 30L, TimeUnit.SECONDS); + } + + private Map getJob(Response response, String targetJobId) throws IOException { + return getJob(ESRestTestCase.entityAsMap(response), targetJobId); + } + + @SuppressWarnings("unchecked") + private Map getJob(Map jobsMap, String targetJobId) throws IOException { + + List> jobs = + (List>) XContentMapValues.extractValue("jobs", jobsMap); + + if (jobs == null) { + return null; + } + + for (Map job : jobs) { + String jobId = (String) ((Map) job.get("config")).get("id"); + if (jobId.equals(targetJobId)) { + return job; + } + } + return null; + } + + private void waitForPendingTasks() throws Exception { + ESTestCase.assertBusy(() -> { + try { + Request request = new Request("GET", "/_cat/tasks"); + request.addParameter("detailed", "true"); + Response response = adminClient().performRequest(request); + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + try (BufferedReader responseReader = new BufferedReader( + new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { + int activeTasks = 0; + String line; + StringBuilder tasksListString = new StringBuilder(); + while ((line = responseReader.readLine()) != null) { + + // We only care about Rollup jobs, otherwise this fails too easily due to unrelated tasks + if (line.startsWith(RollupJob.NAME) == true) { + activeTasks++; + tasksListString.append(line); + tasksListString.append('\n'); + } + } + assertEquals(activeTasks + " active tasks found:\n" + tasksListString, 0, activeTasks); + } + } + } catch (IOException e) { + throw new AssertionError("Error getting active tasks list", e); + } + }); + } + + @SuppressWarnings("unchecked") + private void deleteAllJobs() throws Exception { + Request request = new Request("GET", "/_xpack/rollup/job/_all"); + Response response = adminClient().performRequest(request); + Map jobs = ESRestTestCase.entityAsMap(response); + @SuppressWarnings("unchecked") + List> jobConfigs = + (List>) XContentMapValues.extractValue("jobs", jobs); + + if (jobConfigs == null) { + return; + } + + for (Map jobConfig : jobConfigs) { + logger.debug(jobConfig); + String jobId = (String) ((Map) jobConfig.get("config")).get("id"); + logger.debug("Deleting job " + jobId); + try { + request = new Request("DELETE", "/_xpack/rollup/job/" + jobId); + adminClient().performRequest(request); + } catch (Exception e) { + // ok + } + } + } + + private static String responseEntityToString(Response response) throws Exception { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) { + return reader.lines().collect(Collectors.joining("\n")); + } + } +}