Dask backend execution #2557
Conversation
Looks like there's a join test that's flaky and some tests in ibis/test/all that are having some trouble with dask DataFrames - will take a look.
So as I look through, I am seeing a lot of code copying. So, 2 choices here:
you can simply import the actual pandas implementation that is generic (e.g. literally import the function) and then just call it within the dask-registered function
OR
move the implementation to a common location & register both pandas and dask.
Both of these solutions are fine. What I would do is basically merge all the tests, but xfail them, in a precursor PR. Then as ops are defined, the xfails can be removed.
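A minimal sketch of the first option (delegation), using hypothetical function names rather than ibis's real module layout: the dask-registered function simply imports and calls the generic pandas implementation, which only uses Series-like methods and so works unchanged on a dd.Series.

```python
import pandas as pd

# Hypothetical generic implementation living in the pandas backend;
# it only touches Series-like methods, so a dd.Series works too.
def execute_string_length_pandas(op, data, **kwargs):
    return data.str.len()

# The dask backend registers its own function that just delegates
# (the @execute_node.register decorator is omitted in this sketch).
def execute_string_length_dask(op, data, **kwargs):
    return execute_string_length_pandas(op, data, **kwargs)

result = execute_string_length_dask(None, pd.Series(["ab", "c"]))
print(list(result))  # [2, 1]
```

The second option is the same idea with the shared function moved to a common module and registered once per backend type.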
ibis/backends/dask/execution/maps.py (outdated)
    return data.map(get)

    # Note - to avoid dispatch ambiguities we must unregister pandas
really? why? these are dd.Series, so that should override here? (now a way around this is to, in a common location, simply register with both types, e.g. in a dask/pandas ops section). we can do that as a followup, though maybe makes sense to split it out now (before this PR).
If you don't unregister these, https://github.com/ibis-project/ibis/blob/master/ibis/backends/pandas/tests/test_core.py#L44 will fail. If I understand the issue correctly, it is that multipledispatch sees

    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, pandas.Series, object,)
    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, object, pandas.Series,)
    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, dd.Series, object,)
    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, object, dd.Series,)

as an ambiguous set of registrations, via https://multiple-dispatch.readthedocs.io/en/latest/resolution.html#ambiguities.
hmm, this looks like we should actually just register both (e.g. do this as a common set).
though why are these ambiguous?
Good question - that's the source of the failing pandas test as well. I think what is happening is multipledispatch sees two ambiguities:

    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, object, pandas.Series,)
    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, dd.Series, object,)

and separately

    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, pandas.Series, object,)
    (ops.MapValueOrDefaultForKey, collections.abc.Mapping, object, dd.Series,)

multipledispatch sees this as ambiguous because it won't know what to do if you call this with a (pandas.Series, dd.Series) pair of arguments. I've changed the registration around in c125fe7 to both avoid the ambiguity and keep dispatch working correctly in pandas (even if you have also loaded the dask backend at the same time). I tried to clarify in the code why we were doing it this way.
Let me know if this was a reasonable approach and the commentary is clear, or you want this done a different way.
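The ambiguity can be sketched in plain Python with stand-in classes (hypothetical, not the real pandas/dask types): both registered signatures match a (pandas.Series, dd.Series) call, but neither is strictly more specific than the other, so the resolver cannot pick one.

```python
class PandasSeries:   # stand-in for pandas.Series
    pass

class DaskSeries:     # stand-in for dd.Series
    pass

def matches(call_sig, registered_sig):
    # a call matches a registration if each argument type is a subclass
    return all(issubclass(c, r) for c, r in zip(call_sig, registered_sig))

def more_specific(sig_a, sig_b):
    # sig_a dominates sig_b only if every slot of sig_a is a subclass
    # of the corresponding slot of sig_b
    return all(issubclass(a, b) for a, b in zip(sig_a, sig_b))

sig1 = (PandasSeries, object)    # like (pandas.Series, object)
sig2 = (object, DaskSeries)      # like (object, dd.Series)
call = (PandasSeries, DaskSeries)

assert matches(call, sig1) and matches(call, sig2)  # both apply...
assert not more_specific(sig1, sig2)                # ...but neither
assert not more_specific(sig2, sig1)                # dominates: ambiguous
```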
RE the xfail at: https://github.com/gerrymanoim/ibis/blob/ibis-dask-execution/ibis/backends/dask/tests/test_datatypes.py#L89, the issue here is https://github.com/ibis-project/ibis/blob/master/ibis/expr/types.py#L1256 imports pandas directly, detects …
just create an issue for now. need to re-write this with a single dispatch rule to handle this instead of the way it's done.
Discovered an issue with how generic ibis tests were being run on the dask backend. A lot of these newer test failures have to do with how the tests themselves are written. For example, https://github.com/gerrymanoim/ibis/blob/ibis-dask-execution/ibis/tests/all/test_string.py#L17 has the line df.string_col.map(is_text_type).all(), which is not a valid dask expression (the valid one would be …). Not sure what the best option is for these tests.
the …
Running the code there gives you … for example. Another example: several tests use the fixture …
Originally I was going to break up this PR, but a difficulty is that tests depend on lots of different pieces from execution, so I ended up coding up most of it just to get … I think my next steps here are: …
Does that sound like a reasonable plan?
plan sounds good.
    replacement
    if isnull(value)
    else dd.from_pandas(
        pd.Series(value, index=replacement.index), npartitions=1
How does this work? Isn't replacement.index a distributed index? Will this trigger any data materialization?
This is a good question - I think this is done lazily:

    In [34]: dask_base
    Out[34]:
    Dask Series Structure:
    npartitions=1
    0    datetime64[ns]
    2               ...
    dtype: datetime64[ns]
    Dask Name: from_pandas, 1 tasks

    In [32]: dd.from_pandas(pd.Series(5, index=dask_base.index), npartitions=1)
    Out[32]:
    Dask Series Structure:
    npartitions=1
    0    int64
    2      ...
    dtype: int64
    Dask Name: from_pandas, 1 tasks

    In [33]: dd.from_pandas(pd.Series(5, index=dask_base.index), npartitions=1).compute()
    Out[33]:
    0    5
    1    5
    2    5
    dtype: int64
a lot of operations like this will trigger materialization at times. this is ok. it's on the worker in a partition. it's usually much easier to just do it this way.
What is the type of the value and replacement here? Also, why do we return a series of a single partition? Should this method return the same partitions as the input dd.Series?
I should express the partitions in chunks rather than having a static number - fixing.
        raw_1d.astype(constants.IBIS_TYPE_TO_PANDAS_TYPE[expr.type()])
    )
    # TODO - we force computation here
    if isinstance(expr, ir.ScalarExpr) and result.size.compute() == 1:
Why are we forcing computation here?
We need to actually know the result.size to do the boolean test. I tried to be explicit about places where we're forcing compute so we can come back and adjust them if we find better ways of lazily doing these checks.
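As a toy illustration (a pure-Python stand-in, not dask) of why the explicit compute() is needed: a lazy wrapper can't participate in the boolean test until it is evaluated.

```python
class LazyScalar:
    """Toy stand-in for a lazy result such as dd.Series.size."""
    def __init__(self, value):
        self._value = value

    def compute(self):
        # in dask this would run the task graph; here it just returns
        return self._value

result_size = LazyScalar(1)

# comparing the lazy wrapper directly compares objects, not values,
# so the branch would never be taken
assert (result_size == 1) is False

# forcing computation yields the concrete value the branch needs
assert result_size.compute() == 1
```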
kk sounds fine. We can look later. Should try to get rid of these computations in the compilation step as much as possible.
Agreed on the general point of aiming to not materialize as much as possible.
Quick update:
Once I rebase and get CI tests green I'll mark this ready to review and ping you again.
@gerrymanoim sounds great, ping when ready; pinning ok for now.
(force-pushed from 1bce6fe to bf11966)
Ah - looks like pandas got bumped in the CI to 1.1, which was breaking a bunch of my tests (pandas 1.1 and dask 2.30 do not get along). After bumping dask as well, everything looks green.
ok a few comments. this looks reasonable. ok to do the removal of generic.py in a follow-on. will look once again, but then prob ok to merge. like to do some cleaning in small pieces rather than big changes first.
    if isinstance(from_type, (dt.Timestamp, dt.Date)):
        return data.astype(
            'M8[ns]' if tz is None else DatetimeTZDtype('ns', tz)
this is likely a bug, mark the test xfail
    return result

    @execute_node.register(ops.SimpleCase, dd.Series, list, list, object)
can you either organize this file into sections, with comments indicating the type of ops or split to separate files.
ok i see you did split out a lot of things. I guess just need to split the rest and remove the need for generic.py.
If done as a follow on - should pandas and dask be updated at the same time?
let's do separate PRs for these (can be done any order and all should be orthogonal).
ibis/backends/tests/test_numeric.py (outdated)
    @@ -101,7 +101,10 @@ def test_isnan_isinf(
        expected = backend.default_series_rename(expected)
        backend.assert_series_equal(result, expected)
    else:
        assert result == expected
        try:
why this change? can you simply change the expected or make a dask specific test? catching an exception in a test is generally a no-no as it hides things
This touches on a general issue I had with tests where many of them implicitly assume you're running pandas in the test (see anything with pandas in #2553 (comment)). Here - I wanted to support equality for dd.Series objects without adding an explicit dependency on dask for running the general tests. I could change this to something like `if 'dd' in locals() and isinstance(expected, dd.Series):`
However - it seems that ibis/backends/tests now assumes you have requirements for all envs installed. Is that intended?
I think no, this is something that @datapythonista removed, are you rebased on master (which reverted the last change).
Rebased now. So given that I don't necessarily want a dask dependency to run these tests, maybe these checks would be better than the exception?

`"dask" in str(type(df))`

or

`hasattr(df, "compute")`
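A sketch of how the duck-typed check might look in a shared test helper (the names here are hypothetical, not ibis's actual test utilities), avoiding any dask import:

```python
def is_dask_like(obj):
    # dask collections expose a callable .compute(); pandas objects don't
    return callable(getattr(obj, "compute", None))

class FakeDaskSeries:        # stand-in for dd.Series
    def compute(self):
        return [1.0, 2.0]

class FakePandasSeries:      # stand-in for pd.Series
    pass

def maybe_compute(obj):
    # shared tests can normalize lazy results before asserting equality
    return obj.compute() if is_dask_like(obj) else obj

assert maybe_compute(FakeDaskSeries()) == [1.0, 2.0]
assert isinstance(maybe_compute(FakePandasSeries()), FakePandasSeries)
```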
@icexelloss @jreback - I'm rebasing now after which tests should be green (I'll confirm) and I'll be ready for more comments/re-review. I'm also addressing some of Jeff's comments above.
(force-pushed from fd1ae80 to 30f8bee)
The failure is from azure pipelines taking longer than 1 hour to build the docs.
https://dev.azure.com/ibis-project/ibis/_build/results?buildId=3987&view=logs&j=8f09edc2-e3b7-52de-126a-0225c4f3efa1&t=8f09edc2-e3b7-52de-126a-0225c4f3efa1 worked fine without me changing anything. Maybe some issue with the conda solve last night.
> https://dev.azure.com/ibis-project/ibis/_build/results?buildId=3987&view=logs&j=8f09edc2-e3b7-52de-126a-0225c4f3efa1&t=8f09edc2-e3b7-52de-126a-0225c4f3efa1 worked fine without me changing anything. Maybe some issue with the conda solve last night.

yeah ok
    if isinstance(from_type, (dt.Timestamp, dt.Date)):
        return data.astype(
            'M8[ns]' if tz is None else DatetimeTZDtype('ns', tz)
hmm what does the pandas backend do here? (note not really concerned about this right now, just add a TODO and can follow up later)
    timestamps = data.map_partitions(
        to_datetime,
        infer_datetime_format=True,
        meta=(data.name, 'datetime64[ns]'),
yeah this is prob ok
        meta=(data.name, 'datetime64[ns]'),
    )
    # TODO - is there a better way to do this
    timestamps = timestamps.astype(timestamps.head(1).dtype)
yeah this is prob ok, but just add a todo and can look at later
    def vectorize_object(op, arg, *args, **kwargs):
        # TODO - this works for now, but I think we can do something much better
yeah this almost doesn't even matter. it's doing the op on object dtype with numeric data; this is for numpy compat and likely never hit in any important case.
looks good. plenty of followups! asked @icexelloss and @emilyreff7 to have a look.
    @execute_node.register(ops.NullIf, simple_types, dd.Series)
    def execute_node_nullif_scalar_series(op, value, series, **kwargs):
        # TODO - not preserving the index
I am curious why not preserving the index here? Does the input/output Series have the same index?
Since we are doing a `da.where` on the `dd.Series.values`, that will not carry the index information with it.
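The index loss can be reproduced with plain numpy/pandas - a sketch of the same effect `da.where` has when operating on `.values`:

```python
import numpy as np
import pandas as pd

s = pd.Series([1, 2, 3], index=[10, 20, 30])

# np.where on .values returns a bare ndarray: the index is gone
masked = np.where(s.values == 2, np.nan, s.values)
result = pd.Series(masked)

assert list(result.index) == [0, 1, 2]   # original [10, 20, 30] is lost
assert list(s.index) == [10, 20, 30]     # the input itself is untouched
```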
    # TODO - execute_materialized_join - #2553
    @execute_node.register(ops.Join, dd.DataFrame, dd.DataFrame)
    def execute_materialized_join(op, left, right, **kwargs):
What is a materialized join?
I believe this is the join handler for joins on columns we need to materialize, i.e. the following from `test_join_with_non_trivial_key`:

    join = left.join(right, right.key.length() == left.key.length(), how=how)
    expr = join[left.key, left.value, right.other_value]
    result = expr.execute()
    expected = (
        dd.merge(
            df1.assign(key_len=df1.key.str.len()),
            df2.assign(key_len=df2.key.str.len()),
            on='key_len',
            how=how,
        )
        .drop(['key_len', 'key_y', 'key2', 'key3'], axis=1)
        .rename(columns={'key_x': 'key'})
    )

(code is basically https://github.com/ibis-project/ibis/blob/master/ibis/backends/pandas/execution/join.py#L58)
    return call_numpy_ufunc(func, op, data, **kwargs).astype(return_type)

    def vectorize_object(op, arg, *args, **kwargs):
What does this method do?
Also a bit weird that this is only used for log and round operations. Curious why this is not used in all numeric methods?
        raise KeyError(name)
    (root_table,) = op.root_tables()
    left_root, right_root = ops.distinct_roots(
        parent_table_op.left, parent_table_op.right
What is the type of `parent_table_op` here?
Inside this branch of the function, parent_table_op is a join operation of some type.
    result = execute(expr, scope=scope, timecontext=timecontext, **kwargs)
    assert result_name is not None, 'Column selection name is None'
    if np.isscalar(result):
        series = dd.from_array(np.repeat(result, len(data.index)))
This will force computation, right? (Also might blow memory, since you can materialize into a np array on the driver.)
This is still lazy:

    In [5]: data = dd.from_pandas(pd.Series([1,2,3]), npartitions=1)

    In [6]: series = dd.from_array(np.repeat(5, len(data.index)))

    In [7]: series
    Out[7]:
    Dask Series Structure:
    npartitions=1
    0    int64
    2      ...
    dtype: int64
    Dask Name: from_array, 1 tasks

    In [8]: series.compute()
    Out[8]:
    0    5
    1    5
    2    5
    dtype: int64
        parent_table_op.left, parent_table_op.right
    )
    suffixes = {
        left_root: constants.LEFT_JOIN_SUFFIX,
Don't totally get this logic here. Why does "joining" appear in the column selection rule here?
The codepath here is distinct from execute_selection_dataframe (in either dask or pandas), which performs the basic selection operation. Here we're performing a selection on an ir.ColumnExpr. An example (from test_core) that hits this code path:

    left = ibis_table
    right = left.view()
    join = left.join(right, 'plain_strings')[left.plain_int64]

or similarly from test_join_with_project_right_duplicate_column:

    right = client.table('df3')
    join = left.join(right, ['key'], how=how)
    expr = join[left.key, right.key2, right.other_value]
    result = expr.execute()
    @execute_node.register(ops.StringAscii, dd.Series)
    def execute_string_ascii(op, data, **kwargs):
        output_meta = pandas.Series([], dtype=np.dtype('int32'), name=data.name)
Hmm.. what should the type of `meta` be in general? looks like sometimes it's a pd.Series, sometimes it's a dict?
https://docs.dask.org/en/latest/dataframe-design.html#metadata. A few different ways to apply meta are all equally valid. My read is you can:
- Provide an empty pandas object with appropriate dtypes and names.
- Provide a descriptive meta, which can be:
  - A dict of {name: dtype} or an iterable of (name, dtype), which specifies a DataFrame. Note that order is important: the order of the names in meta should match the order of the columns.
  - A tuple of (name, dtype), which specifies a Series.
  - A dtype object or string (e.g. 'f8'), which specifies a scalar.
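As a small illustration, the empty-pandas-object form and the descriptive (name, dtype) tuple describe the same Series meta (a sketch using plain pandas/numpy, no dask required):

```python
import numpy as np
import pandas as pd

# 1. an empty pandas object with the right name and dtype
empty_meta = pd.Series([], dtype=np.dtype("int32"), name="string_col")

# 2. the descriptive (name, dtype) tuple for the same Series
tuple_meta = ("string_col", "int32")

# both forms carry the same name and dtype information
assert empty_meta.name == tuple_meta[0]
assert empty_meta.dtype == np.dtype(tuple_meta[1])
```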
        & (data.dt.time.astype(str) <= upper)
    ).to_dask_array(True)

    result = da.zeros(len(data), dtype=np.bool_)
`len(data)` seems dangerous here since it will materialize data? Can we create a distributed array here by calling "apply" on the original dd.Series?
Not sure. I'll add this to a "materialization to investigate" follow up issue (along with items you mentioned above).
Finished one round of review.
@jreback @icexelloss - I think I've hit all the feedback, let me know if there's anything else that needs to be done.
few more comments, main thing is to lower the required dask version.
    # TODO - aggregations - #2553
    # Not all code paths work cleanly here
    @execute_node.register(ops.Aggregation, dd.DataFrame)
add to the list of things that we can register for pandas/dask
Updated for review comments.
thanks @gerrymanoim lgtm.
Part 3 of #2537.
See #2553 for (much improved) tracking of anything that is xfailed in dask but not in pandas. Functions/tests expected to fail are marked with `TODO - <reason keyword> #2553`.
Notes/caveats: