Skip to content

Commit

Permalink
Add support for dask dataframes
Browse files Browse the repository at this point in the history
  • Loading branch information
Eike von Seggern committed Jun 1, 2024
1 parent 3053287 commit 1ae38d0
Show file tree
Hide file tree
Showing 13 changed files with 892 additions and 526 deletions.
2 changes: 1 addition & 1 deletion Changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Unreleased (YYYY-MM-DD)

- ...
- Add support for dask dataframes

# 1.5.0 (2024-04-17)

Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.8-slim
FROM python:3.9-slim

# Install curl to install poetry
RUN --mount=type=cache,target=/var/cache/apt --mount=type=cache,target=/var/lib/apt : \
Expand Down
9 changes: 6 additions & 3 deletions pandas_paddles/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,16 @@
__version__ = "1.6.0-dev"
__all__ = ["C", "DF", "I", "S", "report", "paddles"]

from .contexts import DataframeContext, SeriesContext

from .axis import ColumnSelectionComposer, IndexSelectionComposer
from .pipe import report
from . import paddles
try:
from .dask import DF, S
except ImportError as e:
print("Using pandas-only...", e)
from .pandas import DF, S


C = ColumnSelectionComposer()
DF = DataframeContext()
I = IndexSelectionComposer()
S = SeriesContext()
22 changes: 13 additions & 9 deletions pandas_paddles/axis.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Select axis labels (columns or index) of a data frame."""
import operator
from typing import Any, Callable, List, Optional, Sequence
from typing import Any, Callable, Optional, Sequence
import typing
try:
from typing import Literal
except ImportError:
Expand All @@ -9,8 +10,11 @@
import numpy as np
import pandas as pd

Indices = "Indices"
AnyDataframe = "AnyDataframe"
if typing.TYPE_CHECKING:
from .types import AnyDataframe, Indices

Indices = List[int]


class Selection:
Expand Down Expand Up @@ -40,7 +44,7 @@ def __init__(self, included:Optional[Indices]=None, excluded:Optional[Indices]=N
self.included: Optional[Indices] = included
self.excluded: Optional[Indices] = excluded

def apply(self, axis:Literal["columns", "index"], df: pd.DataFrame):
def apply(self, axis:Literal["columns", "index"], df: AnyDataframe):
labels = getattr(df, axis)
included = self.included
if included is None:
Expand Down Expand Up @@ -99,7 +103,7 @@ def union_indices(left: Indices, right: Indices) -> Indices:
# Column selection operator closures
class BaseOp:
"""API definition of the closure object."""
def __call__(self, axis: Literal["columns", "index"], df: pd.DataFrame) -> Selection:
def __call__(self, axis: Literal["columns", "index"], df: AnyDataframe) -> Selection:
"""Evaluate operator on data frame from context."""
raise NotImplementedError("Must be implemented in sub-class.")

Expand Down Expand Up @@ -191,7 +195,7 @@ def pp(a):
return f'(level={self.level}).{self.meth}({pp_args})'
return f'.{self.meth}({pp_args})'

def __call__(self, axis, df: pd.DataFrame) -> Selection:
def __call__(self, axis, df: AnyDataframe) -> Selection:
labels = getattr(df, axis)
if self.level is None:
str_accessor = labels.str
Expand All @@ -205,7 +209,7 @@ def __call__(self, axis, df: pd.DataFrame) -> Selection:

class EllipsisOp(BaseOp):
"""Select all labels (i.e. columns or rows)."""
def __call__(self, axis, df: pd.DataFrame) -> Selection:
def __call__(self, axis, df: AnyDataframe) -> Selection:
labels = getattr(df, axis)
return Selection(mask=np.ones(len(labels), dtype=bool))

Expand Down Expand Up @@ -238,7 +242,7 @@ def _pprint(self, axis: str) -> str:
return f"{self.left._pprint(axis)} {op_name} {self.right._pprint(axis)}"


def __call__(self, axis, df: pd.DataFrame) -> Selection:
def __call__(self, axis, df: AnyDataframe) -> Selection:
sel_left = self.left(axis, df)
sel_right = self.right(axis, df)

Expand Down Expand Up @@ -271,7 +275,7 @@ def _pprint(self, axis: str) -> str:
right = ")"
return f"{op_name}{left}{self.wrapped._pprint(axis)}{right}"

def __call__(self, axis, df: pd.DataFrame) -> Selection:
def __call__(self, axis, df: AnyDataframe) -> Selection:
sel = self.wrapped(axis, df)

return self.op(sel)
Expand Down Expand Up @@ -389,7 +393,7 @@ def __invert__(self):
op=operator.invert,
))

def __call__(self, df:pd.DataFrame) -> pd.Index:
def __call__(self, df: AnyDataframe) -> pd.Index:
"""Evaluate the wrapped operations."""
selection = self.op(self.axis, df)
return selection.apply(self.axis, df)
Expand Down
204 changes: 19 additions & 185 deletions pandas_paddles/contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from . import operator_helpers


def _add_dunder_operators(cls):
def add_dunder_operators(cls):
"""Dress class with all sensible comparison operations.
The class must implement a ``_operator_proxy`` method.
Expand All @@ -21,20 +21,29 @@ def _add_dunder_operators(cls):
"""
# Fix the closure of `op_wrap` to the current value of `op`. Without
# `fix_closure()` all created methods point to the last `op` value.

cls_lookup_candidates = [pd.Series]
for wrapped_cls in cls.wrapped_cls:
if wrapped_cls in cls_lookup_candidates:
continue
cls_lookup_candidates.append(wrapped_cls)


def fix_closure(op):
def op_wrap(self, *args, **kwargs):
return self._operator_proxy(op)(*args, **kwargs)
# Update method metadata to improve usability
op_wrap.__name__ = op
orig_doc = None
orig_annot = None
for pd_cls in {pd.Series} | {cls.wrapped_cls}:
if hasattr(pd_cls, op):
a = getattr(pd_cls, op)
for orig_cls in cls_lookup_candidates:
if hasattr(orig_cls, op):
a = getattr(orig_cls, op)
if not a.__doc__:
continue
op_wrap.__doc__ = a.__doc__
op_wrap.__annotations__ = a.__annotations__
try:
op_wrap.__annotations__ = a.__annotations__
except AttributeError:
pass
break

return op_wrap
Expand All @@ -51,7 +60,7 @@ def op_wrap(self, *args, **kwargs):
return cls


def _get_obj_attr_doc(obj_or_class: Any, attr: str):
def get_obj_attr_doc(obj_or_class: Any, attr: str):
"""Get doc-string for attribute ``attr`` of ``obj_or_class`` if it exists."""
if isinstance(attr, str):
a = getattr(obj_or_class, attr, None)
Expand All @@ -62,7 +71,7 @@ def _get_obj_attr_doc(obj_or_class: Any, attr: str):

class ClosureFactoryBase:
"""Abstract base-class for generating DataFrame and Series context closures."""
wrapped_cls: ClassVar[Type] = type('NotABaseOfAnything', (), {})
wrapped_cls: ClassVar[Tuple[Type]] = (type('NotABaseOfAnything', (), {}),)
wrapped_s: str = "X"
def __init__(self,
closures: Optional[Iterable[ClosureBase]]=None):
Expand Down Expand Up @@ -110,7 +119,7 @@ def __getitem__(self, key: str) -> "ClosureFactoryBase":
def _operator_proxy(self, op_name: str) -> Callable:
"""Generate proxy function for built-in operators.
Used by :func:`_add_dunder_operators`
Used by :func:`add_dunder_operators`
"""
def op_wrapper(*args, **kwargs):
return type(self)(self._closures + (MethodClosure(op_name, type(self), *args, **kwargs),))
Expand Down Expand Up @@ -169,178 +178,3 @@ def to_node(x):
cur = new

return cur.root


@_add_dunder_operators  # Required so every dunder operator is overloaded.
class DataframeContext(ClosureFactoryBase):
    """Build a callable that defers column access and operators to a data frame.

    Use the global instance like::

        from pandas_paddles import DF
        df.loc[DF["x"] < 3]

    All operations (item/attribute access, method calls) are passed
    through to the data frame of the context.  This is useful together
    with :attr:`~pandas.DataFrame.loc`, :attr:`~pandas.DataFrame.iloc`,
    :meth:`~pandas.DataFrame.assign` and other methods that accept a
    callable taking the data frame to act on as its single argument.

    Examples
    --------
    With :attr:`~pandas.DataFrame.loc` or :attr:`~pandas.DataFrame.iloc`::

        df = pd.DataFrame({"x": [1, 2, 3, 4]})
        df.loc[DF["x"] <= 2]
        # Out:
        #    x
        # 0  1
        # 1  2

    With :meth:`~pandas.DataFrame.assign`::

        df.assign(y = DF["x"] * 2)
        # Out:
        #    x  y
        # 0  1  2
        # 1  2  4
        # 2  3  6
        # 3  4  8
    """
    wrapped_cls = pd.DataFrame
    wrapped_s = "DF"

    def _get_doc(self) -> Optional[str]:
        """Look up a pandas doc-string matching the recorded accessor chain."""
        fallback = super()._get_doc()
        chain = self._closures
        if not chain:
            return fallback
        last = chain[-1].name
        # Single accessor: assume a DataFrame-level attribute/method.
        if len(chain) == 1 and isinstance(last, str):
            return _get_obj_attr_doc(self.wrapped_cls, last) or fallback
        # 3+ accessors through a typed Series accessor (``.dt`` / ``.str``).
        if len(chain) >= 3 and chain[-2].name in ("dt", "str"):
            accessor = getattr(pd.Series, chain[-2].name)
            return _get_obj_attr_doc(accessor, last) or fallback
        # 2+ accessors: assume a Series-level attribute/method.
        if len(chain) > 1:
            return _get_obj_attr_doc(pd.Series, last) or fallback
        return fallback



@_add_dunder_operators  # Required so every dunder operator is overloaded.
class SeriesContext(ClosureFactoryBase):
    """Build a callable for series attributes and operators.

    Use the global instance like::

        from pandas_paddles import S
        s[S < 0]

    All operations (item/attribute access, method calls) are passed
    through to the series of the context.  This is useful together with
    :attr:`~pandas.Series.loc`, :attr:`~pandas.Series.iloc`, and other
    methods that accept callables taking the series to act on as
    argument, e.g., ``.agg()`` after a group-by.

    Examples
    --------
    With ``[]``, ``.loc[]`` or ``.iloc[]``::

        from pandas_paddles import S
        s = pd.Series(range(10))
        s[S <= 2]
        # Out:
        # 0    0
        # 1    1
        # 2    2
        # dtype: int64

    Aggregating a single ``groupby``-ed column with
    ``groupby(...)[col].agg()``::

        df = pd.DataFrame({
            "x": [1, 2, 3, 4],
            "y": ["a", "b", "b", "a"],
            "z": [0.1, 0.5, 0.6, 0.9],
        })
        df.groupby("y")["x"].agg(S.max() - S.min())
        # Out:
        # y
        # a    3
        # b    1
        # Name: x, dtype: int64

    Applying multiple aggregations to a single column::

        df.groupby("y")["x"].agg([
            S.max() - S.min(),
            S.mean(),
        ])
        # Out:
        #    S.max() - S.min()  S.min()
        # y
        # a                  3        1
        # b                  1        2

    Aggregating multiple columns (**Note:** You must wrap the
    ``S``-expressions in a ``list`` even when using only one
    expression!)::

        df.groupby("y").agg([S.min()])
        # Out:
        #          x       z
        #    S.min() S.min()
        # y
        # a        1     0.1
        # b        2     0.5

    Multiple ``S``-expressions work the same::

        df.groupby("y").agg([S.min(), S.mean()])
        # Out:
        #          x                z
        #    S.min() S.mean() S.min() S.mean()
        # y
        # a        1      2.5     0.1     0.50
        # b        2      2.5     0.5     0.55

    ``S``-expressions can also be passed in a ``dict`` argument to
    ``.agg()`` (again, they always need to be wrapped in a ``list``!)::

        df.groupby("y").agg({
            "x": [S.min(), S.mean()],
            "z": [S.max(), S.max() - S.min()],
        })
        # Out:
        #          x                z
        #    S.min() S.mean() S.max() S.max() - S.min()
        # y
        # a        1      2.5     0.9               0.8
        # b        2      2.5     0.6               0.1
    """
    wrapped_cls = pd.Series
    wrapped_s = "S"

    def _get_doc(self) -> Optional[str]:
        """Look up a pandas doc-string matching the recorded accessor chain."""
        fallback = super()._get_doc()
        chain = self._closures
        if not chain:
            return fallback
        last = chain[-1].name
        # Single accessor: assume a Series-level attribute/method.
        if len(chain) == 1 and isinstance(last, str):
            return _get_obj_attr_doc(self.wrapped_cls, last) or fallback
        # Chain through a typed Series accessor (``.dt`` / ``.str``).
        if len(chain) > 1 and chain[0].name in ("dt", "str"):
            accessor = getattr(pd.Series, chain[0].name)
            return _get_obj_attr_doc(accessor, last) or fallback
        return fallback
18 changes: 18 additions & 0 deletions pandas_paddles/dask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
"""Pandas and dask contexts"""

from dask.dataframe import DataFrame
import pandas as pd

from .pandas import PandasDataframeContext, S


from .contexts import (
add_dunder_operators,
get_obj_attr_doc,
)

@add_dunder_operators
class DaskDataframeContext(PandasDataframeContext):
wrapped_cls = (pd.DataFrame, DataFrame)

DF = DaskDataframeContext()
Loading

0 comments on commit 1ae38d0

Please sign in to comment.