Filter, sort and paginate SQLAlchemy query objects. Ideal for exposing these actions over a REST API.
This project is a fork of sqlalchemy-filters, adding SQLAlchemy 1.4 and 2.0 support and other improvements.
Assuming that we have a SQLAlchemy `Select` object:
```python
from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.hybrid import hybrid_method, hybrid_property
from sqlalchemy.orm import declarative_base


class Base:
    id = Column(Integer, primary_key=True)
    name = Column(String(50), nullable=False)
    count = Column(Integer, nullable=True)

    @hybrid_property
    def count_square(self):
        return self.count * self.count

    @hybrid_method
    def three_times_count(self):
        return self.count * 3


Base = declarative_base(cls=Base)


class Foo(Base):
    __tablename__ = 'foo'

    # ...


stmt = select(Foo)
```
Then we can apply filters to that `stmt` object (multiple times):
```python
from sa_filters import apply_filters

# `stmt` should be a SQLAlchemy Select or Query object
filter_spec = [{'field': 'name', 'op': '==', 'value': 'name_1'}]
filtered_stmt = apply_filters(stmt, filter_spec)

more_filters = [{'field': 'count', 'op': 'is_not_null'}]
filtered_stmt = apply_filters(filtered_stmt, more_filters)

result = session.execute(filtered_stmt).all()
```
It is also possible to filter statements that contain multiple models, including joins:
```python
from sqlalchemy import ForeignKey


class Bar(Base):
    __tablename__ = 'bar'

    foo_id = Column(Integer, ForeignKey('foo.id'))


stmt = select(Foo).join(Bar)

filter_spec = [
    {'model': 'Foo', 'field': 'name', 'op': '==', 'value': 'name_1'},
    {'model': 'Bar', 'field': 'count', 'op': '>=', 'value': 5},
]
filtered_stmt = apply_filters(stmt, filter_spec)

result = session.execute(filtered_stmt).all()
```
`apply_filters` will attempt to automatically join models to `stmt` if they're not already present and a model-specific filter is supplied. For example, the value of `filtered_stmt` in the following two code blocks is identical:
```python
stmt = select(Foo).join(Bar)  # join pre-applied to statement
filter_spec = [
    {'model': 'Foo', 'field': 'name', 'op': '==', 'value': 'name_1'},
    {'model': 'Bar', 'field': 'count', 'op': '>=', 'value': 5},
]
filtered_stmt = apply_filters(stmt, filter_spec)
```
```python
stmt = select(Foo)  # join to Bar will be automatically applied
filter_spec = [
    {'field': 'name', 'op': '==', 'value': 'name_1'},
    {'model': 'Bar', 'field': 'count', 'op': '>=', 'value': 5},
]
filtered_stmt = apply_filters(stmt, filter_spec)
```
The automatic join is only possible if SQLAlchemy can implicitly determine the condition for the join, for example because of a foreign key relationship.
Automatic joins allow flexibility for clients to filter and sort by related objects without specifying all possible joins on the server beforehand. The feature can be explicitly disabled by passing the `do_auto_join=False` argument to the `apply_filters` call.
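For example, a minimal sketch of opting out (reusing `stmt` and `filter_spec` from the blocks above):

```python
# With do_auto_join=False, a filter that references a model not present
# in the statement will fail instead of triggering an implicit join,
# so the join must be applied manually.
stmt = select(Foo).join(Bar)
filtered_stmt = apply_filters(stmt, filter_spec, do_auto_join=False)
```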
Note that the first filter in the second of the two equivalent blocks above does not specify a model. It is implicitly applied to the `Foo` model because that is the only model in the original statement passed to `apply_filters`.
It is also possible to apply filters to statements defined by fields, functions or a `select_from` clause:
```python
from sqlalchemy import func

stmt_alt_1 = select(Foo.id, Foo.name)
stmt_alt_2 = select(func.count(Foo.id))
stmt_alt_3 = select(Foo.id).select_from(Foo)
```
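For instance, a sketch (reusing the names above) of applying the same filter to the aggregate statement, which restricts what is counted:

```python
filter_spec = [{'field': 'name', 'op': '==', 'value': 'name_1'}]

count_stmt = apply_filters(stmt_alt_2, filter_spec)
n_matching = session.scalar(count_stmt)  # number of Foo rows matching the filter
```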
You can filter by a hybrid attribute: a hybrid property or a hybrid method.
```python
stmt = select(Foo)

# by hybrid property
filter_spec = [{'field': 'count_square', 'op': '>=', 'value': 25}]
# or, alternatively, by hybrid method
filter_spec = [{'field': 'three_times_count', 'op': '>=', 'value': 15}]

filtered_stmt = apply_filters(stmt, filter_spec)
result = session.execute(filtered_stmt).all()
```
You can restrict the fields that SQLAlchemy loads from the database by using the `apply_loads` function:
```python
from sa_filters import apply_loads

stmt = select(Foo, Bar).join(Bar)
load_spec = [
    {'model': 'Foo', 'fields': ['name']},
    {'model': 'Bar', 'fields': ['count']}
]
stmt = apply_loads(stmt, load_spec)  # will load only Foo.name and Bar.count
```
The effect of the `apply_loads` function is to *defer* the load of any other fields to when/if they're accessed, rather than loading them when the statement is executed. It only applies to fields that would be loaded during normal statement execution.
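A sketch of what that deferral means in practice (reusing `stmt` and `load_spec` from above; assumes the session is still open when the attribute is accessed):

```python
foo = session.scalars(stmt).first()

foo.name   # already loaded by the statement
foo.count  # not in Foo's load_spec: accessing it emits a separate SELECT
```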
The default SQLAlchemy join is lazy, meaning that columns from the joined table are loaded only when required. Therefore `apply_loads` has limited effect in the following scenario:
```python
stmt = select(Foo).join(Bar)
load_spec = [
    {'model': 'Foo', 'fields': ['name']},
    {'model': 'Bar', 'fields': ['count']}  # ignored
]
stmt = apply_loads(stmt, load_spec)  # will load only Foo.name
```
`apply_loads` cannot be applied to columns that are loaded as joined eager loads. This is because a joined eager load does not add the joined model to the original statement, as explained in the SQLAlchemy documentation on joined eager loading.
The following would not prevent all columns from `Bar` being eagerly loaded:
```python
from sqlalchemy.orm import joinedload

# assumes a `bar` relationship is defined on Foo
stmt = select(Foo).options(joinedload(Foo.bar))
load_spec = [
    {'model': 'Foo', 'fields': ['name']},
    {'model': 'Bar', 'fields': ['count']}
]
stmt = apply_loads(stmt, load_spec)
```
**Automatic Join:** in fact, what happens here is that `Bar` is automatically joined to `stmt`, because it is determined that `Bar` is not part of the original statement. The `load_spec` therefore has no effect, because the automatic join results in lazy evaluation.
If you wish to perform a joined load with restricted columns, you must specify the columns as part of the joined load, rather than with `apply_loads`:
```python
stmt = select(Foo).options(joinedload(Foo.bar).load_only(Bar.count))
load_spec = [
    {'model': 'Foo', 'fields': ['name']}
]
stmt = apply_loads(stmt, load_spec)  # will load only Foo.name and Bar.count
```
To sort the results, use the `apply_sort` function:

```python
from sa_filters import apply_sort

# `stmt` should be a SQLAlchemy Select or Query object
sort_spec = [
    {'model': 'Foo', 'field': 'name', 'direction': 'asc'},
    {'model': 'Bar', 'field': 'id', 'direction': 'desc'},
]
sorted_stmt = apply_sort(stmt, sort_spec)

result = session.scalars(sorted_stmt).all()
```
`apply_sort` will attempt to automatically join models to `stmt` if they're not already present and a model-specific sort is supplied. The behaviour is the same as in `apply_filters`.
This allows flexibility for clients to sort by fields on related objects without specifying all possible joins on the server beforehand.
You can sort by a hybrid attribute: a hybrid property or a hybrid method.
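For example, a sketch using the `count_square` hybrid property defined on `Foo` earlier:

```python
sort_spec = [{'field': 'count_square', 'direction': 'desc'}]
sorted_stmt = apply_sort(select(Foo), sort_spec)
result = session.scalars(sorted_stmt).all()
```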
To paginate the results, use the `apply_pagination` function:

```python
from sqlalchemy import func, select
from sa_filters import apply_pagination

# `stmt` should be a SQLAlchemy Select object
# let's count the number of rows returned by our statement
total_stmt = select(func.count()).select_from(stmt.subquery())
total_results = session.scalar(total_stmt)

paginated_stmt, pagination = apply_pagination(
    stmt, page_number=1, page_size=10, total_results=total_results
)

result = session.execute(paginated_stmt).all()

page_number, page_size, num_pages, total_results = pagination

assert 10 == len(result)
assert 10 == page_size == pagination.page_size
assert 1 == page_number == pagination.page_number
assert 3 == num_pages == pagination.num_pages
assert 22 == total_results == pagination.total_results
```
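Since these functions are aimed at exposing filtering, sorting and pagination over a REST API, the `pagination` tuple maps naturally onto a response payload. A hypothetical sketch (the response shape is illustrative, not part of the library):

```python
response = {
    'items': [{'id': foo.id, 'name': foo.name} for (foo,) in result],
    'page_number': pagination.page_number,
    'page_size': pagination.page_size,
    'num_pages': pagination.num_pages,
    'total_results': pagination.total_results,
}
```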
You can also use `apply_filters`, `apply_loads`, `apply_sort` and `apply_pagination` with a SQLAlchemy `Query` object:
```python
query = session.query(Foo)
filter_spec = [{'field': 'name', 'op': '==', 'value': 'name_1'}]
filtered_query = apply_filters(query, filter_spec)
result = filtered_query.all()
```
Filters must be provided in a list and will be applied sequentially. Each filter will be a dictionary element in that list, using the following format:
```python
filter_spec = [
    {'model': 'model_name', 'field': 'field_name', 'op': '==', 'value': 'field_value'},
    {'model': 'model_name', 'field': 'field_2_name', 'op': '!=', 'value': 'field_2_value'},
    # ...
]
```
The `model` key is optional if the original query being filtered only applies to one model.
If there is only one filter, the containing list may be omitted:
```python
filter_spec = {'field': 'field_name', 'op': '==', 'value': 'field_value'}
```
Where `field` is the name of the field that will be filtered using the operator provided in `op` (optional, defaults to `==`) and the provided `value` (optional, depending on the operator).
You can also specify the table name instead of the model name via the optional `table` key:
```python
filter_spec = [
    {'table': 'table_name', 'field': 'field_name', 'op': '==', 'value': 'field_value'},
    {'table': 'table_name', 'field': 'field_2_name', 'op': '!=', 'value': 'field_2_value'},
    # ...
]
```
This is the list of operators that can be used:
- `is_null`
- `is_not_null`
- `==`, `eq`
- `!=`, `ne`
- `>`, `gt`
- `<`, `lt`
- `>=`, `ge`
- `<=`, `le`
- `like`
- `ilike`
- `not_ilike`
- `in`
- `not_in`
- `any`
- `not_any`
The PostgreSQL-specific operators allow filtering on columns of type `ARRAY`: use `any` to match rows where a value is present in the array, and `not_any` where it is not.
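A hypothetical sketch, assuming a model with a PostgreSQL `ARRAY` column named `tags`:

```python
# rows whose `tags` array contains 'news'
filter_spec = [{'field': 'tags', 'op': 'any', 'value': 'news'}]

# rows whose `tags` array does not contain 'news'
filter_spec = [{'field': 'tags', 'op': 'not_any', 'value': 'news'}]
```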
The `and`, `or`, and `not` functions can be used and nested within the filter specification:
```python
filter_spec = [
    {
        'or': [
            {
                'and': [
                    {'field': 'field_name', 'op': '==', 'value': 'field_value'},
                    {'field': 'field_2_name', 'op': '!=', 'value': 'field_2_value'},
                ]
            },
            {
                'not': [
                    {'field': 'field_3_name', 'op': '==', 'value': 'field_3_value'}
                ]
            },
        ],
    }
]
```
Note: `or` and `and` must reference a list of at least one element; `not` must reference a list of exactly one element.
Sort elements must be provided as dictionaries in a list and will be applied sequentially:
```python
sort_spec = [
    {'model': 'Foo', 'field': 'name', 'direction': 'asc'},
    {'model': 'Bar', 'field': 'id', 'direction': 'desc'},
    # ...
]
```
Where `field` is the name of the field that will be sorted using the provided `direction`. The `model` key is optional if the original query being sorted only applies to one model.
```python
sort_spec = [
    {'model': 'Baz', 'field': 'count', 'direction': 'asc', 'nullsfirst': True},
    {'model': 'Qux', 'field': 'city', 'direction': 'desc', 'nullslast': True},
    # ...
]
```
`nullsfirst` is an optional attribute that will place `NULL` values first if set to `True`, according to the SQLAlchemy documentation. Likewise, `nullslast` is an optional attribute that will place `NULL` values last if set to `True`.
If neither is provided, `NULL` values will be sorted according to the RDBMS being used. SQL defines that `NULL` values should be placed together when sorting, but it does not specify whether they should be placed first or last.
Even though both `nullsfirst` and `nullslast` are part of SQLAlchemy, they will raise an unexpected exception if the RDBMS being used does not support them. At the moment they are supported by PostgreSQL, but not by SQLite or MySQL.
The default configuration runs the tests against SQLite, MySQL and PostgreSQL (the MySQL and PostgreSQL drivers are installed automatically when `tox` is used), with the following URIs:

```
sqlite+pysqlite:///test_sa_filters.db
mysql+mysqlconnector://root:@localhost:3306/test_sa_filters
postgresql+psycopg2://postgres:@localhost:5432/test_sa_filters?client_encoding=utf8
```
A test database will be created, used during the tests and destroyed afterwards for each RDBMS configured.
There are Makefile targets to run docker containers locally for both MySQL and PostgreSQL, using the default ports and configuration:
```bash
$ make mysql-container
$ make postgres-container
```
To run the tests locally:
```bash
$ # Create/activate a virtual environment
$ pip install tox
$ tox
```
There are other Makefile targets that can be used to run the tests, but extra dependencies will have to be installed:
```bash
$ pip install -U --editable ".[dev,mysql,postgresql]"

$ # using default settings
$ make test
$ make coverage

$ # overriding DB parameters
$ ARGS='--mysql-test-db-uri mysql+mysqlconnector://root:@192.168.99.100:3340/test_sa_filters' make test
$ ARGS='--sqlite-test-db-uri sqlite+pysqlite:///test_sa_filters.db' make test
$ ARGS='--mysql-test-db-uri mysql+mysqlconnector://root:@192.168.99.100:3340/test_sa_filters' make coverage
$ ARGS='--sqlite-test-db-uri sqlite+pysqlite:///test_sa_filters.db' make coverage
```
The following RDBMS are supported (tested):
- SQLite
- MySQL
- PostgreSQL
The following SQLAlchemy versions are supported: `1.4`, `2.0`.
Consult the CHANGELOG document for fixes and enhancements of each version.
Apache 2.0. See LICENSE for details.