Skip to content

Commit

Permalink
fix: change QuerySetProxy to be able to use other QuerySet methods
Browse files Browse the repository at this point in the history
  • Loading branch information
SquakR committed Mar 1, 2022
1 parent 8583f8e commit 5823820
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 31 deletions.
53 changes: 33 additions & 20 deletions graphene_django_filter/filterset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import warnings
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, cast
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, Union, cast

from django.db import connection, models
from django.db.models.constants import LOOKUP_SEP
Expand All @@ -31,17 +31,29 @@ class QuerySetProxy(ObjectProxy):

__slots__ = 'q'

def __init__(self, wrapped: models.QuerySet) -> None:
def __init__(self, wrapped: models.QuerySet, q: Optional[models.Q] = None) -> None:
super().__init__(wrapped)
self.q = models.Q()
self.q = q or models.Q()

def __getattr__(self, name: str) -> Any:
"""Return QuerySet attributes for all cases except `filter` and `exclude`."""
if name == 'filter':
return self.filter_
elif name == 'exclude':
return self.exclude_
return super().__getattr__(name)
attr = super().__getattr__(name)
if callable(attr):
def func(*args, **kwargs) -> Any:
result = attr(*args, **kwargs)
if isinstance(result, models.QuerySet):
return QuerySetProxy(result, self.q)
return result
return func
return attr

def __iter__(self) -> Iterator[Any]:
"""Return QuerySet and Q objects."""
return iter([self.__wrapped__, self.q])

def filter_(self, *args, **kwargs) -> 'QuerySetProxy':
"""Replace the `filter` method of the QuerySet class."""
Expand All @@ -62,13 +74,6 @@ def exclude_(self, *args, **kwargs) -> 'QuerySetProxy':
return self


def get_q(queryset: models.QuerySet, filter_obj: Filter, value: Any) -> models.Q:
"""Return a Q object for a queryset, filter and value."""
queryset_proxy = QuerySetProxy(queryset)
filter_obj.filter(queryset_proxy, value)
return queryset_proxy.q


def is_full_text_search_lookup_expr(lookup_expr: str) -> bool:
"""Determine if a lookup_expr is a full text search expression."""
return lookup_expr.split(LOOKUP_SEP)[-1] == 'full_text_search'
Expand Down Expand Up @@ -176,25 +181,33 @@ def find_filter(self, data_key: str) -> Filter:

def filter_queryset(self, queryset: models.QuerySet) -> models.QuerySet:
"""Filter a queryset with a top level form's `cleaned_data`."""
return queryset.filter(self.get_q_for_form(queryset, self.form))
qs, q = self.get_queryset_proxy_for_form(queryset, self.form)
return qs.filter(q)

def get_q_for_form(
def get_queryset_proxy_for_form(
self,
queryset: models.QuerySet,
form: Union[Form, TreeFormMixin],
) -> models.Q:
"""Return a Q object for a form's `cleaned_data` using `and`, `or` or `not` operator."""
) -> QuerySetProxy:
"""Return a `QuerySetProxy` object for a form's `cleaned_data`."""
qs = queryset
q = models.Q()
for name, value in form.cleaned_data.items():
q = q & get_q(queryset, self.find_filter(name), value)
qs, q = self.find_filter(name).filter(QuerySetProxy(qs, q), value)
and_q = models.Q()
for and_form in form.and_forms:
and_q = and_q & self.get_q_for_form(queryset, and_form)
qs, new_q = self.get_queryset_proxy_for_form(qs, and_form)
and_q = and_q & new_q
or_q = models.Q()
for or_form in form.or_forms:
or_q = or_q | self.get_q_for_form(queryset, or_form)
not_q = ~self.get_q_for_form(queryset, form.not_form) if form.not_form else models.Q()
return q & and_q & or_q & not_q
qs, new_q = self.get_queryset_proxy_for_form(qs, or_form)
or_q = or_q | new_q
if form.not_form:
qs, new_q = self.get_queryset_proxy_for_form(queryset, form.not_form)
not_q = ~new_q
else:
not_q = models.Q()
return QuerySetProxy(qs, q & and_q & or_q & not_q)

@classmethod
def get_filters(cls) -> OrderedDict:
Expand Down
18 changes: 7 additions & 11 deletions tests/test_filterset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,11 @@
from django.db import models
from django.test import TestCase
from django.utils.timezone import make_aware
from django_filters import CharFilter, Filter
from django_filters import CharFilter
from graphene_django_filter.filters import SearchQueryFilter, SearchRankFilter, TrigramFilter
from graphene_django_filter.filterset import (
AdvancedFilterSet,
QuerySetProxy,
get_q,
is_full_text_search_lookup_expr,
is_regular_lookup_expr,
)
Expand All @@ -31,7 +30,10 @@ def test_queryset_proxy(self) -> None:
"""Test the `QuerySetProxy` class."""
queryset = User.objects.all()
queryset_proxy = QuerySetProxy(queryset)
self.assertEqual(queryset.get, queryset_proxy.get)
self.assertIsInstance(
queryset_proxy.annotate(number=models.F('id') + models.Value('#')),
QuerySetProxy,
)
self.assertNotEqual(queryset.filter, queryset_proxy.filter)
self.assertNotEqual(queryset.exclude, queryset_proxy.exclude)
queryset_proxy.filter(email__contains='kate').exclude(
Expand All @@ -43,13 +45,7 @@ def test_queryset_proxy(self) -> None:
),
queryset_proxy.q,
)

def test_get_q(self) -> None:
"""Test the `test_get_q` function."""
queryset = User.objects.all()
filter_obj = Filter(field_name='first_name', lookup_expr='exact')
q = get_q(queryset, filter_obj, 'John')
self.assertEqual(models.Q(first_name__exact='John'), q)
self.assertEqual([queryset_proxy.__wrapped__, queryset_proxy.q], list(queryset_proxy))

def test_is_full_text_search_lookup(self) -> None:
"""Test the `is_full_text_search_lookup` function."""
Expand Down Expand Up @@ -165,7 +161,7 @@ class Meta:

@classmethod
def setUpClass(cls) -> None:
"""Set up `AdvancedFilterSetTest` class tests."""
"""Set up `AdvancedFilterSetTest` class."""
super().setUpClass()
generate_data()

Expand Down

0 comments on commit 5823820

Please sign in to comment.