diff --git a/corehq/apps/es/client.py b/corehq/apps/es/client.py index 1a8a2267fe08..f0618a08c456 100644 --- a/corehq/apps/es/client.py +++ b/corehq/apps/es/client.py @@ -399,7 +399,8 @@ def _validate_single_index(index): def reindex( self, source, dest, wait_for_completion=False, - refresh=False, batch_size=1000, requests_per_second=None, copy_doc_ids=True, query=None, + refresh=False, batch_size=1000, requests_per_second=None, + copy_doc_ids=True, query=None, purge_ids=False ): """ Starts the reindex process in elastic search cluster @@ -416,6 +417,7 @@ def reindex( and can be reduced if you encounter scroll timeouts. :param query: ``dict`` optional parameter to include a term query to filter which documents are included in the reindex + :param purge_ids: ``bool`` if True, will remove the _id field from the documents :returns: None if wait_for_completion is True else would return task_id of reindex task """ @@ -435,27 +437,35 @@ def reindex( "conflicts": "proceed" } - # Should be removed after ES 5-6 migration - if copy_doc_ids and source == const.HQ_USERS_INDEX_NAME: - # Remove password from form index + if copy_doc_ids or purge_ids: reindex_body["script"] = { "lang": "painless", - "source": """ - ctx._source.remove('password'); - if (!ctx._source.containsKey('doc_id')) { - ctx._source['doc_id'] = ctx._id; - } - """ - } - elif copy_doc_ids: - reindex_body["script"] = { - "lang": "painless", - "source": """ - if (!ctx._source.containsKey('doc_id')) { - ctx._source['doc_id'] = ctx._id; - } - """ + "source": "" } + script_parts = [] + + if purge_ids: + script_parts.append(""" + if (ctx._source.containsKey('_id')) { + ctx._source.remove('_id'); + } + """) + + if source == const.HQ_USERS_INDEX_NAME: + # Remove password field from users index + script_parts.append(""" + ctx._source.remove('password'); + """) + + if copy_doc_ids: + # Add doc_id field to the documents + script_parts.append(""" + if (!ctx._source.containsKey('doc_id')) { + ctx._source['doc_id'] = ctx._id; + } + """) + + reindex_body["script"]["source"] = " ".join(script_parts) reindex_kwargs = { "wait_for_completion": wait_for_completion, @@ -1118,6 +1128,10 @@ def __init__(self, primary_adapter, secondary_adapter): def mapping(self): return self.primary.mapping + @property + def parent_index_cname(self): + return self.primary.parent_index_cname + def export_adapter(self): adapter = copy.copy(self) adapter.primary = adapter.primary.export_adapter() diff --git a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py index 99f7781f4e18..2367fb03b5e6 100644 --- a/corehq/apps/es/management/commands/elastic_sync_multiplexed.py +++ b/corehq/apps/es/management/commands/elastic_sync_multiplexed.py @@ -42,7 +42,7 @@ class ESSyncUtil: def __init__(self): self.es = get_client() - def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None): + def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None, purge_ids=False): adapter = doc_adapter_from_cname(cname) @@ -57,7 +57,8 @@ def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None logger.info("Starting ReIndex process") task_id = es_manager.reindex( - source_index, destination_index, requests_per_second=requests_per_second + source_index, destination_index, + requests_per_second=requests_per_second, batch_size=reindex_batch_size, purge_ids=purge_ids ) logger.info(f"Copying docs from index {source_index} to index {destination_index}") task_number = task_id.split(':')[1] @@ -303,7 +304,7 @@ def _copy_checkpoints(self, pillow, new_checkpoint_id): def estimate_disk_space_for_reindex(self, stdout=None): indices_info = es_manager.indices_info() - index_cname_map = self._get_index_name_cname_map() + index_cname_map = self._get_index_name_cname_map(ignore_subindices=True) index_size_rows = [] total_size = 0 for index_name in index_cname_map.keys(): @@ -320,8 +321,13 @@ def estimate_disk_space_for_reindex(self, stdout=None): print("\n\n") print(f"Minimum free disk space recommended before starting the reindex: {recommended_disk}") - def _get_index_name_cname_map(self): - return {adapter.index_name: cname for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items()} + def _get_index_name_cname_map(self, ignore_subindices=False): + index_name_cname_map = {} + for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items(): + if ignore_subindices and adapter.parent_index_cname: + continue + index_name_cname_map[adapter.index_name] = cname + return index_name_cname_map def _format_bytes(self, size): units = ['B', 'KB', 'MB', 'GB', 'TB'] @@ -463,7 +469,7 @@ class Command(BaseCommand): For getting current count of both the indices ```bash - /manage.py elastic_sync_multiplexed display_doc_counts + ./manage.py elastic_sync_multiplexed display_doc_counts ``` For getting current shard allocation status for the cluster @@ -602,7 +608,12 @@ def handle(self, **options): sub_cmd = options['sub_command'] cmd_func = options.get('func') if sub_cmd == 'start': - cmd_func(options['index_cname'], options['batch_size'], options['requests_per_second']) + cmd_func( + options['index_cname'], + options['batch_size'], + options['requests_per_second'], + options['purge_ids'] + ) elif sub_cmd == 'delete': cmd_func(options['index_cname']) elif sub_cmd == 'cleanup' or sub_cmd == 'display_doc_counts': diff --git a/corehq/apps/hqadmin/views/data.py b/corehq/apps/hqadmin/views/data.py index 7d0ac1a1ae29..73bb34cd46a6 100644 --- a/corehq/apps/hqadmin/views/data.py +++ b/corehq/apps/hqadmin/views/data.py @@ -1,12 +1,13 @@ import json +from django.conf import settings from django.http import Http404, HttpResponse, JsonResponse from django.shortcuts import render from django.utils.translation import gettext as _ from corehq.apps.domain.decorators import require_superuser from corehq.apps.es.es_query import ESQuery -from corehq.apps.es.transient_util import iter_index_cnames +from corehq.apps.es.transient_util import doc_adapter_from_cname, iter_index_cnames from corehq.apps.hqwebapp.doc_lookup import ( get_databases, get_db_from_db_name, @@ -31,6 +32,12 @@ def to_json(doc): found_indices = {} es_doc_type = None for index in iter_index_cnames(): + if not settings.IS_SAAS_ENVIRONMENT: + # If we're not in a SaaS environment, we don't need to check the sub-indices + # because they were not created in non-saas environments. + doc_adapter = doc_adapter_from_cname(index) + if doc_adapter.parent_index_cname: + continue es_doc = lookup_doc_in_es(doc_id, index) if es_doc: found_indices[index] = to_json(es_doc)