Skip to content

Commit

Permalink
get_final_versions fixes (#38)
Browse files Browse the repository at this point in the history
1. Fixed problem when get_final_version() could return 2 rows instead of 1 and insert multiple sign < 0 rows with single sign > 0
2. Fixed problem from PR #37, when date_col is not set for ClickHouseModel
  • Loading branch information
M1ha-Shvn authored Oct 6, 2021
1 parent 12069db commit 0e65f15
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 34 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

setup(
name='django-clickhouse',
version='1.1.1',
version='1.1.2',
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
package_dir={'': 'src'},
url='https://github.com/carrotquest/django-clickhouse',
Expand Down
116 changes: 83 additions & 33 deletions src/django_clickhouse/engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
"""
import datetime
from typing import List, Type, Union, Iterable, Optional
import logging
from typing import List, Type, Union, Iterable, Optional, Tuple, NamedTuple

from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
Expand All @@ -14,6 +15,9 @@
from .utils import format_datetime


logger = logging.getLogger('django-clickhouse')


class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
"""
Expand Down Expand Up @@ -45,43 +49,64 @@ def __init__(self, *args, **kwargs):
self.version_col = kwargs.pop('version_col', None)
super(CollapsingMergeTree, self).__init__(*args, **kwargs)

def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
def _get_final_versions_by_version(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
"""
Performs request to ClickHouse in order to fetch latest version for each object pk
:param db_alias: ClickHouse database alias used
:param model_cls: Model class for which data is fetched
:param object_pks: Objects primary keys to filter by
:param columns: Columns to fetch
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
:return: List of named tuples with requested columns
"""
if date_range_filter:
date_range_filter = 'PREWHERE {}'.format(date_range_filter)

query = """
SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
SELECT `{pk_column}`, MAX(`{version_col}`)
FROM $table
PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
GROUP BY `{pk_column}`
)
""".format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
SELECT {columns}
FROM $table
{date_range_filter}
WHERE `{pk_column}` IN ({object_pks})
ORDER BY `{pk_column}`, `{version_col}` DESC
LIMIT 1 BY `{pk_column}`
""".format(columns=','.join(columns), version_col=self.version_col, pk_column=self.pk_column,
date_range_filter=date_range_filter, object_pks=','.join(object_pks), sign_col=self.sign_col)

return connections[db_alias].select_tuples(query, model_cls)

def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
def _get_final_versions_by_final(self, db_alias: str, model_cls: Type[ClickHouseModel], object_pks: Iterable[str],
columns: str, date_range_filter: str = '') -> List[NamedTuple]:
"""
Performs request to ClickHouse in order to fetch latest version for each object pk
:param db_alias: ClickHouse database alias used
:param model_cls: Model class for which data is fetched
:param object_pks: Objects primary keys to filter by
:param columns: Columns to fetch
:param date_range_filter: Optional date_range_filter which speeds up query if date_col is set
:return: List of named tuples with requested columns
"""
if date_range_filter:
date_range_filter += ' AND'

query = """
SELECT {columns} FROM $table FINAL
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
WHERE {date_range_filter} `{pk_column}` IN ({object_pks})
"""
query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date,
max_date=max_date, object_pks=','.join(object_pks))
query = query.format(columns=','.join(columns), pk_column=self.pk_column, date_range_filter=date_range_filter,
object_pks=','.join(object_pks))
return connections[db_alias].select_tuples(query, model_cls)

def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
date_col: Optional[str] = None) -> Iterable[tuple]:
def _get_date_rate_filter(self, objects, model_cls: Type[ClickHouseModel], db_alias: str,
date_col: Optional[str]) -> str:
"""
Get objects, that are currently stored in ClickHouse.
Depending on the partition key this can be different for different models.
In common case, this method is optimized for date field that doesn't change.
It also supposes primary key to by self.pk_column
:param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched
:param date_col: Optional column name, where partiion date is hold. Defaults to self.date_col
:return: A generator of named tuples, representing previous state
Generates datetime filter to speed up final queries, if date_col is present
:param objects: Objects, which are inserted
:param model_cls: Model class for which data is fetched
:param db_alias: ClickHouse database alias used
:param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
:return: String to add to WHERE or PREWHERE query section
"""

def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
if isinstance(dt, datetime.datetime):
return format_datetime(dt, 0, db_alias=db_alias)
Expand All @@ -90,10 +115,15 @@ def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
else:
raise Exception('Invalid date or datetime object: `%s`' % dt)

if not objects:
raise StopIteration()

date_col = date_col or self.date_col

if not date_col:
logger.warning('django-clickhouse: date_col is not provided for model %s.'
' This can cause significant performance problems while fetching data.'
' It is worth inheriting CollapsingMergeTree engine with custom get_final_versions() method,'
' based on your partition_key' % model_cls)
return ''

min_date, max_date = None, None
for obj in objects:
obj_date = getattr(obj, date_col)
Expand All @@ -104,19 +134,39 @@ def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
if max_date is None or max_date < obj_date:
max_date = obj_date

min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)

return "`{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'".\
format(min_date=min_date, max_date=max_date, date_col=date_col)

def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
date_col: Optional[str] = None) -> Iterable[tuple]:
"""
Get objects, that are currently stored in ClickHouse.
Depending on the partition key this can be different for different models.
In common case, this method is optimized for date field that doesn't change.
It also supposes primary key to by self.pk_column
:param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched
:param date_col: Optional column name, where partition date is hold. Defaults to self.date_col
:return: A generator of named tuples, representing previous state
"""
if not objects:
raise StopIteration()

object_pks = [str(getattr(obj, self.pk_column)) for obj in objects]

db_alias = model_cls.get_database_alias()

min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
date_range_filter = self._get_date_rate_filter(objects, model_cls, db_alias, date_col)

# Get fields. Sign is replaced to negative for further processing
columns = list(model_cls.fields(writable=True).keys())
columns.remove(self.sign_col)
columns.append('-1 AS sign')

params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
params = (db_alias, model_cls, object_pks, columns, date_range_filter)

if self.version_col:
return self._get_final_versions_by_version(*params)
Expand Down
13 changes: 13 additions & 0 deletions tests/test_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,25 @@ def test_get_final_versions_by_final_datetime(self):
self.objects, date_col='created')
self._test_final_versions(final_versions)

def test_get_final_versions_by_final_no_date_col(self):
ClickHouseCollapseTestModel.engine.date_col = None
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self._test_final_versions(final_versions)

def test_get_final_versions_by_version_datetime(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created')
self._test_final_versions(final_versions)

def test_get_final_versions_by_version_no_date_col(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
ClickHouseCollapseTestModel.engine.date_col = None
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self._test_final_versions(final_versions)

def test_versions(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
Expand Down

0 comments on commit 0e65f15

Please sign in to comment.