Skip to content

Commit

Permalink
Merge pull request #35463 from dimagi/ap/fix-size-estimation-for-reindex
Browse files Browse the repository at this point in the history
Fix reindex command and some es utils
  • Loading branch information
AmitPhulera authored Dec 13, 2024
2 parents af29859 + 079beeb commit bcc5b6d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 27 deletions.
52 changes: 33 additions & 19 deletions corehq/apps/es/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ def _validate_single_index(index):

def reindex(
self, source, dest, wait_for_completion=False,
refresh=False, batch_size=1000, requests_per_second=None, copy_doc_ids=True, query=None,
refresh=False, batch_size=1000, requests_per_second=None,
copy_doc_ids=True, query=None, purge_ids=False
):
"""
Starts the reindex process in elastic search cluster
Expand All @@ -416,6 +417,7 @@ def reindex(
and can be reduced if you encounter scroll timeouts.
:param query: ``dict`` optional parameter to include a term query to filter which documents are included in
the reindex
:param purge_ids: ``bool`` if True, will remove the _id field from the documents
:returns: None if wait_for_completion is True else would return task_id of reindex task
"""

Expand All @@ -435,27 +437,35 @@ def reindex(
"conflicts": "proceed"
}

# Should be removed after ES 5-6 migration
if copy_doc_ids and source == const.HQ_USERS_INDEX_NAME:
# Remove password from form index
if copy_doc_ids or purge_ids:
reindex_body["script"] = {
"lang": "painless",
"source": """
ctx._source.remove('password');
if (!ctx._source.containsKey('doc_id')) {
ctx._source['doc_id'] = ctx._id;
}
"""
}
elif copy_doc_ids:
reindex_body["script"] = {
"lang": "painless",
"source": """
if (!ctx._source.containsKey('doc_id')) {
ctx._source['doc_id'] = ctx._id;
}
"""
"source": ""
}
script_parts = []

if purge_ids:
script_parts.append("""
if (ctx._source.containsKey('_id')) {
ctx._source.remove('_id');
}
""")

if source == const.HQ_USERS_INDEX_NAME:
# Remove password field from users index
script_parts.append("""
ctx._source.remove('password');
""")

if copy_doc_ids:
# Add doc_id field to the documents
script_parts.append("""
if (!ctx._source.containsKey('doc_id')) {
ctx._source['doc_id'] = ctx._id;
}
""")

reindex_body["script"]["source"] = " ".join(script_parts)

reindex_kwargs = {
"wait_for_completion": wait_for_completion,
Expand Down Expand Up @@ -1118,6 +1128,10 @@ def __init__(self, primary_adapter, secondary_adapter):
def mapping(self):
return self.primary.mapping

@property
def parent_index_cname(self):
    """Canonical name of the parent index for this adapter's index.

    Delegates to the primary adapter. Callers (e.g. the reindex size
    estimation and SaaS-only sub-index checks) test this value for
    truthiness to decide whether the index is a sub-index.

    NOTE(review): assumed to be falsy (e.g. ``None``) for top-level
    indices -- confirm against the primary adapter's definition, which
    is not visible in this diff.
    """
    return self.primary.parent_index_cname

def export_adapter(self):
adapter = copy.copy(self)
adapter.primary = adapter.primary.export_adapter()
Expand Down
25 changes: 18 additions & 7 deletions corehq/apps/es/management/commands/elastic_sync_multiplexed.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ESSyncUtil:
def __init__(self):
self.es = get_client()

def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None):
def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None, purge_ids=False):

adapter = doc_adapter_from_cname(cname)

Expand All @@ -57,7 +57,8 @@ def start_reindex(self, cname, reindex_batch_size=1000, requests_per_second=None

logger.info("Starting ReIndex process")
task_id = es_manager.reindex(
source_index, destination_index, requests_per_second=requests_per_second
source_index, destination_index,
requests_per_second=requests_per_second, batch_size=reindex_batch_size, purge_ids=purge_ids
)
logger.info(f"Copying docs from index {source_index} to index {destination_index}")
task_number = task_id.split(':')[1]
Expand Down Expand Up @@ -303,7 +304,7 @@ def _copy_checkpoints(self, pillow, new_checkpoint_id):

def estimate_disk_space_for_reindex(self, stdout=None):
indices_info = es_manager.indices_info()
index_cname_map = self._get_index_name_cname_map()
index_cname_map = self._get_index_name_cname_map(ignore_subindices=True)
index_size_rows = []
total_size = 0
for index_name in index_cname_map.keys():
Expand All @@ -320,8 +321,13 @@ def estimate_disk_space_for_reindex(self, stdout=None):
print("\n\n")
print(f"Minimum free disk space recommended before starting the reindex: {recommended_disk}")

def _get_index_name_cname_map(self):
return {adapter.index_name: cname for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items()}
def _get_index_name_cname_map(self, ignore_subindices=False):
    """Return a mapping of ES index name -> canonical name (cname).

    :param ignore_subindices: ``bool`` when True, adapters that have a
        ``parent_index_cname`` (i.e. sub-indices) are excluded from the
        mapping.
    :returns: ``dict`` keyed by ``adapter.index_name``
    """
    return {
        adapter.index_name: cname
        for cname, adapter in CANONICAL_NAME_ADAPTER_MAP.items()
        if not (ignore_subindices and adapter.parent_index_cname)
    }

def _format_bytes(self, size):
units = ['B', 'KB', 'MB', 'GB', 'TB']
Expand Down Expand Up @@ -463,7 +469,7 @@ class Command(BaseCommand):
For getting current count of both the indices
```bash
/manage.py elastic_sync_multiplexed display_doc_counts <index_cname>
./manage.py elastic_sync_multiplexed display_doc_counts <index_cname>
```
For getting current shard allocation status for the cluster
Expand Down Expand Up @@ -602,7 +608,12 @@ def handle(self, **options):
sub_cmd = options['sub_command']
cmd_func = options.get('func')
if sub_cmd == 'start':
cmd_func(options['index_cname'], options['batch_size'], options['requests_per_second'])
cmd_func(
options['index_cname'],
options['batch_size'],
options['requests_per_second'],
options['purge_ids']
)
elif sub_cmd == 'delete':
cmd_func(options['index_cname'])
elif sub_cmd == 'cleanup' or sub_cmd == 'display_doc_counts':
Expand Down
9 changes: 8 additions & 1 deletion corehq/apps/hqadmin/views/data.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import json

from django.conf import settings
from django.http import Http404, HttpResponse, JsonResponse
from django.shortcuts import render
from django.utils.translation import gettext as _

from corehq.apps.domain.decorators import require_superuser
from corehq.apps.es.es_query import ESQuery
from corehq.apps.es.transient_util import iter_index_cnames
from corehq.apps.es.transient_util import doc_adapter_from_cname, iter_index_cnames
from corehq.apps.hqwebapp.doc_lookup import (
get_databases,
get_db_from_db_name,
Expand All @@ -31,6 +32,12 @@ def to_json(doc):
found_indices = {}
es_doc_type = None
for index in iter_index_cnames():
if not settings.IS_SAAS_ENVIRONMENT:
    # Sub-indices exist only in SaaS environments, so skip them here:
    # looking up docs in them outside SaaS would always come up empty.
doc_adapter = doc_adapter_from_cname(index)
if doc_adapter.parent_index_cname:
continue
es_doc = lookup_doc_in_es(doc_id, index)
if es_doc:
found_indices[index] = to_json(es_doc)
Expand Down

0 comments on commit bcc5b6d

Please sign in to comment.