Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize case queries in dump_domain_data #34010

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion corehq/apps/dump_reload/sql/dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
FilteredModelIteratorBuilder('form_processor.CommCareCase', SimpleFilter('domain')),
FilteredModelIteratorBuilder('form_processor.CommCareCaseIndex', SimpleFilter('domain')),
FilteredModelIteratorBuilder('form_processor.CaseAttachment', CaseIDFilter()),
FilteredModelIteratorBuilder('form_processor.CaseTransaction', CaseIDFilter()),
FilteredModelIteratorBuilder('form_processor.CaseTransaction', CaseIDFilter(), {'case_id': 'gte', 'pk': 'gt'}),
FilteredModelIteratorBuilder('form_processor.LedgerValue', SimpleFilter('domain')),
FilteredModelIteratorBuilder('form_processor.LedgerTransaction', CaseIDFilter()),

Expand Down
9 changes: 7 additions & 2 deletions corehq/apps/dump_reload/sql/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def get_ids(self, domain_name, db_alias=None):

class CaseIDFilter(IDFilter):
def __init__(self, case_field='case'):
    """Filter models by the case IDs belonging to a domain.

    :param case_field: name of the model field that references the case
        (defaults to ``'case'``).

    Passes ``None`` as the filter's own field value and caps ``chunksize``
    at 500 so each generated "IN" query stays small.
    """
    # NOTE(review): chunksize=500 reconstructed from the diff; the rationale
    # for this specific value is not visible here — presumably to bound the
    # size of case-ID IN-clauses. Confirm against IDFilter's default.
    super().__init__(case_field, None, chunksize=500)

def count(self, domain_name):
active_case_count = len(CommCareCase.objects.get_case_ids_in_domain(domain_name))
Expand Down Expand Up @@ -143,9 +143,14 @@ def build(self, domain, model_class, db_alias):


class FilteredModelIteratorBuilder(UnfilteredModelIteratorBuilder):
def __init__(self, model_label, filter, paginate_by=None):
    """
    :param model_label: dotted ``app.Model`` label of the model to iterate
    :param filter: filter object used to restrict rows to a domain
    :param paginate_by: optional dict mapping a field name to a comparison
        conditional used for keyset pagination.
        For example, {'username': 'gt'}
    """
    super(FilteredModelIteratorBuilder, self).__init__(model_label)
    self.filter = filter
    # Use a None sentinel instead of a mutable {} default: a literal dict
    # default is created once and shared by every instance of this class.
    self.paginate_by = paginate_by if paginate_by is not None else {}

def build(self, domain, model_class, db_alias):
    """Return a prepared copy of this builder bound to a domain, model and db.

    Bug fix: propagate ``paginate_by`` to the copy. Previously the new
    instance was constructed with only ``model_label`` and ``filter``, so a
    custom pagination key (e.g. the one configured for CaseTransaction)
    was silently dropped before it could reach the iterator.
    """
    clone = self.__class__(self.model_label, self.filter, self.paginate_by)
    return clone.prepare(domain, model_class, db_alias)
Expand Down
22 changes: 17 additions & 5 deletions corehq/util/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def paginated_queryset(queryset, chunk_size):
return


def queryset_to_iterator(queryset, model_cls, limit=500, ignore_ordering=False, paginate_by=None):
    """
    Pull from queryset in chunks using keyset pagination. This is suitable
    for deep pagination, but cannot be used with ordered querysets: results
    are returned in order of the pagination key(s), by default the pk.

    :param queryset: queryset to iterate (must not carry its own ordering
        unless ``ignore_ordering`` is True)
    :param model_cls: model class, used to resolve the primary-key field name
    :param limit: page size for each database query
    :param ignore_ordering: pass True to suppress the ordered-queryset check
    :param paginate_by: optional dict mapping a field name to a comparison
        conditional ('gt', 'gte', ...) used to fetch the next page.
        For example, {'username': 'gt'}. Defaults to {<pk field>: 'gt'}.
    """
    if queryset.ordered and not ignore_ordering:
        raise AssertionError("queryset_to_iterator does not respect ordering. "
                             "Pass ignore_ordering=True to continue.")

    # None sentinel (not a mutable {} default) so the fallback dict is built
    # fresh on every call instead of being shared across calls.
    if not paginate_by:
        pk_field = model_cls._meta.pk.name
        paginate_by = {pk_field: "gt"}

    # NOTE(review): with a multi-field key (e.g. {'case_id': 'gte', 'pk': 'gt'})
    # the next-page filter below ANDs the conditions. That can skip rows whose
    # first key advanced while the second did not; strict keyset pagination
    # over a compound key needs an OR of Q conditions. Confirm the chosen
    # operators are safe for the models this is used with.
    queryset = queryset.order_by(*paginate_by)

    docs = queryset[:limit]
    while docs:
        doc_count = 0
        for doc in docs:
            doc_count += 1
            yield doc

        # A short page means the queryset is exhausted; skip the final
        # empty-page query.
        if doc_count < limit:
            break

        # Build the filter that selects rows after the last yielded doc.
        next_page_filter = {
            f"{field}__{condition}": getattr(doc, field)
            for field, condition in paginate_by.items()
        }
        docs = queryset.filter(**next_page_filter)[:limit]
91 changes: 64 additions & 27 deletions corehq/util/tests/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,45 +5,82 @@


class TestQuerysetToIterator(TestCase):
    """Tests for queryset_to_iterator's chunked (keyset) pagination."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Varied first names make username ordering differ from pk order,
        # which the paginate_by test below relies on.
        first_names = ['alice', 'bob', 'john', 'jane']
        cls.users = [
            User.objects.create_user(f'{first_names[i % 4]}-user{i}@example.com',
                                     last_name="Tenenbaum")
            for i in range(1, 11)
        ]

    @classmethod
    def tearDownClass(cls):
        for user in cls.users:
            user.delete()
        super().tearDownClass()

    def test_correct_results_are_returned(self):
        query = User.objects.filter(last_name="Tenenbaum")

        results = list(queryset_to_iterator(query, User, limit=10))

        self.assertEqual(
            [u.username for u in results],
            [u.username for u in self.users],
        )

    def test_results_returned_in_one_query_if_limit_is_greater_than_result_size(self):
        query = User.objects.filter(last_name="Tenenbaum")

        with self.assertNumQueries(1):
            # query 1: Users 1-10 (fewer than limit, so iteration stops)
            results = list(queryset_to_iterator(query, User, limit=11))

        self.assertEqual(len(results), 10)

    def test_results_returned_in_two_queries_if_limit_is_equal_to_result_size(self):
        query = User.objects.filter(last_name="Tenenbaum")

        with self.assertNumQueries(2):
            # query 1: Users 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
            # query 2: Check that there are no users past #10
            results = list(queryset_to_iterator(query, User, limit=10))

        self.assertEqual(len(results), 10)

    def test_results_return_in_three_queries_if_limit_is_less_than_or_equal_to_half_of_result_size(self):
        query = User.objects.filter(last_name="Tenenbaum")

        with self.assertNumQueries(3):
            # query 1: Users 1, 2, 3, 4
            # query 2: Users 5, 6, 7, 8
            # query 3: Users 9, 10 (short page, so no fourth query is made)
            results = list(queryset_to_iterator(query, User, limit=4))

        self.assertEqual(len(results), 10)

    def test_ordered_queryset_raises_assertion_error_when_ignore_ordering_is_false(self):
        query = User.objects.filter(last_name="Tenenbaum").order_by('username')

        with self.assertRaises(AssertionError):
            # ignore_ordering defaults to False
            list(queryset_to_iterator(query, User, limit=4))

    def test_ordered_queryset_does_not_raise_assertion_error_when_ignore_ordering_is_true(self):
        query = User.objects.filter(last_name="Tenenbaum").order_by('username')
        # test succeeds if AssertionError is not raised
        list(queryset_to_iterator(query, User, limit=4, ignore_ordering=True))

    def test_results_ordered_by_pagination_key_when_paginate_by_is_defined(self):
        query = User.objects.filter(last_name="Tenenbaum")

        results = list(queryset_to_iterator(query, User, limit=4, paginate_by={"username": "gt"}))

        self.assertEqual(
            [u.username for u in results],
            ['alice-user4@example.com',
             'alice-user8@example.com',
             'bob-user1@example.com',
             'bob-user5@example.com',
             'bob-user9@example.com',
             'jane-user3@example.com',
             'jane-user7@example.com',
             'john-user10@example.com',
             'john-user2@example.com',
             'john-user6@example.com'])