Cross-cluster search alternate execution mode #32125
Labels
Comments
javanna
added
:Search/Search
Search-related issues that do not fall into other categories
Meta
labels
Jul 17, 2018
Pinging @elastic/es-search-aggs
javanna
changed the title
Reduce round-trips required by cross-cluster search
Cross-cluster search alternate execution mode
Nov 9, 2018
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Dec 3, 2018
Given that we check the max buckets limit on each shard when collecting the buckets, and that non-final reduction cannot add buckets (see elastic#35921), there is no point in counting and checking the number of buckets as part of the non-final reduction phases. The check is still needed in the final reduction phase, though, to make sure that the number of returned buckets is not above the allowed threshold. Relates to elastic#32125, as we will make use of non-final reduction phases in the CCS alternate execution mode, which increases the chance that this check trips for nothing when reducing aggs in each remote cluster.
javanna
added a commit
that referenced
this issue
Dec 3, 2018
Given that we check the max buckets limit on each shard when collecting the buckets, and that non-final reduction cannot add buckets (see #35921), there is no point in counting and checking the number of buckets as part of the non-final reduction phases. The check is still needed in the final reduction phase, though, to make sure that the number of returned buckets is not above the allowed threshold. Relates to #32125, as we will make use of non-final reduction phases in the CCS alternate execution mode, which increases the chance that this check trips for nothing when reducing aggs in each remote cluster.
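As a rough sketch of the idea in the message above — with hypothetical, heavily simplified stand-ins, not the actual Elasticsearch classes — the bucket-count circuit breaker only needs to trip during the final reduction, because merging cannot increase the number of buckets beyond what the inputs already contain:

```java
import java.util.List;

// Illustrative sketch only: BucketLimitSketch, MAX_BUCKETS and reduce() are
// made-up names. The point is the finalReduce flag: the limit is enforced
// only in the final reduction phase, never in intermediate (non-final) ones.
class BucketLimitSketch {
    static final int MAX_BUCKETS = 3;

    // returns the (upper-bound) bucket count after a simplified reduction
    static int reduce(List<Integer> bucketCountsPerShard, boolean finalReduce) {
        int total = bucketCountsPerShard.stream().mapToInt(Integer::intValue).sum();
        if (finalReduce && total > MAX_BUCKETS) {
            throw new IllegalStateException("too many buckets: " + total);
        }
        return total;
    }
}
```

With this shape, a per-cluster (non-final) reduce over many buckets never trips, while the same totals checked at the final CCS reduce do.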
javanna
added a commit
that referenced
this issue
Dec 14, 2018
) In order for the CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of the `SearchHits`, specifically:
- the lucene `SortField[]`, which contains info about the fields that sorting was performed on and their types, which depend on mappings (that the CCS node does not know about)
- the collapse field (`String`) that field collapsing was executed on, if requested
- the collapse values (`Object[]`) that field collapsing was based on, if requested

This info is needed to reconstruct the `TopFieldDocs` or `CollapseTopFieldDocs` in the CCS coordinating node to feed the `mergeTopDocs` method and reduce the multiple search responses received (one per cluster) into one. This commit adds such information to the `SearchHits` class. It is nullable info that is not serialized through the REST layer. `SearchPhaseController` sets it at the end of the hits reduction phase.
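A minimal sketch of why that metadata matters — `ClusterHits` and `topDocsKind` are made-up illustrative names, not the real classes: once a per-cluster response carries its sort fields and optional collapse field, the coordinating node can tell which top-docs variant it must rebuild before merging.

```java
import java.util.List;

// Hypothetical, simplified stand-in for the extra metadata this commit adds
// alongside the hits. The real code rebuilds lucene TopFieldDocs /
// CollapseTopFieldDocs; here we only classify which variant would be needed.
record ClusterHits(List<String> ids, String[] sortFields, String collapseField) {
    String topDocsKind() {
        if (collapseField != null) {
            return "CollapseTopFieldDocs"; // collapsing implies a field sort
        }
        if (sortFields != null && sortFields.length > 0) {
            return "TopFieldDocs";
        }
        return "TopDocs"; // plain score-sorted results
    }
}
```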
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Dec 14, 2018
For cross-cluster search alternate execution mode (see elastic#32125), we will need to take a search request that spans multiple clusters (based on index prefixes, e.g. cluster1:index, cluster2:index etc.) and split it into multiple search requests to be sent to each cluster. A copy constructor added to `SearchRequest` makes that easy and maintainable in the future. Something along the same lines already happens in `BulkByScrollParallelizationHelper`, but the corresponding code went out of date as new fields were added to `SearchRequest` that were not added to the bulk-by-scroll code. A copy constructor helps make the task of copying a search request maintainable over time.
javanna
added a commit
that referenced
this issue
Dec 14, 2018
For cross-cluster search alternate execution mode (see #32125), we will need to take a search request that spans multiple clusters (based on index prefixes, e.g. cluster1:index, cluster2:index etc.) and split it into multiple search requests to be sent to each cluster. A copy constructor added to `SearchRequest` makes that easy and maintainable in the future. Something along the same lines already happens in `BulkByScrollParallelizationHelper`, but the corresponding code went out of date as new fields were added to `SearchRequest` that were not added to the bulk-by-scroll code. A copy constructor helps make the task of copying a search request maintainable over time.
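The maintainability argument can be sketched with a hypothetical mini request class (the real `SearchRequest` has far more fields): a copy constructor puts all field copying in one place, so per-cluster copies cannot silently go stale when a new field is added later.

```java
// Illustrative sketch only -- MiniSearchRequest is a made-up stand-in.
class MiniSearchRequest {
    String[] indices;
    int from = 0;
    int size = 10;

    MiniSearchRequest(String... indices) {
        this.indices = indices;
    }

    // copy constructor: any field added to the class later gets copied here,
    // in one spot, instead of in every caller that duplicates requests
    MiniSearchRequest(MiniSearchRequest other) {
        this.indices = other.indices.clone();
        this.from = other.from;
        this.size = other.size;
    }
}
```

Splitting a request that targets `cluster1:index,cluster2:index` would then be: make one copy per cluster and rewrite the indices on each copy, leaving the original untouched.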
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Dec 14, 2018
In order for the CCS alternate execution mode (see elastic#32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present, but they are formatted according to the `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds such information to `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
javanna
added a commit
that referenced
this issue
Dec 18, 2018
In order for the CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present, but they are formatted according to the `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds such information to the `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
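Why raw values rather than the formatted ones, in a sketch with made-up types (`Hit`, `RawSortMergeSketch` are illustrative, not the real classes): formatted sort values are strings for display, and comparing them would sort lexicographically, while the merge must compare the underlying raw values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustrative only: each cluster returns hits sorted by a numeric field.
// The formattedSort string (what REST shows) must not drive the merge --
// "10" < "9" lexicographically -- so we merge on the raw value instead.
record Hit(String id, long rawSort, String formattedSort) {}

class RawSortMergeSketch {
    static List<String> merge(List<Hit> clusterA, List<Hit> clusterB) {
        List<Hit> all = new ArrayList<>(clusterA);
        all.addAll(clusterB);
        all.sort(Comparator.comparingLong(Hit::rawSort)); // raw, not formatted
        return all.stream().map(Hit::id).toList();
    }
}
```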
davidkyle
pushed a commit
that referenced
this issue
Dec 18, 2018
In order for the CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present, but they are formatted according to the `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds such information to the `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Dec 18, 2018
…stic#36555) In order for the CCS alternate execution mode (see elastic#32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of the `SearchHits`, specifically:
- the lucene `SortField[]`, which contains info about the fields that sorting was performed on and their types, which depend on mappings (that the CCS node does not know about)
- the collapse field (`String`) that field collapsing was executed on, if requested
- the collapse values (`Object[]`) that field collapsing was based on, if requested

This info is needed to reconstruct the `TopFieldDocs` or `CollapseTopFieldDocs` in the CCS coordinating node to feed the `mergeTopDocs` method and reduce the multiple search responses received (one per cluster) into one. This commit adds such information to the `SearchHits` class. It is nullable info that is not serialized through the REST layer. `SearchPhaseController` sets it at the end of the hits reduction phase.
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Dec 18, 2018
For cross-cluster search alternate execution mode (see elastic#32125), we will need to take a search request that spans multiple clusters (based on index prefixes, e.g. cluster1:index, cluster2:index etc.) and split it into multiple search requests to be sent to each cluster. A copy constructor added to `SearchRequest` makes that easy and maintainable in the future. Something along the same lines already happens in `BulkByScrollParallelizationHelper`, but the corresponding code went out of date as new fields were added to `SearchRequest` that were not added to the bulk-by-scroll code. A copy constructor helps make the task of copying a search request maintainable over time.
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Dec 18, 2018
…tic#36617) In order for the CCS alternate execution mode (see elastic#32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present, but they are formatted according to the `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds such information to the `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
javanna
added a commit
that referenced
this issue
Dec 18, 2018
) In order for the CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of the `SearchHits`, specifically:
- the lucene `SortField[]`, which contains info about the fields that sorting was performed on and their types, which depend on mappings (that the CCS node does not know about)
- the collapse field (`String`) that field collapsing was executed on, if requested
- the collapse values (`Object[]`) that field collapsing was based on, if requested

This info is needed to reconstruct the `TopFieldDocs` or `CollapseTopFieldDocs` in the CCS coordinating node to feed the `mergeTopDocs` method and reduce the multiple search responses received (one per cluster) into one. This commit adds such information to the `SearchHits` class. It is nullable info that is not serialized through the REST layer. `SearchPhaseController` sets it at the end of the hits reduction phase.
javanna
added a commit
that referenced
this issue
Dec 18, 2018
For cross-cluster search alternate execution mode (see #32125), we will need to take a search request that spans multiple clusters (based on index prefixes, e.g. cluster1:index, cluster2:index etc.) and split it into multiple search requests to be sent to each cluster. A copy constructor added to `SearchRequest` makes that easy and maintainable in the future. Something along the same lines already happens in `BulkByScrollParallelizationHelper`, but the corresponding code went out of date as new fields were added to `SearchRequest` that were not added to the bulk-by-scroll code. A copy constructor helps make the task of copying a search request maintainable over time.
davidkyle
added a commit
that referenced
this issue
Dec 18, 2018
* [ML] Job and datafeed mappings with index template (#32719) Index mappings for the configuration documents
* [ML] Job config document CRUD operations (#32738)
* [ML] Datafeed config CRUD operations (#32854)
* [ML] Change JobManager to work with Job config in index (#33064)
* [ML] Change Datafeed actions to read config from the config index (#33273)
* [ML] Allocate jobs based on JobParams rather than cluster state config (#33994)
* [ML] Return missing job error when .ml-config is does not exist (#34177)
* [ML] Close job in index (#34217)
* [ML] Adjust finalize job action to work with documents (#34226)
* [ML] Job in index: Datafeed node selector (#34218)
* [ML] Job in Index: Stop and preview datafeed (#34605)
* [ML] Delete job document (#34595)
* [ML] Convert job data remover to work with index configs (#34532)
* [ML] Job in index: Get datafeed and job stats from index (#34645)
* [ML] Job in Index: Convert get calendar events to index docs (#34710)
* [ML] Job in index: delete filter action (#34642) This changes the delete filter action to search for jobs using the filter to be deleted in the index rather than the cluster state.
* [ML] Job in Index: Enable integ tests (#34851) Enables the ml integration tests excluding the rolling upgrade tests and a lot of fixes to make the tests pass again.
* [ML] Reimplement established model memory (#35500) This is the 7.0 implementation of a master node service to keep track of the native process memory requirement of each ML job with an associated native process. The new ML memory tracker service works when the whole cluster is upgraded to at least version 6.6. For mixed version clusters the old mechanism of established model memory stored on the job in cluster state was used. This means that the old (and complex) code to keep established model memory up to date on the job object has been removed in 7.0. Forward port of #35263
* [ML] Need to wait for shards to replicate in distributed test (#35541) Because the cluster was expanded from 1 node to 3, indices would initially start off with 0 replicas. If the original node was killed before auto-expansion to 1 replica was complete then the test would fail because the indices would be unavailable.
* [ML] DelayedDataCheckConfig index mappings (#35646)
* [ML] JIndex: Restore finalize job action (#35939)
* [ML] Replace Version.CURRENT in streaming functions (#36118)
* [ML] Use 'anomaly-detector' in job config doc name (#36254)
* [ML] Job In Index: Migrate config from the clusterstate (#35834) Migrate ML configuration from clusterstate to index for closed jobs only once all nodes are v6.6.0 or higher
* [ML] Check groups against job Ids on update (#36317)
* [ML] Adapt to periodic persistent task refresh (#36633)
* [ML] Adapt to periodic persistent task refresh If https://github.com/elastic/elasticsearch/pull/36069/files is merged then the approach for reallocating ML persistent tasks after refreshing job memory requirements can be simplified. This change begins the simplification process.
* Remove AwaitsFix and implement TODO
* [ML] Default search size for configs
* Fix TooManyJobsIT.testMultipleNodes Two problems: 1. Stack overflow during async iteration when lots of jobs on same machine 2. Not effectively setting search size in all cases
* Use execute() instead of submit() in MlMemoryTracker We don't need a Future to wait for completion
* [ML][TEST] Fix NPE in JobManagerTests
* [ML] JIindex: Limit the size of bulk migrations (#36481)
* [ML] Prevent updates and upgrade tests (#36649)
* [FEATURE][ML] Add cluster setting that enables/disables config migration (#36700) This commit adds a cluster setting called `xpack.ml.enable_config_migration`. The setting is `true` by default. When set to `false`, no config migration will be attempted and non-migrated resources (e.g. jobs, datafeeds) will be able to be updated normally. Relates #32905
* [ML] Snapshot ml configs before migrating (#36645)
* [FEATURE][ML] Split in batches and migrate all jobs and datafeeds (#36716) Relates #32905
* SQL: Fix translation of LIKE/RLIKE keywords (#36672) Refactor Like/RLike functions to simplify internals and improve query translation when chained or within a script context. Fix #36039 Fix #36584
* Fixing line length for EnvironmentTests and RecoveryTests (#36657) Relates #34884
* Add back one line removed by mistake regarding java version check and COMPAT jvm parameter existence
* Do not resolve addresses in remote connection info (#36671) The remote connection info API leads to resolving addresses of seed nodes when invoked. This is problematic because if a hostname fails to resolve, we would not display any remote connection info. Yet, a hostname not resolving can happen across remote clusters, especially in the modern world of cloud services with dynamically changing IPs. Instead, the remote connection info API should be providing the configured seed nodes. This commit changes the remote connection info to display the configured seed nodes, avoiding a hostname resolution. Note that care was taken to preserve backwards compatibility with previous versions that expect the remote connection info to serialize a transport address instead of a string representing the hostname.
* [Painless] Add boxed type to boxed type casts for method/return (#36571) This adds implicit boxed type to boxed type casts for non-def types to create asymmetric casting relative to the def type when calling methods or returning values. This means that a user calling a method taking an Integer can call it with a Byte, Short, etc. legally, which matches the way def works. This creates consistency in the casting model that did not previously exist.
* SNAPSHOTS: Adjust BwC Versions in Restore Logic (#36718) Re-enables bwc tests with adjusted version conditions now that #36397 enables concurrent snapshots in 6.6+
* ingest: fix on_failure with Drop processor (#36686) This commit allows a document to be dropped when a Drop processor is used in the on_failure fork of the processor chain. Fixes #36151
* Initialize startup `CcrRepositories` (#36730) Currently, the CcrRepositoryManager only listens for settings updates and installs new repositories. It does not install the repositories that are in the initial settings. This commit modifies the manager to install the initial repositories. Additionally, it modifies the ccr integration test to configure the remote leader node at startup, instead of using a settings update.
* [TEST] fix float comparison in RandomObjects#getExpectedParsedValue This commit fixes a test bug introduced with #36597. This caused some test failures as stored field values comparisons would not work when the CBOR xcontent type was used. Closes #29080
* [Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (#35320) This commit exposes lucene's LatLonShape field as the default type in GeoShapeFieldMapper. To use the new indexing approach, simply set "type" : "geo_shape" in the mappings without setting any of the strategy, precision, tree_levels, or distance_error_pct parameters. Note the following when using the new indexing approach: geo_shape query does not support querying by MULTIPOINT; LINESTRING and MULTILINESTRING queries do not yet support WITHIN relation; CONTAINS relation is not yet supported. The tree, precision, tree_levels, distance_error_pct, and points_only parameters are deprecated.
* TESTS:Debug Log. IndexStatsIT#testFilterCacheStats
* ingest: support default pipelines + bulk upserts (#36618) This commit adds support to enable bulk upserts to use an index's default pipeline. Bulk upsert, doc_as_upsert, and script_as_upsert are all supported. However, bulk script_as_upsert has slightly surprising behavior since the pipeline is executed _before_ the script is evaluated. This means that the pipeline only has access to the data found in the upsert field of the script_as_upsert. The non-bulk script_as_upsert (existing behavior) runs the pipeline _after_ the script is executed. This commit does _not_ attempt to consolidate the bulk and non-bulk behavior for script_as_upsert. This commit also adds additional testing for the non-bulk behavior, which remains unchanged with this commit. fixes #36219
* Fix duplicate phrase in shrink/split error message (#36734) This commit removes a duplicate "must be a" from the shrink/split error messages.
* Deprecate types in get_source and exist_source (#36426) This change adds a new untyped endpoint `{index}/_source/{id}` for both the GET and the HEAD methods to get the source of a document or check for its existence. It also adds deprecation warnings to RestGetSourceAction that emit a warning when the old deprecated "type" parameter is still used. Also updating documentation and tests where appropriate. Relates to #35190
* Revert "[Geo] Integrate Lucene's LatLonShape (BKD Backed GeoShapes) as default `geo_shape` indexing approach (#35320)" This reverts commit 5bc7822.
* Enhance Invalidate Token API (#35388) This change adds functionality to invalidate all (refresh+access) tokens for all users of a realm, for a user in all realms, and for a user in a specific realm. It also changes the response format for the invalidate token API to contain information about the number of invalidated tokens and possible errors that were encountered, and updates the API documentation. After back-porting to 6.x, the `created` field will be removed from master as a field in the response. Resolves: #35115 Relates: #34556
* Add raw sort values to SearchSortValues transport serialization (#36617) In order for the CCS alternate execution mode (see #32125) to be able to do the final reduction step on the CCS coordinating node, we need to serialize additional info in the transport layer as part of each `SearchHit`. Sort values are already present, but they are formatted according to the `DocValueFormat` provided. The CCS node needs to be able to reconstruct the lucene `FieldDoc` to include in the `TopFieldDocs` and `CollapseTopFieldDocs` which will feed the `mergeTopDocs` method used to reduce multiple search responses (one per cluster) into one. This commit adds such information to the `SearchSortValues` and exposes it through a new getter method added to `SearchHit` for retrieval. This info is only serialized at transport and never printed out at REST.
* Watcher: Ensure all internal search requests count hits (#36697) In previous commits only the stored toXContent version of a search request was using the old format. However an executed search request was already disabling hit counts. In 7.0 hit counts will stay enabled by default to allow for proper migration. Closes #36177
* [TEST] Ensure shard follow tasks have really stopped. Relates to #36696
* Ensure MapperService#getAllMetaFields elements order is deterministic (#36739) MapperService#getAllMetaFields returns an array, which is created out of an `ObjectHashSet`. Such a set does not guarantee deterministic hash ordering. The array returned by its toArray may be sorted differently at each run. This caused some repeatability issues in our tests (see #29080) as we pick random fields from the array of possible metadata fields, but that won't be repeatable if the input array is sorted differently at every run. Once setting the tests seed, hppc picks that up and the sorting is deterministic, but failures don't repeat with the seed that gets printed out originally (as a seed was not originally set). See also https://issues.carrot2.org/projects/HPPC/issues/HPPC-173. With this commit, we simply create a static sorted array that is used for `getAllMetaFields`. The change is in production code but really affects only testing, as the only production usage of this method was to iterate through all values when parsing fields in the high-level REST client code. Anyway, this seems like a good change, as returning an array would imply that it's deterministically sorted.
* Expose Sequence Number based Optimistic Concurrency Control in the rest layer (#36721) Relates #36148 Relates #10708
* [ML] Mute MlDistributedFailureIT
javanna
added a commit
that referenced
this issue
Jan 21, 2019
This will be used in cross-cluster search when reduction is performed locally on each cluster. The CCS coordinating node will send one search request per remote cluster involved and will get one search response back from each one of them. Such responses contain all the info needed to perform an additional reduction and return results back to the user. Relates to #32125
javanna
added a commit
that referenced
this issue
Jan 23, 2019
This commit moves the collectSearchShards method out of RemoteClusterService into TransportSearchAction, which currently calls it. RemoteClusterService used to be used only for cross-cluster search but is now also used in cross-cluster replication, where different APIs are called through the RemoteClusterAwareClient. There is no reason for the collectSearchShards and fetchShards methods to be in RemoteClusterService and RemoteClusterConnection respectively. The search shards API can be called through the RemoteClusterAwareClient too; the only missing bit is a way to handle failures based on the skip_unavailable setting for each cluster (currently only supported in RemoteClusterConnection#fetchShards), which is achieved by adding an isSkipUnavailable(String clusterAlias) method to RemoteClusterService. This change is useful for #32125 as we will very soon need to also call the search API against remote clusters, which will be done through RemoteClusterAwareClient. In that case we will also need to support skip_unavailable when calling the search API, so we need some way to handle the skip_unavailable setting like we currently do for the search_shards call. Relates to #32125
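The skip_unavailable semantics described here can be sketched as follows — `SkipUnavailableSketch` and its methods are hypothetical illustration, not the actual `RemoteClusterService` API: a failure from a cluster marked skip_unavailable=true is tolerated and that cluster is skipped, while a failure from any other cluster fails the whole request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative sketch only. clusterOutcome maps cluster alias -> whether the
// remote call succeeded; skipUnavailable maps alias -> its setting.
class SkipUnavailableSketch {
    final Map<String, Boolean> skipUnavailable;

    SkipUnavailableSketch(Map<String, Boolean> skipUnavailable) {
        this.skipUnavailable = skipUnavailable;
    }

    List<String> respondingClusters(Map<String, Boolean> clusterOutcome) {
        List<String> responded = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : clusterOutcome.entrySet()) {
            if (e.getValue()) {
                responded.add(e.getKey());
            } else if (!skipUnavailable.getOrDefault(e.getKey(), false)) {
                // not marked skip_unavailable: the whole search fails
                throw new IllegalStateException(e.getKey() + " is unavailable");
            }
            // marked skip_unavailable: failure recorded, cluster skipped
        }
        return responded;
    }
}
```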
javanna
added a commit
that referenced
this issue
Jan 31, 2019
With #37566 we have introduced the ability to merge multiple search responses into one. That makes it possible to expose a new way of executing cross-cluster search requests, which makes CCS much faster whenever there is network latency between the CCS coordinating node and the remote clusters. The coordinating node can now send a single search request to each remote cluster, which gets reduced by each one of them. from + size results are requested from each cluster, and the reduce phase in each cluster is non-final (meaning that buckets are not pruned and pipeline aggs are not executed). The CCS coordinating node performs an additional, final reduction, which produces one search response out of the multiple responses received from the different clusters. This new execution path is activated by default for any CCS request unless a scroll is provided or inner hits are requested as part of field collapsing. The search API now accepts a new parameter called ccs_minimize_roundtrips that allows opting out of the default behaviour. Relates to #32125
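The activation rule described in this message can be condensed into a small predicate — a hypothetical sketch mirroring the stated defaults, not the real decision code, which lives in the search transport layer:

```java
// Illustrative only: round-trips are minimized by default, unless the caller
// opted out via ccs_minimize_roundtrips=false, a scroll is requested, or
// field collapsing asks for inner hits.
class MinimizeRoundtripsSketch {
    static boolean minimizeRoundtrips(boolean ccsMinimizeRoundtrips,
                                      boolean hasScroll,
                                      boolean collapseWithInnerHits) {
        return ccsMinimizeRoundtrips && !hasScroll && !collapseWithInnerHits;
    }
}
```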
javanna
added a commit
that referenced
this issue
Feb 1, 2019
With #37000 we made sure that final reduction is automatically disabled whenever a localClusterAlias is provided with a SearchRequest. While working on #37838, we found a scenario where we do need to set a localClusterAlias yet we would like to perform a final reduction in the remote cluster: when searching on a single remote cluster. Relates to #32125. This commit adds support for a separate finalReduce flag to SearchRequest and makes use of it in TransportSearchAction in case we are searching against a single remote cluster. This also makes sure that num_reduce_phases is correct when searching against a single remote cluster: it makes little sense to return `num_reduce_phases` set to `2`, which looks especially weird in case the search was performed against a single remote shard. We should perform one reduction phase only in this case, and `num_reduce_phases` should reflect that. * line length
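The reduce-phase accounting described here can be sketched like this — `ReducePhasesSketch` is a made-up, simplified illustration of the stated rule, not the actual `TransportSearchAction` logic: with a single remote cluster and no local shards, the remote cluster runs the final reduction itself, so only one reduce phase happens.

```java
// Illustrative only. numReducePhases is simplified to the two cases the
// commit message discusses: one remote-side final reduce, or a non-final
// reduce followed by the final CCS-side reduce.
class ReducePhasesSketch {
    static boolean remoteDoesFinalReduce(int remoteClusters, boolean hasLocalShards) {
        return remoteClusters == 1 && !hasLocalShards;
    }

    static int numReducePhases(int remoteClusters, boolean hasLocalShards) {
        return remoteDoesFinalReduce(remoteClusters, hasLocalShards) ? 1 : 2;
    }
}
```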
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 14, 2019
We recently introduced the option to minimize network round-trips when executing cross-cluster search requests. All the changes made around that are separately unit tested, and there are some YAML tests that exercise the new code path, which involves multiple coordination steps. This commit adds new integration tests that compare the output given by CCS when running the same queries using the two different execution modes available. Relates to elastic#32125
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 14, 2019
When using the DFS_QUERY_THEN_FETCH search type, the dfs phase is run and its results are used in the query phase to make scoring accurate. When using CCS, depending on whether the DFS phase runs in the CCS coordinating node (as when all shards are local) or in each remote cluster (when minimizing round-trips), scoring will differ. This commit disables minimizing round-trips whenever DFS is requested, as it is not currently possible to ensure that scoring is accurate in that case. Relates to elastic#32125
javanna
added a commit
that referenced
this issue
Mar 14, 2019
When using the DFS_QUERY_THEN_FETCH search type, the dfs phase is run and its results are used in the query phase to make scoring accurate. When using CCS, depending on whether the DFS phase runs in the CCS coordinating node (as when all shards are local) or in each remote cluster (when minimizing round-trips), scoring will differ. This commit disables minimizing round-trips whenever DFS is requested, as it is not currently possible to ensure that scoring is accurate in that case. Relates to #32125
javanna
added a commit
that referenced
this issue
Mar 18, 2019
When minimizing round-trips, each cluster returns its own independent search response. When sorting by field and/or field collapsing are requested and one cluster has no results to return, the information about the field that sorting was based on (the SortField array), as well as the field (and the values) that collapsing was performed on, is missing from that cluster's search response. That causes problems, as we can't build the proper `TopDocs` instance, which would need to be either `TopFieldDocs` or `CollapseTopFieldDocs`; the merge routine expects all the top docs to be of the same exact type, which can't be guaranteed. Given that the problematic results are empty, and hence have no impact on the final results, we can simply skip them. Relates to #32125 Closes #40067
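The fix can be pictured with made-up types (`ClusterResponse`, `SkipEmptySketch` are illustrative, not the real classes): before merging, drop per-cluster responses with no hits, since they may lack the sort/collapse metadata needed to rebuild the matching top-docs variant, and being empty they cannot change the merged result anyway.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: a response with no hits (and possibly null
// sortFields) is pruned before the merge so every remaining response can be
// rebuilt into the same top-docs type.
record ClusterResponse(List<String> hitIds, String[] sortFields) {}

class SkipEmptySketch {
    static List<ClusterResponse> mergeable(List<ClusterResponse> responses) {
        List<ClusterResponse> kept = new ArrayList<>();
        for (ClusterResponse r : responses) {
            if (!r.hitIds().isEmpty()) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```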
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 19, 2019
When using DFS_QUERY_THEN_FETCH search type, the dfs phase is run and its results are used in the query phase to make scoring accurate. When using CCS, depending on whether the DFS phase runs in the CCS coordinating node (like if all shards were local) or in each remote cluster (when minimizing round-trips), scoring will differ. This commit disables minimizing round-trips whenever DFS is requested, as it is not currently possible to ensure that scoring is accurate in that case. Relates to elastic#32125
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 19, 2019
When minimizing round-trips, each cluster returns its own independent search response. In case sort by field and/or field collapsing were requested, when one cluster has no results to return, the information about the field that sorting was based on (SortField array) as well as the field (and the values) that collapsing was performed on are missing in the search response. That causes problems as we can't build the proper `TopDocs` instance which would need to be either `TopFieldDocs` or `CollapseTopFieldDocs`. The merge routine expects that all the top docs are of the same exact type which can't be guaranteed. Given that the problematic results are empty, hence have no impact on the final results, we can simply skip them. Relates to elastic#32125 Closes elastic#40067
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 19, 2019
When using DFS_QUERY_THEN_FETCH search type, the dfs phase is run and its results are used in the query phase to make scoring accurate. When using CCS, depending on whether the DFS phase runs in the CCS coordinating node (like if all shards were local) or in each remote cluster (when minimizing round-trips), scoring will differ. This commit disables minimizing round-trips whenever DFS is requested, as it is not currently possible to ensure that scoring is accurate in that case. Relates to elastic#32125
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 19, 2019
When minimizing round-trips, each cluster returns its own independent search response. In case sort by field and/or field collapsing were requested, when one cluster has no results to return, the information about the field that sorting was based on (SortField array) as well as the field (and the values) that collapsing was performed on are missing in the search response. That causes problems as we can't build the proper `TopDocs` instance which would need to be either `TopFieldDocs` or `CollapseTopFieldDocs`. The merge routine expects that all the top docs are of the same exact type which can't be guaranteed. Given that the problematic results are empty, hence have no impact on the final results, we can simply skip them. Relates to elastic#32125 Closes elastic#40067
javanna
added a commit
that referenced
this issue
Mar 19, 2019
When using DFS_QUERY_THEN_FETCH search type, the dfs phase is run and its results are used in the query phase to make scoring accurate. When using CCS, depending on whether the DFS phase runs in the CCS coordinating node (like if all shards were local) or in each remote cluster (when minimizing round-trips), scoring will differ. This commit disables minimizing round-trips whenever DFS is requested, as it is not currently possible to ensure that scoring is accurate in that case. Relates to #32125
javanna
added a commit
that referenced
this issue
Mar 19, 2019
When minimizing round-trips, each cluster returns its own independent search response. In case sort by field and/or field collapsing were requested, when one cluster has no results to return, the information about the field that sorting was based on (SortField array) as well as the field (and the values) that collapsing was performed on are missing in the search response. That causes problems as we can't build the proper `TopDocs` instance which would need to be either `TopFieldDocs` or `CollapseTopFieldDocs`. The merge routine expects that all the top docs are of the same exact type which can't be guaranteed. Given that the problematic results are empty, hence have no impact on the final results, we can simply skip them. Relates to #32125 Closes #40067
javanna
added a commit
that referenced
this issue
Mar 19, 2019
When using DFS_QUERY_THEN_FETCH search type, the dfs phase is run and its results are used in the query phase to make scoring accurate. When using CCS, depending on whether the DFS phase runs in the CCS coordinating node (like if all shards were local) or in each remote cluster (when minimizing round-trips), scoring will differ. This commit disables minimizing round-trips whenever DFS is requested, as it is not currently possible to ensure that scoring is accurate in that case. Relates to #32125
javanna
added a commit
that referenced
this issue
Mar 19, 2019
When minimizing round-trips, each cluster returns its own independent search response. In case sort by field and/or field collapsing were requested, when one cluster has no results to return, the information about the field that sorting was based on (SortField array) as well as the field (and the values) that collapsing was performed on are missing in the search response. That causes problems as we can't build the proper `TopDocs` instance which would need to be either `TopFieldDocs` or `CollapseTopFieldDocs`. The merge routine expects that all the top docs are of the same exact type which can't be guaranteed. Given that the problematic results are empty, hence have no impact on the final results, we can simply skip them. Relates to #32125 Closes #40067
javanna
added a commit
that referenced
this issue
Mar 25, 2019
We recently introduced the option to minimize network roundtrips when executing cross-cluster search requests. All the changes made around that are separately unit tested, and there are some yaml tests that exercise the new code-path which involves multiple coordination steps. This commit adds new integration tests that compare the output given by CCS when running the same queries using the two different execution modes available. Relates to #32125
All the steps listed above are now completed.
javanna
added a commit
to javanna/elasticsearch
that referenced
this issue
Mar 29, 2019
We recently introduced the option to minimize network roundtrips when executing cross-cluster search requests. All the changes made around that are separately unit tested, and there are some yaml tests that exercise the new code-path which involves multiple coordination steps. This commit adds new integration tests that compare the output given by CCS when running the same queries using the two different execution modes available. Relates to elastic#32125
javanna
added a commit
that referenced
this issue
Mar 29, 2019
We recently introduced the option to minimize network roundtrips when executing cross-cluster search requests. All the changes made around that are separately unit tested, and there are some yaml tests that exercise the new code-path which involves multiple coordination steps. This commit adds new integration tests that compare the output given by CCS when running the same queries using the two different execution modes available. Relates to #32125
The current implementation of cross-cluster search treats all the remote shards as if they were local: the coordinating node first figures out where all the shards are (remote plus local ones), then fans out to all of them. From then on the search operation works the same as if all shards were local (aka query then fetch). When a significant number of shards are involved in the request, this can cause problems, especially if the connection between the coordinating node and the remote clusters is not fast enough: the coordinating node sends as many shard-level requests as there are shards involved in the search request, which causes a significant number of round-trips between the coordinating node and the remote clusters.
In this meta issue we will outline the steps needed to implement a cross-cluster search alternate execution mode that reduces the required round-trips, which should improve its performance and scalability in certain scenarios. This task is currently at POC stage, hence we don't know if/when such changes are going to be released. The idea is to allow for multiple coordination steps in each remote cluster, so that the cross-cluster coordinating node sends out only one request per remote cluster. Each remote cluster will have its own coordinating node returning enough information for the last coordination step to be performed, which consists of reducing the results coming from the different clusters into one result set to be returned.
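As a rough illustration of the arithmetic behind this, the sketch below compares the number of cross-cluster requests in the two modes and the pagination cost of the alternate one. All class and method names are ours and the shard counts are made up; this is not Elasticsearch code.

```java
// Back-of-the-envelope sketch (made-up numbers, not ES code) of what the alternate
// execution mode saves and what it costs. Classic CCS sends one shard-level request
// per remote shard per phase; the alternate mode sends one search request per remote
// cluster, but pagination then needs from+size hits from every cluster.
public class RoundTripSketch {
    static int classicRequests(int[] shardsPerRemoteCluster) {
        int total = 0;
        for (int shards : shardsPerRemoteCluster) {
            total += shards; // one cross-cluster round-trip per remote shard
        }
        return total;
    }

    static int minimizedRequests(int[] shardsPerRemoteCluster) {
        return shardsPerRemoteCluster.length; // one round-trip per remote cluster
    }

    // Trade-off: to serve the global page [from, from+size), each cluster must return
    // from+size hits, since any single cluster could in principle own the whole page.
    static int totalFetched(int clusters, int from, int size) {
        return clusters * (from + size);
    }

    public static void main(String[] args) {
        int[] clusters = {120, 80, 200}; // shard counts for three remote clusters
        System.out.println(classicRequests(clusters));   // 400
        System.out.println(minimizedRequests(clusters)); // 3
        System.out.println(totalFetched(3, 90, 10));     // 300 hits fetched for 10 returned
    }
}
```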
Trade-off: as we reduce the number of round-trips, the single response that is returned from each remote cluster will need to contain more info than a normal search response. Pagination will require from+size results to be requested from each cluster, terms aggregations will need a mechanism similar to `shard_size`, per cluster, in order not to lose accuracy, and hits will be fetched (at least initially) in each remote cluster, meaning that most of the documents fetched will not be returned to the end user.

The first phase of the POC focuses on splitting the CCS request into one search request per cluster, and being able to merge the resulting search responses into one to be returned to the end user. The alternate execution mode will use the transport layer like CCS already does. This allows us to optionally serialize additional information as part of a search response without affecting the REST response of the search API. The following are the identified steps needed so far:
Benchmark the preliminary alternate execution mode and identify what scenarios are positively impacted and which scenarios are better off using the ordinary execution mode
Add missing info to the `SearchResponse` returned by each cluster: no change is needed for sort by score, as the response already holds all the needed info. When sorting by field, we need to reconstruct the `SortField[]` (to rebuild the `TopFieldDocs`), which depends on mappings in the remote clusters. We also need the collapse field and collapse values in case field collapsing is used; these could be deduced by iterating all the hits and their fields, but it is better to have them summarized in the `SearchHits` like `CollapseTopFieldDocs` does. Also, for each hit we need the raw sort values, from before they went through `DocValueFormat#format`. These bits of information need to be added to the `SearchResponse`. One option could be to optionally serialize `TopDocs` entirely from each cluster. Given though that most of the needed information is already available in other places, the size of `TopDocs`, and the fact that it most likely cannot be used as-is in the coordinating node but needs some modifications, we could instead serialize only the missing info as part of the `SearchResponse` (Add sort and collapse info to SearchHits transport serialization #36555 & Add raw sort values to SearchSortValues transport serialization #36617)
Make sure that all clusters involved use the same current time value when executing time-based queries. This should be defined in the coordinating node and serialized to the remote clusters (Add support for providing absolute start time to SearchRequest #37142)
Make it possible to disable the single shard optimization when determining `shard_size` for certain types of aggregations (see https://github.com/elastic/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketUtils.java#L42). Although we may be executing against a single shard per remote cluster, we will need to merge results coming from multiple clusters, hence we need to make sure that we always collect enough info from each shard and never optimize for the single-shard scenario (Remove single shard optimization when suggesting shard_size #37041)
Add a way to provide an index prefix to each search request, so that the `_index` field will be valued with the local index name prefixed with the cluster alias. Note that this info is not known locally, hence it needs to be provided by the CCS coordinating node. This is needed for scenarios where users may be aggregating on the `_index` field, or when errors from each cluster are returned and the index metadata in the exception must contain the cluster alias to know where the error comes from (Add support for local cluster alias to SearchRequest #36997)
Add support for disabling the final reduction phase for aggregations. Terms aggregations should not be truncated, and pipeline aggs should not be executed locally in each cluster. The final reduction phase needs to be executed later by the CCS coordinating node once it has received all the results coming from the remote clusters involved. (Skip final reduction if SearchRequest holds a cluster alias #37000)
Allow skipping the max number of buckets check in the non-final reduction phase, as the limit is expected to be reached much more easily when aggregations are reduced but not pruned (Enforce max_buckets limit only in the final reduction phase #36152)
Add support for merging multiple `SearchResponse` objects, which contain all the needed info (added in the steps above), into one (Add support for merging multiple search responses into one #37566)
Add support for the alternate execution mode to `TransportSearchAction`, with one request per cluster, in which each cluster performs remote reduction. The coordinating node makes one search call per cluster and merges the search responses obtained into one, then returns it (Streamline skip_unavailable handling #37672 & Introduce ability to minimize round-trips in CCS #37828)
Enable remote reduction automatically, and add an option to opt out in the search request (Introduce ability to minimize round-trips in CCS #37828)
Introduce tests that compare results obtained from the two CCS execution modes and make sure that the output is exactly the same ( Add integration tests to verify CCS output #40038)
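The final reduction performed by the CCS coordinating node over per-cluster sorted hits is essentially a merge-sort of the cluster heads. A rough sketch, using plain longs as stand-ins for the real sort values (class and method names are ours, not Elasticsearch APIs):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Rough sketch of the final CCS reduction for sorted hits: each cluster returns its
// own sorted top hits, and the coordinating node merge-sorts the heads, keeping the
// global top `size`. Plain longs stand in for real per-hit sort values.
public class TopDocsMergeSketch {
    static List<Long> mergeTop(List<List<Long>> perClusterSortedDesc, int size) {
        // head pointers (cluster index, offset) ordered by current head value, descending
        PriorityQueue<int[]> heads = new PriorityQueue<>(
            Comparator.comparingLong((int[] h) -> perClusterSortedDesc.get(h[0]).get(h[1])).reversed());
        for (int c = 0; c < perClusterSortedDesc.size(); c++) {
            if (!perClusterSortedDesc.get(c).isEmpty()) {
                heads.add(new int[] {c, 0});
            }
        }
        List<Long> merged = new ArrayList<>();
        while (merged.size() < size && !heads.isEmpty()) {
            int[] head = heads.poll();
            List<Long> cluster = perClusterSortedDesc.get(head[0]);
            merged.add(cluster.get(head[1]));
            if (head[1] + 1 < cluster.size()) {
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<Long>> clusters = List.of(
            List.of(9L, 5L, 1L), // remote cluster A, already sorted descending
            List.of(8L, 7L, 2L), // remote cluster B
            List.of(6L));        // remote cluster C
        System.out.println(mergeTop(clusters, 4)); // [9, 8, 7, 6]
    }
}
```

In the real implementation the comparator also has to agree across clusters, which is exactly why the rebuilt `SortField[]`, collapse values and raw (pre-`DocValueFormat#format`) sort values from the steps above are needed.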
Limitations (in all of these cases we automatically fall back to ordinary CCS):