diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy index 0a53787c10597..e64f30d48bd3f 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterConfiguration.groovy @@ -118,11 +118,16 @@ class ClusterConfiguration { if (seedNode == node) { return null } - ant.waitfor(maxwait: '40', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond') { + ant.waitfor(maxwait: '40', maxwaitunit: 'second', checkevery: '500', checkeveryunit: 'millisecond', + timeoutproperty: "failed.${seedNode.transportPortsFile.path}") { resourceexists { file(file: seedNode.transportPortsFile.toString()) } } + if (ant.properties.containsKey("failed.${seedNode.transportPortsFile.path}".toString())) { + throw new GradleException("Failed to locate seed node transport file [${seedNode.transportPortsFile}]: " + + "timed out waiting for it to be created after ${waitSeconds} seconds") + } return seedNode.transportUri() } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/vagrant/VagrantTestPlugin.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/vagrant/VagrantTestPlugin.groovy index 603e217ecda86..210fb939c7113 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/vagrant/VagrantTestPlugin.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/vagrant/VagrantTestPlugin.groovy @@ -547,7 +547,11 @@ class VagrantTestPlugin implements Plugin { project.gradle.removeListener(batsPackagingReproListener) } if (project.extensions.esvagrant.boxes.contains(box)) { - packagingTest.dependsOn(batsPackagingTest) + // these tests are temporarily disabled for suse boxes while we debug an issue + // https://github.com/elastic/elasticsearch/issues/30295 + if (box.equals("opensuse-42") == false && box.equals("sles-12") == false) { + packagingTest.dependsOn(batsPackagingTest) + } } } @@ -586,7 +590,11 @@ class VagrantTestPlugin implements Plugin { project.gradle.removeListener(javaPackagingReproListener) } if (project.extensions.esvagrant.boxes.contains(box)) { - packagingTest.dependsOn(javaPackagingTest) + // these tests are temporarily disabled for suse boxes while we debug an issue + // https://github.com/elastic/elasticsearch/issues/30295 + if (box.equals("opensuse-42") == false && box.equals("sles-12") == false) { + packagingTest.dependsOn(javaPackagingTest) + } } /* diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/SyncTestClustersConfiguration.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/SyncTestClustersConfiguration.java deleted file mode 100644 index d1a86a38c66ff..0000000000000 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/SyncTestClustersConfiguration.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.gradle.testclusters; - -import org.gradle.api.DefaultTask; -import org.gradle.api.Project; -import org.gradle.api.file.FileCollection; -import org.gradle.api.tasks.InputFiles; -import org.gradle.api.tasks.OutputDirectory; -import org.gradle.api.tasks.TaskAction; - -import java.io.File; -import java.util.Set; -import java.util.stream.Collectors; - -public class SyncTestClustersConfiguration extends DefaultTask { - - @InputFiles - public FileCollection getDependencies() { - Set nonZip = getProject().getConfigurations() - .getByName(TestClustersPlugin.HELPER_CONFIGURATION_NAME) - .getFiles() - .stream() - .filter(file -> file.getName().endsWith(".zip") == false) - .collect(Collectors.toSet()); - if(nonZip.isEmpty() == false) { - throw new IllegalStateException("Expected only zip files in configuration : " + - TestClustersPlugin.HELPER_CONFIGURATION_NAME + " but found " + - nonZip - ); - } - return getProject().files( - getProject().getConfigurations() - .getByName(TestClustersPlugin.HELPER_CONFIGURATION_NAME) - .getFiles() - ); - } - - @OutputDirectory - public File getOutputDir() { - return getTestClustersConfigurationExtractDir(getProject()); - } - - @TaskAction - public void doExtract() { - File outputDir = getOutputDir(); - getProject().delete(outputDir); - outputDir.mkdirs(); - getDependencies().forEach(dep -> - getProject().copy(spec -> { - spec.from(getProject().zipTree(dep)); - spec.into(new File(outputDir, "zip")); - }) - ); - } - - static File getTestClustersConfigurationExtractDir(Project project) { - return new File(TestClustersPlugin.getTestClustersBuildDir(project), "extract"); - } - -} diff --git a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java index 3abc9a6a6177e..56ff501a3885c 100644 --- a/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java +++ b/buildSrc/src/main/java/org/elasticsearch/gradle/testclusters/TestClustersPlugin.java @@ -25,11 +25,14 @@ import org.gradle.api.Project; import org.gradle.api.Task; import org.gradle.api.artifacts.Configuration; +import org.gradle.api.artifacts.component.ComponentArtifactIdentifier; import org.gradle.api.execution.TaskActionListener; import org.gradle.api.execution.TaskExecutionListener; +import org.gradle.api.file.FileTree; import org.gradle.api.logging.Logger; import org.gradle.api.logging.Logging; import org.gradle.api.plugins.ExtraPropertiesExtension; +import org.gradle.api.tasks.Sync; import org.gradle.api.tasks.TaskState; import java.io.File; @@ -39,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -87,6 +91,20 @@ public void apply(Project project) { "Internal helper configuration used by cluster configuration to download " + "ES distributions and plugins." 
        );
+        helperConfiguration.getIncoming().afterResolve(resolvableDependencies -> {
+            Set<ComponentArtifactIdentifier> nonZipComponents = resolvableDependencies.getArtifacts()
+                .getArtifacts()
+                .stream()
+                .filter(artifact -> artifact.getFile().getName().endsWith(".zip") == false)
+                .map(artifact -> artifact.getId())
+                .collect(Collectors.toSet());
+
+            if (nonZipComponents.isEmpty() == false) {
+                throw new IllegalStateException("Dependencies with non-zip artifacts found in configuration '" +
+                    TestClustersPlugin.HELPER_CONFIGURATION_NAME + "': " + nonZipComponents
+                );
+            }
+        });
 
         // When running in the Daemon it's possible for this to hold references to past
         usedClusters.clear();
@@ -98,7 +116,15 @@ public void apply(Project project) {
             // the clusters will look for artifacts there based on the naming conventions.
             // Tasks that use a cluster will add this as a dependency automatically so it's guaranteed to run early in
             // the build.
-            rootProject.getTasks().create(SYNC_ARTIFACTS_TASK_NAME, SyncTestClustersConfiguration.class);
+            rootProject.getTasks().create(SYNC_ARTIFACTS_TASK_NAME, Sync.class, sync -> {
+                sync.from((Callable<List<FileTree>>) () ->
+                    helperConfiguration.getFiles()
+                        .stream()
+                        .map(project::zipTree)
+                        .collect(Collectors.toList())
+                );
+                sync.into(new File(getTestClustersConfigurationExtractDir(project), "zip"));
+            });
 
         // When we know what tasks will run, we claim the clusters of those task to differentiate between clusters
         // that are defined in the build script and the ones that will actually be used in this invocation of gradle
@@ -129,7 +155,7 @@ private NamedDomainObjectContainer createTestClustersContaine
                 project.getPath(),
                 name,
                 GradleServicesAdapter.getInstance(project),
-                SyncTestClustersConfiguration.getTestClustersConfigurationExtractDir(project),
+                getTestClustersConfigurationExtractDir(project),
                 new File(project.getBuildDir(), "testclusters")
             )
         );
@@ -249,8 +275,8 @@ public void beforeExecute(Task task) {}
         );
     }
 
-    static File getTestClustersBuildDir(Project project) {
-        return new File(project.getRootProject().getBuildDir(), "testclusters");
+    static File getTestClustersConfigurationExtractDir(Project project) {
+        return new File(project.getRootProject().getBuildDir(), "testclusters/extract");
     }
 
     /**
diff --git a/docs/reference/analysis/tokenfilters/edgengram-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/edgengram-tokenfilter.asciidoc
index be37d24f7dd7c..e460725523cf6 100644
--- a/docs/reference/analysis/tokenfilters/edgengram-tokenfilter.asciidoc
+++ b/docs/reference/analysis/tokenfilters/edgengram-tokenfilter.asciidoc
@@ -1,9 +1,9 @@
 [[analysis-edgengram-tokenfilter]]
 === Edge NGram Token Filter
 
-A token filter of type `edgeNGram`.
+A token filter of type `edge_ngram`.
 
-The following are settings that can be set for a `edgeNGram` token
+The following are settings that can be set for an `edge_ngram` token
 filter type:
 
 [cols="<,<",options="header",]
diff --git a/docs/reference/analysis/tokenfilters/ngram-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/ngram-tokenfilter.asciidoc
index acc178a2741fa..53bda23d12bf9 100644
--- a/docs/reference/analysis/tokenfilters/ngram-tokenfilter.asciidoc
+++ b/docs/reference/analysis/tokenfilters/ngram-tokenfilter.asciidoc
@@ -1,9 +1,9 @@
 [[analysis-ngram-tokenfilter]]
 === NGram Token Filter
 
-A token filter of type `nGram`.
+A token filter of type `ngram`.
-The following are settings that can be set for a `nGram` token filter
+The following are settings that can be set for an `ngram` token filter
 type:
 
 [cols="<,<",options="header",]
diff --git a/docs/reference/images/sql/rest/console-triple-quotes.png b/docs/reference/images/sql/rest/console-triple-quotes.png
new file mode 100644
index 0000000000000..4a13acb986114
Binary files /dev/null and b/docs/reference/images/sql/rest/console-triple-quotes.png differ
diff --git a/docs/reference/index-modules/allocation/filtering.asciidoc b/docs/reference/index-modules/allocation/filtering.asciidoc
index 9e7a67946a997..0ae331d0e446d 100644
--- a/docs/reference/index-modules/allocation/filtering.asciidoc
+++ b/docs/reference/index-modules/allocation/filtering.asciidoc
@@ -49,6 +49,7 @@ settings support three types of filters: `include`, `exclude`, and `require`.
 For example, to tell {es} to allocate shards from the `test` index to either
 `big` or `medium` nodes, use `index.routing.allocation.include`:
 +
+--
 [source,js]
------------------------
 PUT test/_settings
@@ -58,11 +59,11 @@ PUT test/_settings
------------------------
 // CONSOLE
 // TEST[s/^/PUT test\n/]
-+
+
 If you specify multiple filters, all conditions must be satisfied for shards
 to be relocated. For example, to move the `test` index to `big` nodes in
 `rack1`, you could specify:
-+
+
 [source,js]
------------------------
 PUT test/_settings
@@ -73,6 +74,7 @@ PUT test/_settings
------------------------
 // CONSOLE
 // TEST[s/^/PUT test\n/]
+--
 
 [float]
 [[index-allocation-settings]]
diff --git a/docs/reference/migration/migrate_8_0.asciidoc b/docs/reference/migration/migrate_8_0.asciidoc
index 4477090dc16b6..ea1166e36d3b9 100644
--- a/docs/reference/migration/migrate_8_0.asciidoc
+++ b/docs/reference/migration/migrate_8_0.asciidoc
@@ -9,4 +9,8 @@ your application to {es} 8.0.
 
 See also <> and <>.
 
-coming[8.0.0]
\ No newline at end of file
+coming[8.0.0]
+
+* <<breaking_80_mappings_changes>>
+
+include::migrate_8_0/mappings.asciidoc[]
\ No newline at end of file
diff --git a/docs/reference/migration/migrate_8_0/mappings.asciidoc b/docs/reference/migration/migrate_8_0/mappings.asciidoc
new file mode 100644
index 0000000000000..ea36309447e4c
--- /dev/null
+++ b/docs/reference/migration/migrate_8_0/mappings.asciidoc
@@ -0,0 +1,10 @@
+[float]
+[[breaking_80_mappings_changes]]
+=== Mapping changes
+
+[float]
+==== The `nGram` and `edgeNGram` token filter names have been removed
+
+The `nGram` and `edgeNGram` token filter names, which have been deprecated
+since version 6.4, have been removed. Use the filter names `ngram` and
+`edge_ngram` instead.
\ No newline at end of file
diff --git a/docs/reference/search/suggesters/completion-suggest.asciidoc b/docs/reference/search/suggesters/completion-suggest.asciidoc
index c0b527c06e550..b27e6f0ef0b54 100644
--- a/docs/reference/search/suggesters/completion-suggest.asciidoc
+++ b/docs/reference/search/suggesters/completion-suggest.asciidoc
@@ -43,6 +43,7 @@ PUT music
 
 Mapping supports the following parameters:
 
+[horizontal]
 `analyzer`::
     The index analyzer to use, defaults to `simple`.
     In case you are wondering why we did not opt for the `standard`
@@ -70,7 +71,7 @@ Mapping supports the following parameters:
     Limits the length of a single input, defaults to `50` UTF-16 code points.
     This limit is only used at index time to reduce the total number of
     characters per input string in order to prevent massive inputs from
-    bloating the underlying datastructure. 
Most usecases won't be influenced
+    bloating the underlying datastructure. Most use cases won't be influenced
     by the default value since prefix completions seldom grow beyond prefixes
     longer than a handful of characters.
 
@@ -97,6 +98,7 @@ PUT music/_doc/1?refresh
 
 The following parameters are supported:
 
+[horizontal]
 `input`::
     The input to store, this can be an array of strings or just
     a string. This field is mandatory.
@@ -285,6 +287,7 @@ Which should look like:
 
 The basic completion suggester query supports the following parameters:
 
+[horizontal]
 `field`:: The name of the field on which to run the query (required).
 `size`:: The number of suggestions to return (defaults to `5`).
 `skip_duplicates`:: Whether duplicate suggestions should be filtered out (defaults to `false`).
@@ -326,13 +329,13 @@ POST music/_search?pretty
--------------------------------------------------
 // CONSOLE
 
-WARNING: when set to true this option can slow down search because more suggestions
+WARNING: When set to true, this option can slow down search because more suggestions
 need to be visited to find the top N.
 
 [[fuzzy]]
 ==== Fuzzy queries
 
-The completion suggester also supports fuzzy queries - this means,
+The completion suggester also supports fuzzy queries -- this means
 you can have a typo in your search and still get results back.
 
 [source,js]
diff --git a/docs/reference/sql/endpoints/rest.asciidoc b/docs/reference/sql/endpoints/rest.asciidoc
index 1cc42c3071b44..e21071a49dbbc 100644
--- a/docs/reference/sql/endpoints/rest.asciidoc
+++ b/docs/reference/sql/endpoints/rest.asciidoc
@@ -4,8 +4,8 @@
 == SQL REST API
 
 The SQL REST API accepts SQL in a JSON document, executes it,
-and returns the results. For example:
-
+and returns the results.
+For example:
 
 [source,js]
--------------------------------------------------
@@ -32,19 +32,68 @@ James S.A. Corey |Leviathan Wakes |561 |2011-06-02T00:00:00.000Z
 // TESTRESPONSE[s/\|/\\|/ s/\+/\\+/]
 // TESTRESPONSE[_cat]
 
-While the `text/plain` format is nice for humans, computers prefer something
-more structured. You can replace the value of `format` with:
-- `json` aka `application/json`
-- `yaml` aka `application/yaml`
-- `smile` aka `application/smile`
-- `cbor` aka `application/cbor`
-- `txt` aka `text/plain`
-- `csv` aka `text/csv`
-- `tsv` aka `text/tab-separated-values`
+[[sql-kibana-console]]
+.Using Kibana Console
+If you are using {kibana-ref}/console-kibana.html[Kibana Console]
+(which is highly recommended), take advantage of the
+triple quotes `"""` when creating the query. This not only automatically escapes double
+quotes (`"`) inside the query string but also supports multi-line queries, as shown below:
+image:images/sql/rest/console-triple-quotes.png[]
+
+[[sql-rest-format]]
+[float]
+=== Response Data Formats
+
+While the textual format is nice for humans, computers prefer something
+more structured.
+
+{es-sql} can return the data in the following formats, which can be set
+either through the `format` property in the URL or by setting the `Accept` HTTP header:
+
+NOTE: The URL parameter takes precedence over the `Accept` HTTP header.
+If neither is specified then the response is returned in the same format as the request. 
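
The format negotiation described in the note above is easy to drive from any HTTP client. As an illustrative sketch only (the `_sql` endpoint, a node on `localhost:9200`, and the docs' `library` test index are assumptions, not part of this change), requesting CSV with Java 11's `java.net.http.HttpClient` could look like this:

[source,java]
--------------------------------------------------
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SqlCsvQuery {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                // the format could equally be requested as _sql?format=csv,
                // in which case the URL parameter would take precedence
                .uri(URI.create("http://localhost:9200/_sql"))
                .header("Content-Type", "application/json")
                .header("Accept", "text/csv") // ask for comma-separated values
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"query\": \"SELECT * FROM library ORDER BY page_count DESC\"}"))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // CSV output, one line per row
    }
}
--------------------------------------------------
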
+
+[cols="^m,^4m,^8"]
+
+|===
+s|format
+s|`Accept` HTTP header
+s|Description
+
+3+h| Human Readable
+
+|csv
+|text/csv
+|https://en.wikipedia.org/wiki/Comma-separated_values[Comma-separated values]
+
+|json
+|application/json
+|https://www.json.org/[JSON] (JavaScript Object Notation) human-readable format
+
+|tsv
+|text/tab-separated-values
+|https://en.wikipedia.org/wiki/Tab-separated_values[Tab-separated values]
+
+|txt
+|text/plain
+|CLI-like representation
+
+|yaml
+|application/yaml
+|https://en.wikipedia.org/wiki/YAML[YAML] (YAML Ain't Markup Language) human-readable format
+
+3+h| Binary Formats
+
+|cbor
+|application/cbor
+|http://cbor.io/[Concise Binary Object Representation]
+
+|smile
+|application/smile
+|https://en.wikipedia.org/wiki/Smile_(data_interchange_format)[Smile] binary data format similar to CBOR
+
+|===
 
-Alternatively you can set the `Accept` HTTP header to the appropriate media
-format. The GET parameter takes precedence over the header. If neither is
-specified then the response is returned in the same format as the request.
 
 [source,js]
--------------------------------------------------
@@ -80,7 +129,11 @@ Which returns:
 
--------------------------------------------------
 // TESTRESPONSE[s/sDXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAAEWWWdrRlVfSS1TbDYtcW9lc1FJNmlYdw==:BAFmBmF1dGhvcgFmBG5hbWUBZgpwYWdlX2NvdW50AWYMcmVsZWFzZV9kYXRl\+v\/\/\/w8=/$body.cursor/]
 
-You can continue to the next page by sending back the `cursor` field. In
+[[sql-pagination]]
+[float]
+=== Paginating through a large response
+
+Using the example above, you can continue to the next page by sending back the `cursor` field. In
 case of text format the cursor is returned as `Cursor` http header.
 
 [source,js]
@@ -111,7 +164,7 @@ Which looks like:
 
--------------------------------------------------
 // TESTRESPONSE[s/sDXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAAEWODRMaXBUaVlRN21iTlRyWHZWYUdrdw==:BAFmBmF1dGhvcgFmBG5hbWUBZgpwYWdlX2NvdW50AWYMcmVsZWFzZV9kYXRl9f\/\/\/w8=/$body.cursor/]
 
-Note that the `column` object is only part of the first page.
+Note that the `columns` object is only part of the first page.
 You've reached the last page when there is no `cursor` returned in the results. Like Elasticsearch's <>,
@@ -145,9 +198,11 @@ Which will like return the
 
 [[sql-rest-filtering]]
+[float]
+=== Filtering using {es} query DSL
 
 You can filter the results that SQL will run on using a standard
-Elasticsearch query DSL by specifying the query in the filter
+{es} query DSL by specifying the query in the filter
 parameter.
 
 [source,js]
--------------------------------------------------
@@ -181,10 +236,48 @@ Douglas Adams |The Hitchhiker's Guide to the Galaxy|180 |1979-10-12T
 
 // TESTRESPONSE[_cat]
 
 [[sql-rest-fields]]
-In addition to the `query` and `cursor` fields, the request can
-contain `fetch_size` and `time_zone`. `fetch_size` is a hint for how
-many results to return in each page. SQL might chose to return more
-or fewer results though. `time_zone` is the time zone to use for datetime
-functions and datetime parsing. `time_zone` defaults to `utc` and can take
-any values documented
-http://www.joda.org/joda-time/apidocs/org/joda/time/DateTimeZone.html[here].
+[float]
+=== Supported REST parameters
+
+In addition to the `query` and `fetch_size`, a request accepts a number of user-defined fields for specifying
+the request time-outs or localization information (such as the time zone). 
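
Tying the `cursor` flow and these request fields together, here is a hedged end-to-end sketch; the `_sql` endpoint, the local node, and the naive string-based cursor extraction are illustrative assumptions rather than part of this change, and a real client would use a JSON parser. The parameter table follows right after this sketch.

[source,java]
--------------------------------------------------
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SqlPagination {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    public static void main(String[] args) throws Exception {
        // Initial query: fetch_size and time_zone only make sense here.
        String body = post("{\"query\": \"SELECT * FROM library\", \"fetch_size\": 100, \"time_zone\": \"Z\"}");
        System.out.println(body);
        // Follow-up pages: only the cursor is required; the other
        // parameters are ignored once the query has been executed.
        String cursor;
        while ((cursor = extractCursor(body)) != null) {
            body = post("{\"cursor\": \"" + cursor + "\"}");
            System.out.println(body);
        }
    }

    private static String post(String json) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/_sql?format=json"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        return CLIENT.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    // Naive extraction to keep the sketch dependency-free. Returns null
    // when the response carries no cursor, which signals the last page.
    private static String extractCursor(String json) {
        int key = json.indexOf("\"cursor\"");
        if (key < 0) {
            return null;
        }
        int start = json.indexOf('"', json.indexOf(':', key) + 1) + 1;
        return json.substring(start, json.indexOf('"', start));
    }
}
--------------------------------------------------
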
+
+The table below lists the supported parameters:
+
+[cols="^m,^m,^5"]
+
+|===
+
+s|name
+s|Default value
+s|Description
+
+|query
+|Mandatory
+|SQL query to execute
+
+|fetch_size
+|1000
+|The maximum number of rows (or entries) to return in one response
+
+|filter
+|none
+|Optional {es} query DSL for additional <<sql-rest-filtering,filtering>>.
+
+|request_timeout
+|90s
+|The timeout before the request fails.
+
+|page_timeout
+|45s
+|The timeout before a pagination request fails.
+
+|time_zone
+|`Z` (or `UTC`)
+|Time zone in ISO 8601 format used for executing the query on the server.
+More information is available https://docs.oracle.com/javase/8/docs/api/java/time/ZoneId.html[here].
+
+|===
+
+Note that most parameters (other than the timeouts) apply only to the initial query; any follow-up pagination request requires only the `cursor` parameter, as explained in the <<sql-pagination>> chapter.
+By that point the query has already been executed and the follow-up calls simply page through the results that were found, so the other parameters are ignored.
\ No newline at end of file
diff --git a/docs/reference/sql/endpoints/translate.asciidoc b/docs/reference/sql/endpoints/translate.asciidoc
index cd7f1cc846e8a..6a347ff614af7 100644
--- a/docs/reference/sql/endpoints/translate.asciidoc
+++ b/docs/reference/sql/endpoints/translate.asciidoc
@@ -4,7 +4,7 @@
 == SQL Translate API
 
 The SQL Translate API accepts SQL in a JSON document and translates it
-into native Elasticsearch queries. For example:
+into native {es} queries. For example:
 
 [source,js]
--------------------------------------------------
diff --git a/docs/reference/sql/language/syntax/lexic/index.asciidoc b/docs/reference/sql/language/syntax/lexic/index.asciidoc
index 423934b4c6bbd..014193a0469dd 100644
--- a/docs/reference/sql/language/syntax/lexic/index.asciidoc
+++ b/docs/reference/sql/language/syntax/lexic/index.asciidoc
@@ -20,7 +20,7 @@ Take the following example:
 SELECT * FROM table
 ----
 
-This query has four tokens: `SELECT`, `\*`, `FROM` and `table`. The first three, namely `SELECT`, `*` and `FROM` are __key words__ meaning words that have a fixed meaning in SQL. The token `table` is an _identifier_ meaning it identifies (by name) an entity inside SQL such as a table (in this case), a column, etc...
+This query has four tokens: `SELECT`, `*`, `FROM` and `table`. The first three, namely `SELECT`, `*` and `FROM`, are __key words__, meaning words that have a fixed meaning in SQL. The token `table` is an _identifier_, meaning it identifies (by name) an entity inside SQL such as a table (in this case), a column, etc.
 
 As one can see, both key words and identifiers have the _same_ lexical structure and thus one cannot know whether a token is one or the other without knowing the SQL language; the complete list of key words is available in the <>.
 Do note that key words are case-insensitive meaning the previous example can be written as:
@@ -45,7 +45,7 @@ Identifiers can be of two types: __quoted__ and __unquoted__:
 SELECT ip_address FROM "hosts-*"
 ----
 
-This query has two identifiers, `ip_address` and `hosts-\*` (an <>). As `ip_address` does not clash with any key words it can be used verbatim, `hosts-*` on the other hand cannot as it clashes with `-` (minus operation) and `*` hence the double quotes.
+This query has two identifiers, `ip_address` and `hosts-*` (an <>). As `ip_address` does not clash with any key words, it can be used verbatim; `hosts-*`, on the other hand, cannot, as it clashes with `-` (the minus operation) and `*`, hence the double quotes. 
Another example:
@@ -213,7 +213,7 @@ s|Description
 
 Two styles are supported:
 
 Single Line:: Comments start with a double dash `--` and continue until the end of the line.
-Multi line:: Comments that start with `/\*` and end with `*/` (also known as C-style).
+Multi line:: Comments that start with `/*` and end with `*/` (also known as C-style).
 
 [source, sql]
diff --git a/docs/reference/upgrade/close-ml.asciidoc b/docs/reference/upgrade/close-ml.asciidoc
new file mode 100644
index 0000000000000..c4efddca759c9
--- /dev/null
+++ b/docs/reference/upgrade/close-ml.asciidoc
@@ -0,0 +1,32 @@
+[testenv="platinum"]
+
+If your {ml} indices were created earlier than the previous major version, they
+must be reindexed. In those circumstances, there must be no machine learning
+jobs running during the upgrade.
+
+In all other circumstances, there is no requirement to close your {ml} jobs.
+There are, however, advantages to doing so. If you choose to leave your jobs
+running during the upgrade, they are affected when you stop the {ml} nodes. The
+jobs move to another {ml} node and restore the model states. This scenario has
+the least disruption to the active {ml} jobs but incurs the highest load on the
+cluster.
+
+To close all {ml} jobs before you upgrade, see
+{stack-ov}/stopping-ml.html[Stopping {ml}]. This method persists the model
+state at the moment of closure, which means that when you open your jobs after
+the upgrade, they use the exact same model. This scenario takes the most time,
+however, especially if you have many jobs or jobs with large model states.
+
+To temporarily halt the tasks associated with your {ml} jobs and {dfeeds} and
+prevent new jobs from opening, use the <<ml-set-upgrade-mode,set upgrade mode API>>:
+
+[source,js]
+--------------------------------------------------
+POST _ml/set_upgrade_mode?enabled=true
+--------------------------------------------------
+// CONSOLE
+
+This method does not persist the absolute latest model state; rather, it uses the
+last model state that was automatically saved. By halting the tasks, you avoid
+incurring the cost of managing active jobs during the upgrade and it's quicker
+than stopping {dfeeds} and closing jobs.
\ No newline at end of file
diff --git a/docs/reference/upgrade/cluster_restart.asciidoc b/docs/reference/upgrade/cluster_restart.asciidoc
index 4c229e373f505..a8552a82bb8d2 100644
--- a/docs/reference/upgrade/cluster_restart.asciidoc
+++ b/docs/reference/upgrade/cluster_restart.asciidoc
@@ -26,8 +26,11 @@ recovery.
 include::synced-flush.asciidoc[]
 --
 
-. *Stop any machine learning jobs that are running.* See
-{xpack-ref}/stopping-ml.html[Stopping Machine Learning].
+. *Stop any machine learning jobs that are running.*
++
+--
+include::close-ml.asciidoc[]
+--
 
 . *Shutdown all nodes.*
 +
@@ -132,3 +135,7 @@ GET _cat/recovery
 --
 
 . *Restart machine learning jobs.*
++
+--
+include::open-ml.asciidoc[]
+--
diff --git a/docs/reference/upgrade/open-ml.asciidoc b/docs/reference/upgrade/open-ml.asciidoc
new file mode 100644
index 0000000000000..b9b6b772bbe8d
--- /dev/null
+++ b/docs/reference/upgrade/open-ml.asciidoc
@@ -0,0 +1,13 @@
+[testenv="platinum"]
+If you closed all {ml} jobs before the upgrade, you must open them. Use {kib} or
+the <<ml-open-job,open jobs API>>. 
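
Opening jobs can also be scripted. A minimal sketch using the {es} low-level REST client follows; the client choice and the `my-job` id are illustrative assumptions, not part of this change:

[source,java]
--------------------------------------------------
import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class OpenMlJobs {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // "my-job" is a hypothetical job id; list yours with GET _ml/anomaly_detectors
            Response response = client.performRequest(
                    new Request("POST", "/_ml/anomaly_detectors/my-job/_open"));
            System.out.println(response.getStatusLine());
        }
    }
}
--------------------------------------------------
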
+ +Alternatively, if you temporarily halted the tasks associated with your {ml} jobs, +use the <> to return them to active +states: + +[source,js] +-------------------------------------------------- +POST _ml/set_upgrade_mode?enabled=false +-------------------------------------------------- +// CONSOLE diff --git a/docs/reference/upgrade/rolling_upgrade.asciidoc b/docs/reference/upgrade/rolling_upgrade.asciidoc index dff3895ac4c1d..e62bd9348f1ab 100644 --- a/docs/reference/upgrade/rolling_upgrade.asciidoc +++ b/docs/reference/upgrade/rolling_upgrade.asciidoc @@ -43,8 +43,11 @@ include::synced-flush.asciidoc[] -- -. *Stop any machine learning jobs that are running.* See -{xpack-ref}/stopping-ml.html[Stopping Machine Learning]. +. *Stop any machine learning jobs that are running.* ++ +-- +include::close-ml.asciidoc[] +-- . [[upgrade-node]] *Shut down a single node*. + @@ -160,6 +163,11 @@ for each node that needs to be updated. -- . *Restart machine learning jobs.* ++ +-- +include::open-ml.asciidoc[] +-- + [IMPORTANT] ==================================================== diff --git a/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/CommonAnalysisPlugin.java b/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/CommonAnalysisPlugin.java index ac439f4bae227..e05305a8ebd39 100644 --- a/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/CommonAnalysisPlugin.java +++ b/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/CommonAnalysisPlugin.java @@ -414,14 +414,6 @@ public List getPreConfiguredTokenFilters() { filters.add(PreConfiguredTokenFilter.singleton("dutch_stem", false, input -> new SnowballFilter(input, new DutchStemmer()))); filters.add(PreConfiguredTokenFilter.singleton("edge_ngram", false, false, input -> new EdgeNGramTokenFilter(input, 1))); - filters.add(PreConfiguredTokenFilter.singletonWithVersion("edgeNGram", false, false, (reader, version) -> { - if (version.onOrAfter(org.elasticsearch.Version.V_6_4_0)) { - deprecationLogger.deprecatedAndMaybeLog("edgeNGram_deprecation", - "The [edgeNGram] token filter name is deprecated and will be removed in a future version. " - + "Please change the filter name to [edge_ngram] instead."); - } - return new EdgeNGramTokenFilter(reader, 1); - })); filters.add(PreConfiguredTokenFilter.singleton("elision", true, input -> new ElisionFilter(input, FrenchAnalyzer.DEFAULT_ARTICLES))); filters.add(PreConfiguredTokenFilter.singleton("french_stem", false, input -> new SnowballFilter(input, new FrenchStemmer()))); @@ -438,14 +430,6 @@ public List getPreConfiguredTokenFilters() { LimitTokenCountFilterFactory.DEFAULT_MAX_TOKEN_COUNT, LimitTokenCountFilterFactory.DEFAULT_CONSUME_ALL_TOKENS))); filters.add(PreConfiguredTokenFilter.singleton("ngram", false, false, reader -> new NGramTokenFilter(reader, 1, 2, false))); - filters.add(PreConfiguredTokenFilter.singletonWithVersion("nGram", false, false, (reader, version) -> { - if (version.onOrAfter(org.elasticsearch.Version.V_6_4_0)) { - deprecationLogger.deprecatedAndMaybeLog("nGram_deprecation", - "The [nGram] token filter name is deprecated and will be removed in a future version. 
" - + "Please change the filter name to [ngram] instead."); - } - return new NGramTokenFilter(reader, 1, 2, false); - })); filters.add(PreConfiguredTokenFilter.singleton("persian_normalization", true, PersianNormalizationFilter::new)); filters.add(PreConfiguredTokenFilter.singleton("porter_stem", false, PorterStemFilter::new)); filters.add(PreConfiguredTokenFilter.singleton("reverse", false, ReverseStringFilter::new)); diff --git a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisFactoryTests.java b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisFactoryTests.java index 99e882c622085..acb7f2213f641 100644 --- a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisFactoryTests.java +++ b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisFactoryTests.java @@ -185,7 +185,6 @@ protected Map> getPreConfiguredTokenFilters() { filters.put("delimited_payload", org.apache.lucene.analysis.payloads.DelimitedPayloadTokenFilterFactory.class); filters.put("dutch_stem", SnowballPorterFilterFactory.class); filters.put("edge_ngram", null); - filters.put("edgeNGram", null); filters.put("elision", null); filters.put("french_stem", SnowballPorterFilterFactory.class); filters.put("german_stem", null); @@ -197,7 +196,6 @@ protected Map> getPreConfiguredTokenFilters() { filters.put("length", null); filters.put("limit", LimitTokenCountFilterFactory.class); filters.put("ngram", null); - filters.put("nGram", null); filters.put("persian_normalization", null); filters.put("porter_stem", null); filters.put("reverse", ReverseStringFilterFactory.class); diff --git a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisPluginTests.java b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisPluginTests.java index c52c78ffe27e3..f128c07361c45 100644 --- a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisPluginTests.java +++ b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/CommonAnalysisPluginTests.java @@ -20,8 +20,6 @@ package org.elasticsearch.analysis.common; import org.apache.lucene.analysis.Analyzer; -import org.apache.lucene.analysis.MockTokenizer; -import org.apache.lucene.analysis.Tokenizer; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.settings.Settings; @@ -29,98 +27,14 @@ import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.analysis.IndexAnalyzers; import org.elasticsearch.index.analysis.NamedAnalyzer; -import org.elasticsearch.index.analysis.TokenFilterFactory; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.VersionUtils; import java.io.IOException; -import java.io.StringReader; -import java.util.Map; public class CommonAnalysisPluginTests extends ESTestCase { - /** - * Check that the deprecated name "nGram" issues a deprecation warning for indices created since 6.3.0 - */ - public void testNGramDeprecationWarning() throws IOException { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) - .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, Version.CURRENT)) - .build(); - - IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings); - try 
(CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) { - Map tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter; - TokenFilterFactory tokenFilterFactory = tokenFilters.get("nGram"); - Tokenizer tokenizer = new MockTokenizer(); - tokenizer.setReader(new StringReader("foo bar")); - assertNotNull(tokenFilterFactory.create(tokenizer)); - assertWarnings( - "The [nGram] token filter name is deprecated and will be removed in a future version. " - + "Please change the filter name to [ngram] instead."); - } - } - - /** - * Check that the deprecated name "nGram" does NOT issues a deprecation warning for indices created before 6.4.0 - */ - public void testNGramNoDeprecationWarningPre6_4() throws IOException { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) - .put(IndexMetaData.SETTING_VERSION_CREATED, - VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_3_0)) - .build(); - - IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings); - try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) { - Map tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter; - TokenFilterFactory tokenFilterFactory = tokenFilters.get("nGram"); - Tokenizer tokenizer = new MockTokenizer(); - tokenizer.setReader(new StringReader("foo bar")); - assertNotNull(tokenFilterFactory.create(tokenizer)); - } - } - - /** - * Check that the deprecated name "edgeNGram" issues a deprecation warning for indices created since 6.3.0 - */ - public void testEdgeNGramDeprecationWarning() throws IOException { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) - .put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(), Version.V_6_4_0, Version.CURRENT)) - .build(); - - IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings); - try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) { - Map tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter; - TokenFilterFactory tokenFilterFactory = tokenFilters.get("edgeNGram"); - Tokenizer tokenizer = new MockTokenizer(); - tokenizer.setReader(new StringReader("foo bar")); - assertNotNull(tokenFilterFactory.create(tokenizer)); - assertWarnings( - "The [edgeNGram] token filter name is deprecated and will be removed in a future version. 
" - + "Please change the filter name to [edge_ngram] instead."); - } - } - - /** - * Check that the deprecated name "edgeNGram" does NOT issues a deprecation warning for indices created before 6.4.0 - */ - public void testEdgeNGramNoDeprecationWarningPre6_4() throws IOException { - Settings settings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) - .put(IndexMetaData.SETTING_VERSION_CREATED, - VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_3_0)) - .build(); - - IndexSettings idxSettings = IndexSettingsModule.newIndexSettings("index", settings); - try (CommonAnalysisPlugin commonAnalysisPlugin = new CommonAnalysisPlugin()) { - Map tokenFilters = createTestAnalysis(idxSettings, settings, commonAnalysisPlugin).tokenFilter; - TokenFilterFactory tokenFilterFactory = tokenFilters.get("edgeNGram"); - Tokenizer tokenizer = new MockTokenizer(); - tokenizer.setReader(new StringReader("foo bar")); - assertNotNull(tokenFilterFactory.create(tokenizer)); - } - } - - /** * Check that the deprecated analyzer name "standard_html_strip" throws exception for indices created since 7.0.0 */ diff --git a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/HighlighterWithAnalyzersTests.java b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/HighlighterWithAnalyzersTests.java index e96243efc4254..8f58a074cf102 100644 --- a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/HighlighterWithAnalyzersTests.java +++ b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/HighlighterWithAnalyzersTests.java @@ -81,7 +81,7 @@ public void testNgramHighlightingWithBrokenPositions() throws IOException { .put("analysis.tokenizer.autocomplete.max_gram", 20) .put("analysis.tokenizer.autocomplete.min_gram", 1) .put("analysis.tokenizer.autocomplete.token_chars", "letter,digit") - .put("analysis.tokenizer.autocomplete.type", "nGram") + .put("analysis.tokenizer.autocomplete.type", "ngram") .put("analysis.filter.wordDelimiter.type", "word_delimiter") .putList("analysis.filter.wordDelimiter.type_table", "& => ALPHANUM", "| => ALPHANUM", "! => ALPHANUM", diff --git a/modules/analysis-common/src/test/resources/rest-api-spec/test/analysis-common/30_tokenizers.yml b/modules/analysis-common/src/test/resources/rest-api-spec/test/analysis-common/30_tokenizers.yml index 9a7c158fc4734..4fe5162e68743 100644 --- a/modules/analysis-common/src/test/resources/rest-api-spec/test/analysis-common/30_tokenizers.yml +++ b/modules/analysis-common/src/test/resources/rest-api-spec/test/analysis-common/30_tokenizers.yml @@ -23,38 +23,6 @@ - match: { detail.tokenizer.tokens.0.token: Foo Bar! } --- -"nGram": - - do: - indices.analyze: - body: - text: good - explain: true - tokenizer: - type: nGram - min_gram: 2 - max_gram: 2 - - length: { detail.tokenizer.tokens: 3 } - - match: { detail.tokenizer.name: _anonymous_tokenizer } - - match: { detail.tokenizer.tokens.0.token: go } - - match: { detail.tokenizer.tokens.1.token: oo } - - match: { detail.tokenizer.tokens.2.token: od } - ---- -"nGram_exception": - - skip: - version: " - 6.99.99" - reason: only starting from version 7.x this throws an error - - do: - catch: /The difference between max_gram and min_gram in NGram Tokenizer must be less than or equal to[:] \[1\] but was \[2\]\. 
This limit can be set by changing the \[index.max_ngram_diff\] index level setting\./ - indices.analyze: - body: - text: good - explain: true - tokenizer: - type: nGram - min_gram: 2 - max_gram: 4 ---- "simple_pattern": - do: indices.analyze: @@ -133,7 +101,7 @@ text: "foobar" explain: true tokenizer: - type: nGram + type: ngram min_gram: 3 max_gram: 3 - length: { detail.tokenizer.tokens: 4 } @@ -162,15 +130,31 @@ body: text: "foo" explain: true - tokenizer: nGram + tokenizer: ngram - length: { detail.tokenizer.tokens: 5 } - - match: { detail.tokenizer.name: nGram } + - match: { detail.tokenizer.name: ngram } - match: { detail.tokenizer.tokens.0.token: f } - match: { detail.tokenizer.tokens.1.token: fo } - match: { detail.tokenizer.tokens.2.token: o } - match: { detail.tokenizer.tokens.3.token: oo } - match: { detail.tokenizer.tokens.4.token: o } +--- +"ngram_exception": + - skip: + version: " - 6.99.99" + reason: only starting from version 7.x this throws an error + - do: + catch: /The difference between max_gram and min_gram in NGram Tokenizer must be less than or equal to[:] \[1\] but was \[2\]\. This limit can be set by changing the \[index.max_ngram_diff\] index level setting\./ + indices.analyze: + body: + text: good + explain: true + tokenizer: + type: ngram + min_gram: 2 + max_gram: 4 + --- "edge_ngram": - do: @@ -194,7 +178,7 @@ text: "foo" explain: true tokenizer: - type: edgeNGram + type: edge_ngram min_gram: 1 max_gram: 3 - length: { detail.tokenizer.tokens: 3 } @@ -219,9 +203,9 @@ body: text: "foo" explain: true - tokenizer: edgeNGram + tokenizer: edge_ngram - length: { detail.tokenizer.tokens: 2 } - - match: { detail.tokenizer.name: edgeNGram } + - match: { detail.tokenizer.name: edge_ngram } - match: { detail.tokenizer.tokens.0.token: f } - match: { detail.tokenizer.tokens.1.token: fo } diff --git a/modules/analysis-common/src/test/resources/rest-api-spec/test/indices.analyze/10_analyze.yml b/modules/analysis-common/src/test/resources/rest-api-spec/test/indices.analyze/10_analyze.yml index ec00b6d41f1c5..56bbed7044e14 100644 --- a/modules/analysis-common/src/test/resources/rest-api-spec/test/indices.analyze/10_analyze.yml +++ b/modules/analysis-common/src/test/resources/rest-api-spec/test/indices.analyze/10_analyze.yml @@ -76,7 +76,7 @@ analysis: tokenizer: trigram: - type: nGram + type: ngram min_gram: 3 max_gram: 3 filter: diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 832df83fe0f5c..7ddd6348f5245 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -948,6 +948,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO } } } + markSeqNoAsSeen(index.seqNo()); return plan; } @@ -1301,6 +1302,7 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws delete.seqNo(), delete.version()); } } + markSeqNoAsSeen(delete.seqNo()); return plan; } @@ -1455,6 +1457,7 @@ public void maybePruneDeletes() { public NoOpResult noOp(final NoOp noOp) { NoOpResult noOpResult; try (ReleasableLock ignored = readLock.acquire()) { + markSeqNoAsSeen(noOp.seqNo()); noOpResult = innerNoOp(noOp); } catch (final Exception e) { noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), e); @@ -2434,6 +2437,13 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException { 
localCheckpointTracker.waitForOpsToComplete(seqNo); } + /** + * Marks the given seq_no as seen and advances the max_seq_no of this engine to at least that value. + */ + protected final void markSeqNoAsSeen(long seqNo) { + localCheckpointTracker.advanceMaxSeqNo(seqNo); + } + /** * Checks if the given operation has been processed in this engine or not. * @return true if the given operation was processed; otherwise false. diff --git a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java index 50f4e311c8d81..a19d9ac4abb94 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java @@ -81,6 +81,15 @@ public synchronized long generateSeqNo() { return nextSeqNo++; } + /** + * Marks the provided sequence number as seen and updates the max_seq_no if needed. + */ + public synchronized void advanceMaxSeqNo(long seqNo) { + if (seqNo >= nextSeqNo) { + nextSeqNo = seqNo + 1; + } + } + /** * Marks the processing of the provided sequence number as completed as updates the checkpoint if possible. * diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java index 9a7e25d29bb08..f4b834e4d29a6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java @@ -46,6 +46,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; @@ -84,7 +85,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase { @Override protected Collection> nodePlugins() { // disruption tests need MockTransportService - return Arrays.asList(MockTransportService.TestPlugin.class); + return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); } public void testBulkWeirdScenario() throws Exception { @@ -92,7 +93,9 @@ public void testBulkWeirdScenario() throws Exception { internalCluster().startDataOnlyNodes(2); assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() - .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get()); + .put("index.number_of_shards", 1).put("index.number_of_replicas", 1) + .put("index.global_checkpoint_sync.interval", "1s")) + .get()); ensureGreen(); BulkResponse bulkResponse = client().prepareBulk() diff --git a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java index 97ba76b822020..bd89ceb64e6df 100644 --- a/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java +++ b/server/src/test/java/org/elasticsearch/discovery/AbstractDisruptionTestCase.java @@ -30,8 +30,10 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.plugins.Plugin; import 
org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.NetworkDisruption.Bridge; @@ -65,6 +67,13 @@ protected Settings nodeSettings(int nodeOrdinal) { return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_SETTINGS).build(); } + @Override + public Settings indexSettings() { + return Settings.builder().put(super.indexSettings()) + // sync global checkpoint quickly so we can verify seq_no_stats aligned between all copies after tests. + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s").build(); + } + @Override protected int numberOfShards() { return 3; @@ -128,7 +137,7 @@ List startCluster(int numberOfNodes) { @Override protected Collection> nodePlugins() { - return Arrays.asList(MockTransportService.TestPlugin.class); + return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class); } ClusterState getNodeClusterState(String node) { diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index f23665d201206..d9ed5cd2c719e 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -5653,4 +5653,42 @@ public void testStoreHonorsLuceneVersion() throws IOException { } } } + + public void testMaxSeqNoInCommitUserData() throws Exception { + AtomicBoolean running = new AtomicBoolean(true); + Thread rollTranslog = new Thread(() -> { + while (running.get() && engine.getTranslog().currentFileGeneration() < 500) { + engine.rollTranslogGeneration(); // make adding operations to translog slower + } + }); + rollTranslog.start(); + + Thread indexing = new Thread(() -> { + long seqNo = 0; + while (running.get() && seqNo <= 1000) { + try { + String id = Long.toString(between(1, 50)); + if (randomBoolean()) { + ParsedDocument doc = testParsedDocument(id, null, testDocumentWithTextField(), SOURCE, null); + engine.index(replicaIndexForDoc(doc, 1L, seqNo, false)); + } else { + engine.delete(replicaDeleteForDoc(id, 1L, seqNo, 0L)); + } + seqNo++; + } catch (IOException e) { + throw new AssertionError(e); + } + } + }); + indexing.start(); + + int numCommits = between(5, 20); + for (int i = 0; i < numCommits; i++) { + engine.flush(false, true); + } + running.set(false); + indexing.join(); + rollTranslog.join(); + assertMaxSeqNoInCommitUserData(engine); + } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 674c252d780f3..fb8574594a874 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -120,6 +120,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -474,18 +475,22 @@ public void testStressMaybeFlushOrRollTranslogGeneration() throws Exception { final FlushStats initialStats = 
shard.flushStats(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); check = () -> { + assertFalse(shard.shouldPeriodicallyFlush()); final FlushStats currentStats = shard.flushStats(); String msg = String.format(Locale.ROOT, "flush stats: total=[%d vs %d], periodic=[%d vs %d]", initialStats.getTotal(), currentStats.getTotal(), initialStats.getPeriodic(), currentStats.getPeriodic()); - assertThat(msg, currentStats.getPeriodic(), equalTo(initialStats.getPeriodic() + 1)); - assertThat(msg, currentStats.getTotal(), equalTo(initialStats.getTotal() + 1)); + assertThat(msg, currentStats.getPeriodic(), + either(equalTo(initialStats.getPeriodic() + 1)).or(equalTo(initialStats.getPeriodic() + 2))); + assertThat(msg, currentStats.getTotal(), + either(equalTo(initialStats.getTotal() + 1)).or(equalTo(initialStats.getTotal() + 2))); }; } else { final long generation = getTranslog(shard).currentFileGeneration(); client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get(); - check = () -> assertEquals( - generation + 1, - getTranslog(shard).currentFileGeneration()); + check = () -> { + assertFalse(shard.shouldRollTranslogGeneration()); + assertEquals(generation + 1, getTranslog(shard).currentFileGeneration()); + }; } assertBusy(check); running.set(false); diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index fb455f37d76f3..c2d35279bdff4 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -110,6 +110,13 @@ protected void beforeIndexDeletion() throws Exception { internalCluster().assertSameDocIdsOnShards(); } + @Override + public Settings indexSettings() { + return Settings.builder().put(super.indexSettings()) + // sync global checkpoint quickly so we can verify seq_no_stats aligned between all copies after tests. 
+ .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s").build(); + } + public void testSimpleRelocationNoIndexing() { logger.info("--> starting [node1] ..."); final String node_1 = internalCluster().startNode(); @@ -279,8 +286,7 @@ public void testRelocationWhileRefreshing() throws Exception { .put("index.number_of_shards", 1) .put("index.number_of_replicas", numberOfReplicas) .put("index.refresh_interval", -1) // we want to control refreshes - .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")) - .get(); + ).get(); for (int i = 1; i < numberOfNodes; i++) { logger.info("--> starting [node_{}] ...", i); @@ -465,8 +471,7 @@ public void testIndexAndRelocateConcurrently() throws ExecutionException, Interr final Settings.Builder settings = Settings.builder() .put("index.routing.allocation.exclude.color", "blue") .put(indexSettings()) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) - .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms"); + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)); assertAcked(prepareCreate("test", settings)); assertAllShardsOnNodes("test", redNodes); int numDocs = randomIntBetween(100, 150); @@ -518,8 +523,8 @@ public void testRelocateWhileWaitingForRefresh() { prepareCreate("test", Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", 0) - .put("index.refresh_interval", -1) // we want to control refreshes - ).get(); + // we want to control refreshes + .put("index.refresh_interval", -1)).get(); logger.info("--> index 10 docs"); for (int i = 0; i < 10; i++) { diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 005bfb42f8a22..a6765e4e44fa6 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -27,6 +27,8 @@ import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.document.StoredField; import org.apache.lucene.document.TextField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.LeafReader; @@ -126,6 +128,7 @@ import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA; import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; public abstract class EngineTestCase extends ESTestCase { @@ -254,18 +257,20 @@ public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @After public void tearDown() throws Exception { super.tearDown(); - if (engine != null && engine.isClosed.get() == false) { - engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); - } - if (replicaEngine != null && replicaEngine.isClosed.get() == false) { - replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); - assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); + try { + if (engine != null && engine.isClosed.get() == false) 
{ + engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test")); + assertMaxSeqNoInCommitUserData(engine); + } + if (replicaEngine != null && replicaEngine.isClosed.get() == false) { + replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs(); + assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test")); + assertMaxSeqNoInCommitUserData(replicaEngine); + } + } finally { + IOUtils.close(replicaEngine, storeReplica, engine, store, () -> terminate(threadPool)); } - IOUtils.close( - replicaEngine, storeReplica, - engine, store); - terminate(threadPool); } @@ -1067,6 +1072,21 @@ public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine e } } + /** + * Asserts that the max_seq_no stored in the commit's user_data is never smaller than seq_no of any document in the commit. + */ + public static void assertMaxSeqNoInCommitUserData(Engine engine) throws Exception { + List commits = DirectoryReader.listCommits(engine.store.directory()); + for (IndexCommit commit : commits) { + try (DirectoryReader reader = DirectoryReader.open(commit)) { + AtomicLong maxSeqNoFromDocs = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + Lucene.scanSeqNosInReader(reader, 0, Long.MAX_VALUE, n -> maxSeqNoFromDocs.set(Math.max(n, maxSeqNoFromDocs.get()))); + assertThat(Long.parseLong(commit.getUserData().get(SequenceNumbers.MAX_SEQ_NO)), + greaterThanOrEqualTo(maxSeqNoFromDocs.get())); + } + } + } + public static MapperService createMapperService(String type) throws IOException { IndexMetaData indexMetaData = IndexMetaData.builder("test") .settings(Settings.builder() diff --git a/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc index 89b79b5680056..1349e8def05d9 100644 --- a/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc +++ b/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc @@ -5,7 +5,7 @@ Put watch ++++ -The PUT watch API either registers a new watch in {watcher} or update an +The PUT watch API either registers a new watch in {watcher} or updates an existing one. [float] @@ -21,13 +21,13 @@ the `.watches` index and its trigger is immediately registered with the relevant trigger engine. Typically for the `schedule` trigger, the scheduler is the trigger engine. -IMPORTANT: Putting a watch must be done via this API only. Do not put a watch - directly to the `.watches` index using the Elasticsearch Index API. - If {es} {security-features} are enabled, make sure no `write` - privileges are granted to anyone over the `.watches` index. +IMPORTANT: You must use {kib} or this API to create a watch. Do not put a watch + directly to the `.watches` index using the Elasticsearch index API. + If {es} {security-features} are enabled, do not give users `write` + privileges on the `.watches` index. When adding a watch you can also define its initial -{xpack-ref}/how-watcher-works.html#watch-active-state[active state]. You do that +{stack-ov}/how-watcher-works.html#watch-active-state[active state]. You do that by setting the `active` parameter. [float] @@ -52,16 +52,16 @@ A watch has the following fields: |====== | Name | Description -| `trigger` | The {xpack-ref}/trigger.html[trigger] that defines when +| `trigger` | The {stack-ov}/trigger.html[trigger] that defines when the watch should run. 
diff --git a/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc b/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc
index 89b79b5680056..1349e8def05d9 100644
--- a/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc
+++ b/x-pack/docs/en/rest-api/watcher/put-watch.asciidoc
@@ -5,7 +5,7 @@
 Put watch
 ++++

-The PUT watch API either registers a new watch in {watcher} or update an
+The PUT watch API either registers a new watch in {watcher} or updates an
 existing one.

 [float]
@@ -21,13 +21,13 @@
 the `.watches` index and its trigger is immediately registered with the
 relevant trigger engine. Typically for the `schedule` trigger, the
 scheduler is the trigger engine.

-IMPORTANT: Putting a watch must be done via this API only. Do not put a watch
-           directly to the `.watches` index using the Elasticsearch Index API.
-           If {es} {security-features} are enabled, make sure no `write`
-           privileges are granted to anyone over the `.watches` index.
+IMPORTANT: You must use {kib} or this API to create a watch. Do not put a watch
+           directly to the `.watches` index using the Elasticsearch index API.
+           If {es} {security-features} are enabled, do not give users `write`
+           privileges on the `.watches` index.

 When adding a watch you can also define its initial
-{xpack-ref}/how-watcher-works.html#watch-active-state[active state]. You do that
+{stack-ov}/how-watcher-works.html#watch-active-state[active state]. You do that
 by setting the `active` parameter.

 [float]
@@ -52,16 +52,16 @@ A watch has the following fields:
 |======
 | Name | Description

-| `trigger` | The {xpack-ref}/trigger.html[trigger] that defines when
+| `trigger` | The {stack-ov}/trigger.html[trigger] that defines when
              the watch should run.

-| `input` | The {xpack-ref}/input.html[input] that defines the input
+| `input` | The {stack-ov}/input.html[input] that defines the input
            that loads the data for the watch.

-| `condition` | The {xpack-ref}/condition.html[condition] that defines if
+| `condition` | The {stack-ov}/condition.html[condition] that defines if
                the actions should be run.

-| `actions` | The list of {xpack-ref}/actions.html[actions] that will be
+| `actions` | The list of {stack-ov}/actions.html[actions] that will be
              run if the condition matches

 | `metadata` | Metadata json that will be copied into the history entries.
@@ -75,7 +75,7 @@ A watch has the following fields:
 ==== Authorization
 You must have `manage_watcher` cluster privileges to use this API. For more
-information, see {xpack-ref}/security-privileges.html[Security Privileges].
+information, see {stack-ov}/security-privileges.html[Security Privileges].

 [float]
 ==== Security Integration
@@ -148,7 +148,7 @@ PUT _watcher/watch/my-watch
 // CONSOLE

 When you add a watch you can also define its initial
-{xpack-ref}/how-watcher-works.html#watch-active-state[active state]. You do that
+{stack-ov}/how-watcher-works.html#watch-active-state[active state]. You do that
 by setting the `active` parameter. The following command adds a watch and sets
 it to be inactive by default:
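The documentation above describes registering a watch in an initially inactive state by passing `active=false` on the PUT. As an illustration only, here is roughly how that call looks through the Java low-level REST client; the watch body is a deliberately trivial placeholder, not a recommended configuration:

```java
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public final class PutInactiveWatchExample {

    public static Response putInactiveWatch(RestClient client) throws Exception {
        Request request = new Request("PUT", "/_watcher/watch/my-watch");
        request.addParameter("active", "false"); // register the watch as inactive
        request.setJsonEntity(
            "{"
                + "\"trigger\": {\"schedule\": {\"interval\": \"10m\"}},"
                + "\"input\": {\"simple\": {}},"
                + "\"condition\": {\"always\": {}},"
                + "\"actions\": {\"log\": {\"logging\": {\"text\": \"watch ran\"}}}"
            + "}");
        return client.performRequest(request);
    }
}
```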
diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java
index 656328d5ead9e..4891c51049b62 100644
--- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java
+++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java
@@ -28,6 +28,7 @@
 import static org.elasticsearch.rest.action.search.RestSearchAction.TOTAL_HITS_AS_INT_PARAM;
 import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;

 public class ESCCRRestTestCase extends ESRestTestCase {
@@ -139,8 +140,9 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina
             throw new AssertionError("error while searching", e);
         }

-        int numberOfOperationsReceived = 0;
-        int numberOfOperationsIndexed = 0;
+        int followerMaxSeqNo = 0;
+        int followerMappingVersion = 0;
+        int followerSettingsVersion = 0;

         List<?> hits = (List<?>) XContentMapValues.extractValue("hits.hits", response);
         assertThat(hits.size(), greaterThanOrEqualTo(1));
@@ -153,16 +155,20 @@ protected static void verifyCcrMonitoring(final String expectedLeaderIndex, fina
             final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
             assertThat(followerIndex, equalTo(expectedFollowerIndex));

-            int foundNumberOfOperationsReceived =
-                (int) XContentMapValues.extractValue("_source.ccr_stats.operations_read", hit);
-            numberOfOperationsReceived = Math.max(numberOfOperationsReceived, foundNumberOfOperationsReceived);
-            int foundNumberOfOperationsIndexed =
-                (int) XContentMapValues.extractValue("_source.ccr_stats.operations_written", hit);
-            numberOfOperationsIndexed = Math.max(numberOfOperationsIndexed, foundNumberOfOperationsIndexed);
+            int foundFollowerMaxSeqNo =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.follower_max_seq_no", hit);
+            followerMaxSeqNo = Math.max(followerMaxSeqNo, foundFollowerMaxSeqNo);
+            int foundFollowerMappingVersion =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.follower_mapping_version", hit);
+            followerMappingVersion = Math.max(followerMappingVersion, foundFollowerMappingVersion);
+            int foundFollowerSettingsVersion =
+                (int) XContentMapValues.extractValue("_source.ccr_stats.follower_settings_version", hit);
+            followerSettingsVersion = Math.max(followerSettingsVersion, foundFollowerSettingsVersion);
         }

-        assertThat(numberOfOperationsReceived, greaterThanOrEqualTo(1));
-        assertThat(numberOfOperationsIndexed, greaterThanOrEqualTo(1));
+        assertThat(followerMaxSeqNo, greaterThan(0));
+        assertThat(followerMappingVersion, greaterThan(0));
+        assertThat(followerSettingsVersion, greaterThan(0));
     }

     protected static void verifyAutoFollowMonitoring() throws IOException {
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java
index 123bd2c996dae..a82670a52a0c4 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java
@@ -93,15 +93,24 @@ public class ShardFollowTask extends ImmutableFollowParameters implements XPackP
     }

     public static ShardFollowTask readFrom(StreamInput in) throws IOException {
-        return new ShardFollowTask(in.readString(), ShardId.readShardId(in), ShardId.readShardId(in), in);
-    }
-
-    private ShardFollowTask(String remoteCluster, ShardId followShardId, ShardId leaderShardId, StreamInput in) throws IOException {
-        super(in);
-        this.remoteCluster = remoteCluster;
-        this.followShardId = followShardId;
-        this.leaderShardId = leaderShardId;
-        this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
+        String remoteCluster = in.readString();
+        ShardId followShardId = ShardId.readShardId(in);
+        ShardId leaderShardId = ShardId.readShardId(in);
+        // TODO: use ImmutableFollowParameters(StreamInput) constructor
+        int maxReadRequestOperationCount = in.readVInt();
+        ByteSizeValue maxReadRequestSize = new ByteSizeValue(in);
+        int maxOutstandingReadRequests = in.readVInt();
+        int maxWriteRequestOperationCount = in.readVInt();
+        ByteSizeValue maxWriteRequestSize = new ByteSizeValue(in);
+        int maxOutstandingWriteRequests = in.readVInt();
+        int maxWriteBufferCount = in.readVInt();
+        ByteSizeValue maxWriteBufferSize = new ByteSizeValue(in);
+        TimeValue maxRetryDelay = in.readTimeValue();
+        TimeValue readPollTimeout = in.readTimeValue();
+        Map<String, String> headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
+        return new ShardFollowTask(remoteCluster, followShardId, leaderShardId, maxReadRequestOperationCount,
+            maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize,
+            maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout, headers);
     }

     public String getRemoteCluster() {
@@ -130,7 +139,17 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeString(remoteCluster);
         followShardId.writeTo(out);
         leaderShardId.writeTo(out);
-        super.writeTo(out);
+        // TODO: use super.writeTo()
+        out.writeVLong(getMaxReadRequestOperationCount());
+        getMaxReadRequestSize().writeTo(out);
+        out.writeVInt(getMaxOutstandingReadRequests());
+        out.writeVLong(getMaxWriteRequestOperationCount());
+        getMaxWriteRequestSize().writeTo(out);
+        out.writeVInt(getMaxOutstandingWriteRequests());
+        out.writeVInt(getMaxWriteBufferCount());
+        getMaxWriteBufferSize().writeTo(out);
+        out.writeTimeValue(getMaxRetryDelay());
+        out.writeTimeValue(getReadPollTimeout());
         out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
     }
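The `ShardFollowTask` hunk inlines the superclass serialization while preserving the wire contract: fields must be read back in exactly the order, and with the same variable-length encoding, that they were written. (The pairing of `in.readVInt()` with `out.writeVLong(...)` for the two operation counts works because the two encodings produce identical bytes for non-negative values that fit in an int.) A minimal, hypothetical `Writeable` illustrating that read/write symmetry:

```java
import java.io.IOException;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;

// Illustrative only: a two-field parameter block with a static readFrom
// that mirrors writeTo field-for-field, as the hand-rolled code above must.
public final class ExampleParams implements Writeable {

    private final int maxOperationCount;
    private final String remoteCluster;

    public ExampleParams(int maxOperationCount, String remoteCluster) {
        this.maxOperationCount = maxOperationCount;
        this.remoteCluster = remoteCluster;
    }

    public static ExampleParams readFrom(StreamInput in) throws IOException {
        // read order mirrors the write order below, encoding for encoding
        int maxOperationCount = in.readVInt();
        String remoteCluster = in.readString();
        return new ExampleParams(maxOperationCount, remoteCluster);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVInt(maxOperationCount);
        out.writeString(remoteCluster);
    }
}
```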
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
index c779b491d581e..e0b0734912b97 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java
@@ -68,6 +68,7 @@ private void preFlight(final Operation operation) {
     @Override
     protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Index index) throws IOException {
         preFlight(index);
+        markSeqNoAsSeen(index.seqNo());
         // NOTES: refer Engine#getMaxSeqNoOfUpdatesOrDeletes for the explanation of the optimization using sequence numbers.
         final long maxSeqNoOfUpdatesOrDeletes = getMaxSeqNoOfUpdatesOrDeletes();
         assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "max_seq_no_of_updates is not initialized";
@@ -103,6 +104,7 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind
     @Override
     protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Delete delete) throws IOException {
         preFlight(delete);
+        markSeqNoAsSeen(delete.seqNo());
         if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) {
             // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
             final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(
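`markSeqNoAsSeen` is invoked before each replicated index or delete is applied, so the engine's max-seq-no watermark can never lag behind an operation that a concurrent flush might capture in a commit, which is exactly the invariant `assertMaxSeqNoInCommitUserData` checks. A simplified stand-in for the idea; the real engine delegates to its local checkpoint tracker, and this class is purely illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

final class SeqNoWatermark {

    private final AtomicLong maxSeqNo = new AtomicLong(-1); // -1: no ops seen yet

    // called before an operation with the given seq_no is applied
    void markSeqNoAsSeen(long seqNo) {
        maxSeqNo.accumulateAndGet(seqNo, Math::max);
    }

    // read at flush time and recorded in the commit's user data, so it must
    // already cover any operation that could be in the flushed segments
    long maxSeqNo() {
        return maxSeqNo.get();
    }
}
```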
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
index 63ede221b2cbb..3ffacd81ad2d6 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java
@@ -48,6 +48,7 @@
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.seqno.SeqNoStats;
@@ -70,6 +71,7 @@
 import org.elasticsearch.test.BackgroundIndexer;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.MockHttpTransport;
 import org.elasticsearch.test.NodeConfigurationSource;
@@ -139,7 +141,7 @@ public final void startClusters() throws Exception {
         stopClusters();
         Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
             MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class,
-            MockNioTransportPlugin.class);
+            MockNioTransportPlugin.class, InternalSettingsPlugin.class);

         InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true,
             numberOfNodesPerCluster(), numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader",
             mockPlugins,
@@ -407,6 +409,7 @@ protected String getIndexSettings(final int numberOfShards, final int numberOfRe
         builder.startObject("settings");
         {
             builder.field(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0);
+            builder.field(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s");
             builder.field("index.number_of_shards", numberOfShards);
             builder.field("index.number_of_replicas", numberOfReplicas);
             for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index 67d31ff39007f..69fa23bd3fbcd 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -59,6 +59,7 @@
 import java.util.stream.Collectors;

 import static org.elasticsearch.index.engine.EngineTestCase.getDocIds;
+import static org.elasticsearch.index.engine.EngineTestCase.getTranslog;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -659,4 +660,49 @@ public void testVerifyShardBeforeIndexClosingIsNoOp() throws IOException {
             }
         });
     }
+
+    public void testMaxSeqNoInCommitUserData() throws Exception {
+        final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)
+            .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true)
+            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
+        final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build();
+        final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings);
+        try (Store store = createStore(shardId, indexSettings, newDirectory())) {
+            final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry());
+            try (FollowingEngine engine = createEngine(store, engineConfig)) {
+                AtomicBoolean running = new AtomicBoolean(true);
+                Thread rollTranslog = new Thread(() -> {
+                    while (running.get() && getTranslog(engine).currentFileGeneration() < 500) {
+                        engine.rollTranslogGeneration(); // make adding operations to translog slower
+                    }
+                });
+                rollTranslog.start();
+
+                Thread indexing = new Thread(() -> {
+                    List<Engine.Operation> ops = EngineTestCase.generateSingleDocHistory(true, VersionType.EXTERNAL, 2, 50, 500, "id");
+                    engine.advanceMaxSeqNoOfUpdatesOrDeletes(ops.stream().mapToLong(Engine.Operation::seqNo).max().getAsLong());
+                    for (Engine.Operation op : ops) {
+                        if (running.get() == false) {
+                            return;
+                        }
+                        try {
+                            EngineTestCase.applyOperation(engine, op);
+                        } catch (IOException e) {
+                            throw new AssertionError(e);
+                        }
+                    }
+                });
+                indexing.start();
+
+                int numCommits = between(5, 20);
+                for (int i = 0; i < numCommits; i++) {
+                    engine.flush(false, true);
+                }
+
+                running.set(false);
+                indexing.join();
+                rollTranslog.join();
+                EngineTestCase.assertMaxSeqNoInCommitUserData(engine);
+            }
+        }
+    }
 }
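The new test races translog rolls and indexing against repeated `engine.flush(false, true)` calls (do not force a flush, but wait if one is in progress), then flips the stop flag and joins both workers before asserting, so all of their writes are visible to the asserting thread. That harness shape in isolation, as a sketch with the racing work left abstract:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public final class RaceHarnessSketch {

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);

        Thread worker = new Thread(() -> {
            while (running.get()) {
                // racing activity goes here (e.g. rolling translog generations)
            }
        });
        worker.start();

        // the main thread performs the competing activity (e.g. repeated flushes)

        running.set(false); // signal shutdown
        worker.join();      // join() establishes happens-before: worker writes visible
        // assertions belong here, only after the join
    }
}
```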