Skip to content

Commit

Permalink
feat: change generation of full text search filters
Browse files Browse the repository at this point in the history
  • Loading branch information
SquakR committed Feb 17, 2022
1 parent f4effe7 commit e32301f
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 144 deletions.
146 changes: 81 additions & 65 deletions graphene_django_filter/filterset.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import warnings
from collections import OrderedDict
from typing import Any, Dict, Iterable, List, Optional, Tuple, Type, Union, cast
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, cast

from django.db import connection, models
from django.db.models.constants import LOOKUP_SEP
Expand Down Expand Up @@ -92,28 +92,16 @@ def get_q(queryset: models.QuerySet, filter_obj: Filter, value: Any) -> models.Q
return queryset_proxy.q


def is_special_lookup_expr(lookup_expr: str) -> bool:
"""Determine whether the lookup_expr must be processed in a special way."""
def is_full_text_search_lookup_expr(lookup_expr: str) -> bool:
"""Determine if a lookup_expr is a full text search expression."""
return lookup_expr.split(LOOKUP_SEP)[-1] == 'full_text_search'


def get_full_text_search_filter_classes() -> Iterable[Type[Union[Filter, Any]]]:
"""Return available full text search filter classes."""
if not settings.IS_POSTGRESQL:
warnings.warn(
f'Full text search is not available because the {connection.vendor} vendor is '
'used instead of the postgresql vendor.',
)
return ()
elif not settings.HAS_TRIGRAM_EXTENSION:
from .filters import SearchQueryFilter, SearchRankFilter
warnings.warn(
'Trigram search is not available because the `pg_trgm` extension is not installed.',
)
return SearchQueryFilter, SearchRankFilter,
else:
from .filters import SearchQueryFilter, SearchRankFilter, TrigramFilter
return SearchQueryFilter, SearchRankFilter, TrigramFilter
def is_regular_lookup_expr(lookup_expr: str) -> bool:
"""Determine whether the lookup_expr must be processed in a regular way."""
return not any([
is_full_text_search_lookup_expr(lookup_expr),
])


class AdvancedFilterSet(BaseFilterSet, metaclass=FilterSetMetaclass):
Expand Down Expand Up @@ -232,63 +220,91 @@ def get_q_for_form(
return q & and_q & or_q & not_q

@classmethod
def get_fields(cls) -> OrderedDict:
"""Resolve the `Meta.fields` argument including only regular lookups."""
return cls._get_fields(is_special=False)
def get_filters(cls) -> OrderedDict:
"""Get all filters for the filterset.
@classmethod
def get_special_fields(cls) -> OrderedDict:
"""Resolve the `Meta.fields` argument including only special lookups."""
return cls._get_fields(is_special=True)
This is the combination of declared and generated filters.
"""
filters = super().get_filters()
if not cls._meta.model:
return filters
return OrderedDict([
*filters.items(),
*cls.create_full_text_search_filters(filters).items(),
])

@classmethod
def create_full_text_search_filters(
cls,
base_filters: OrderedDict,
field_name: str,
) -> OrderedDict:
"""Create available full text search filters."""
full_text_search_filters = OrderedDict()
for filter_class in get_full_text_search_filter_classes():
for lookup_expr in filter_class.available_lookups:
filter_name = cls.get_filter_name(
f'{field_name}{LOOKUP_SEP}{filter_class.postfix}',
lookup_expr,
new_filters = OrderedDict()
full_text_search_fields = cls.get_full_text_search_fields()
if not len(full_text_search_fields):
return new_filters
if not settings.IS_POSTGRESQL:
warnings.warn(
f'Full text search is not available because the {connection.vendor} vendor is '
'used instead of the postgresql vendor.',
)
return new_filters
from .filters import SearchQueryFilter, SearchRankFilter, TrigramFilter
new_filters = OrderedDict([
*new_filters.items(),
*cls.create_special_filters(base_filters, SearchQueryFilter).items(),
*cls.create_special_filters(base_filters, SearchRankFilter).items(),
])
if not settings.HAS_TRIGRAM_EXTENSION:
warnings.warn(
'Trigram search is not available because the `pg_trgm` extension is not installed.',
)
return new_filters
for field_name in full_text_search_fields:
new_filters = OrderedDict([
*new_filters.items(),
*cls.create_special_filters(base_filters, TrigramFilter, field_name).items(),
])
return new_filters

@classmethod
def create_special_filters(
cls,
base_filters: OrderedDict,
filter_class: Union[Type[Filter], Any],
field_name: Optional[str] = None,
) -> OrderedDict:
"""Create special filters using a filter class and a field name."""
new_filters = OrderedDict()
for lookup_expr in filter_class.available_lookups:
if field_name:
postfix_field_name = f'{field_name}{LOOKUP_SEP}{filter_class.postfix}'
else:
postfix_field_name = filter_class.postfix
filter_name = cls.get_filter_name(postfix_field_name, lookup_expr)
if filter_name not in base_filters:
new_filters[filter_name] = filter_class(
field_name=postfix_field_name,
lookup_expr=lookup_expr,
)
if filter_name not in base_filters:
full_text_search_filters[filter_name] = filter_class(
field_name=field_name,
lookup_expr=lookup_expr,
)
return full_text_search_filters
return new_filters

@classmethod
def get_filters(cls) -> OrderedDict:
"""Get all filters for the filterset.
def get_fields(cls) -> OrderedDict:
"""Resolve the `Meta.fields` argument including only regular lookups."""
return cls._get_fields(is_regular_lookup_expr)

This is the combination of declared and generated filters.
"""
filters = super().get_filters()
if not cls._meta.model:
return filters
for field_name, lookups in cls.get_special_fields().items():
for lookup_expr in lookups:
if lookup_expr == 'full_text_search':
filters = OrderedDict([
*filters.items(),
*cls.create_full_text_search_filters(filters, field_name).items(),
])
return filters
@classmethod
def get_full_text_search_fields(cls) -> OrderedDict:
"""Resolve the `Meta.fields` argument including only full text search lookups."""
return cls._get_fields(is_full_text_search_lookup_expr)

@classmethod
def _get_fields(cls, is_special: bool) -> OrderedDict:
"""Resolve the `Meta.fields` argument including special or regular lookups."""
all_fields = super().get_fields()
regular_fields: List[Tuple[str, List[str]]] = []
func = is_special_lookup_expr if is_special else \
lambda lookup_expr: not is_special_lookup_expr(lookup_expr)
for k, v in all_fields.items():
regular_field = [lookup for lookup in v if func(lookup)]
def _get_fields(cls, predicate: Callable[[str], bool]) -> OrderedDict:
"""Resolve the `Meta.fields` argument including lookups that match the predicate."""
fields: List[Tuple[str, List[str]]] = []
for k, v in super().get_fields().items():
regular_field = [lookup_expr for lookup_expr in v if predicate(lookup_expr)]
if len(regular_field):
regular_fields.append((k, regular_field))
return OrderedDict(regular_fields)
fields.append((k, regular_field))
return OrderedDict(fields)
Loading

0 comments on commit e32301f

Please sign in to comment.