-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
frame_base.py
764 lines (627 loc) · 24.8 KB
/
frame_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import operator
import re
from collections.abc import Callable
from inspect import cleandoc
from inspect import getfullargspec
from inspect import isclass
from inspect import ismodule
from inspect import unwrap
from typing import Any
from typing import Optional
from typing import Tuple
from typing import Union
import pandas as pd
from apache_beam.dataframe import expressions
from apache_beam.dataframe import partitionings
class DeferredBase(object):
_pandas_type_map: dict[Union[type, None], type] = {}
def __init__(self, expr):
self._expr = expr
@classmethod
def _register_for(cls, pandas_type):
def wrapper(deferred_type):
cls._pandas_type_map[pandas_type] = deferred_type
return deferred_type
return wrapper
@classmethod
def wrap(cls, expr, split_tuples=True):
proxy_type = type(expr.proxy())
if proxy_type is tuple and split_tuples:
def get(ix):
return expressions.ComputedExpression(
# yapf: disable
'get_%d' % ix,
lambda t: t[ix],
[expr],
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Singleton())
return tuple(cls.wrap(get(ix)) for ix in range(len(expr.proxy())))
elif proxy_type in cls._pandas_type_map:
wrapper_type = cls._pandas_type_map[proxy_type]
else:
if expr.requires_partition_by() != partitionings.Singleton():
raise ValueError(
'Scalar expression %s of type %s partitoned by non-singleton %s' %
(expr, proxy_type, expr.requires_partition_by()))
wrapper_type = _DeferredScalar
return wrapper_type(expr)
def _elementwise(
self, func, name=None, other_args=(), other_kwargs=None, inplace=False):
other_kwargs = other_kwargs or {}
return _elementwise_function(
func, name, inplace=inplace)(self, *other_args, **other_kwargs)
def __reduce__(self):
return UnusableUnpickledDeferredBase, (str(self), )
class UnusableUnpickledDeferredBase(object):
"""Placeholder object used to break the transitive pickling chain in case a
DeferredBase accidentially gets pickled (e.g. as part of globals).
Trying to use this object after unpickling is a bug and will result in an
error.
"""
def __init__(self, name):
self._name = name
def __repr__(self):
return 'UnusablePickledDeferredBase(%r)' % self.name
class DeferredFrame(DeferredBase):
pass
class _DeferredScalar(DeferredBase):
def apply(self, func, name=None, args=()):
if name is None:
name = func.__name__
with expressions.allow_non_parallel_operations(
all(isinstance(arg, _DeferredScalar) for arg in args) or None):
return DeferredFrame.wrap(
expressions.ComputedExpression(
name,
func, [self._expr] + [arg._expr for arg in args],
requires_partition_by=partitionings.Singleton()))
def __neg__(self):
return self.apply(operator.neg)
def __pos__(self):
return self.apply(operator.pos)
def __invert__(self):
return self.apply(operator.invert)
def __repr__(self):
return f"DeferredScalar[type={type(self._expr.proxy())}]"
def __bool__(self):
# TODO(BEAM-11951): Link to documentation
raise TypeError(
"Testing the truth value of a deferred scalar is not "
"allowed. It's not possible to branch on the result of "
"deferred operations.")
def _scalar_binop(op):
def binop(self, other):
if not isinstance(other, DeferredBase):
return self.apply(lambda left: getattr(left, op)(other), name=op)
elif isinstance(other, _DeferredScalar):
return self.apply(
lambda left, right: getattr(left, op)(right), name=op, args=[other])
else:
return NotImplemented
return binop
for op in ['__add__',
'__sub__',
'__mul__',
'__div__',
'__truediv__',
'__floordiv__',
'__mod__',
'__divmod__',
'__pow__',
'__and__',
'__or__']:
setattr(_DeferredScalar, op, _scalar_binop(op))
DeferredBase._pandas_type_map[None] = _DeferredScalar
def name_and_func(method: Union[str, Callable]) -> Tuple[str, Callable]:
"""For the given method name or method, return the method name and the method
itself.
For internal use only. No backwards compatibility guarantees."""
if isinstance(method, str):
method_str = method
func = lambda df, *args, **kwargs: getattr(df, method_str)(*args, **kwargs)
return method, func
else:
return method.__name__, method
def _elementwise_method(
func, name=None, restrictions=None, inplace=False, base=None):
return _proxy_method(
func,
name,
restrictions,
inplace,
base,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary())
def _proxy_method(
func,
name=None,
restrictions=None,
inplace=False,
base=None,
*,
requires_partition_by: partitionings.Partitioning,
preserves_partition_by: partitionings.Partitioning,
):
if name is None:
name, func = name_and_func(func)
if base is None:
raise ValueError("base is required for _proxy_method")
return _proxy_function(
func,
name,
restrictions,
inplace,
base,
requires_partition_by=requires_partition_by,
preserves_partition_by=preserves_partition_by)
def _elementwise_function(
func, name=None, restrictions=None, inplace=False, base=None):
return _proxy_function(
func,
name,
restrictions,
inplace,
base,
requires_partition_by=partitionings.Arbitrary(),
preserves_partition_by=partitionings.Arbitrary())
def _proxy_function(
func: Union[Callable, str],
name: Optional[str] = None,
restrictions: Optional[dict[str, Union[Any, list[Any]]]] = None,
inplace: bool = False,
base: Optional[type] = None,
*,
requires_partition_by: partitionings.Partitioning,
preserves_partition_by: partitionings.Partitioning,
):
if name is None:
if isinstance(func, str):
name = func
else:
name = func.__name__
if restrictions is None:
restrictions = {}
def wrapper(*args, **kwargs):
for key, values in restrictions.items():
if key in kwargs:
value = kwargs[key]
else:
try:
ix = getfullargspec(func).args.index(key)
except ValueError:
# TODO: fix for delegation?
continue
if len(args) <= ix:
continue
value = args[ix]
if callable(values):
check = values
elif isinstance(values, list):
check = lambda x, values=values: x in values
else:
check = lambda x, value=value: x == value
if not check(value):
raise NotImplementedError(
'%s=%s not supported for %s' % (key, value, name))
deferred_arg_indices = []
deferred_arg_exprs = []
constant_args = [None] * len(args)
from apache_beam.dataframe.frames import _DeferredIndex
for ix, arg in enumerate(args):
if isinstance(arg, DeferredBase):
deferred_arg_indices.append(ix)
deferred_arg_exprs.append(arg._expr)
elif isinstance(arg, _DeferredIndex):
# TODO(robertwb): Consider letting indices pass through as indices.
# This would require updating the partitioning code, as indices don't
# have indices.
deferred_arg_indices.append(ix)
deferred_arg_exprs.append(
expressions.ComputedExpression(
'index_as_series',
lambda ix: ix.index.to_series(), # yapf break
[arg._frame._expr],
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Arbitrary()))
elif isinstance(arg, pd.core.generic.NDFrame):
deferred_arg_indices.append(ix)
deferred_arg_exprs.append(expressions.ConstantExpression(arg, arg[0:0]))
else:
constant_args[ix] = arg
deferred_kwarg_keys = []
deferred_kwarg_exprs = []
constant_kwargs = {key: None for key in kwargs}
for key, arg in kwargs.items():
if isinstance(arg, DeferredBase):
deferred_kwarg_keys.append(key)
deferred_kwarg_exprs.append(arg._expr)
elif isinstance(arg, pd.core.generic.NDFrame):
deferred_kwarg_keys.append(key)
deferred_kwarg_exprs.append(
expressions.ConstantExpression(arg, arg[0:0]))
else:
constant_kwargs[key] = arg
deferred_exprs = deferred_arg_exprs + deferred_kwarg_exprs
if inplace:
actual_func = _copy_and_mutate(func)
else:
actual_func = func
def apply(*actual_args):
actual_args, actual_kwargs = (actual_args[:len(deferred_arg_exprs)],
actual_args[len(deferred_arg_exprs):])
full_args = list(constant_args)
for ix, arg in zip(deferred_arg_indices, actual_args):
full_args[ix] = arg
full_kwargs = dict(constant_kwargs)
for key, arg in zip(deferred_kwarg_keys, actual_kwargs):
full_kwargs[key] = arg
return actual_func(*full_args, **full_kwargs)
if (requires_partition_by.is_subpartitioning_of(partitionings.Index()) and
sum(isinstance(arg.proxy(), pd.core.generic.NDFrame)
for arg in deferred_exprs) > 1):
# Implicit join on index if there is more than one indexed input.
actual_requires_partition_by = partitionings.JoinIndex()
else:
actual_requires_partition_by = requires_partition_by
result_expr = expressions.ComputedExpression(
name,
apply,
deferred_exprs,
requires_partition_by=actual_requires_partition_by,
preserves_partition_by=preserves_partition_by)
if inplace:
args[0]._expr = result_expr
else:
return DeferredFrame.wrap(result_expr)
wrapper.__name__ = name
if restrictions:
wrapper.__doc__ = "\n".join(
f"Only {kw}={value!r} is supported"
for (kw, value) in restrictions.items())
if base is not None:
return with_docs_from(base)(wrapper)
else:
return wrapper
def _prettify_pandas_type(pandas_type):
if pandas_type in (pd.DataFrame, pd.Series):
return f'pandas.{pandas_type.__name__}'
elif isclass(pandas_type):
return f'{pandas_type.__module__}.{pandas_type.__name__}'
elif ismodule(pandas_type):
return pandas_type.__name__
else:
raise TypeError(pandas_type)
def wont_implement_method(base_type, name, reason=None, explanation=None):
"""Generate a stub method that raises WontImplementError.
Note either reason or explanation must be specified. If both are specified,
explanation is ignored.
Args:
base_type: The pandas type of the method that this is trying to replicate.
name: The name of the method that this is aiming to replicate.
reason: If specified, use data from the corresponding entry in
``_WONT_IMPLEMENT_REASONS`` to generate a helpful exception message
and docstring for the method.
explanation: If specified, use this string as an explanation for why
this operation is not supported when generating an exception message
and docstring.
"""
if reason is not None:
if reason not in _WONT_IMPLEMENT_REASONS:
raise AssertionError(
f"reason must be one of {list(_WONT_IMPLEMENT_REASONS.keys())}, "
f"got {reason!r}")
reason_data = _WONT_IMPLEMENT_REASONS[reason]
elif explanation is not None:
reason_data = {'explanation': explanation}
else:
raise ValueError("One of (reason, explanation) must be specified")
def wrapper(*args, **kwargs):
raise WontImplementError(
f"'{name}' is not yet supported {reason_data['explanation']}",
reason=reason)
wrapper.__name__ = name
wrapper.__doc__ = (
f":meth:`{_prettify_pandas_type(base_type)}.{name}` is not yet supported "
f"in the Beam DataFrame API {reason_data['explanation']}")
if 'url' in reason_data:
wrapper.__doc__ += f"\n\n For more information see {reason_data['url']}."
return wrapper
def not_implemented_method(op, issue='20318', base_type=None):
"""Generate a stub method for ``op`` that simply raises a NotImplementedError.
For internal use only. No backwards compatibility guarantees."""
assert base_type is not None, "base_type must be specified"
issue_url = f"https://issues.apache.org/jira/{issue}." if issue.startswith(
"BEAM-") else f"https://github.com/apache/beam/issues/{issue}"
def wrapper(*args, **kwargs):
raise NotImplementedError(
f"{op!r} is not implemented yet. "
f"If support for {op!r} is important to you, please let the Beam "
"community know by writing to user@beam.apache.org "
"(see https://beam.apache.org/community/contact-us/) or commenting on "
f"{issue_url}")
wrapper.__name__ = op
wrapper.__doc__ = (
f":meth:`{_prettify_pandas_type(base_type)}.{op}` is not implemented yet "
"in the Beam DataFrame API.\n\n"
f"If support for {op!r} is important to you, please let the Beam "
"community know by `writing to user@beam.apache.org "
"<https://beam.apache.org/community/contact-us/>`_ or commenting on "
f"`{issue} <{issue_url}>`_.")
return wrapper
def _copy_and_mutate(func):
def wrapper(self, *args, **kwargs):
copy = self.copy()
func(copy, *args, **kwargs)
return copy
return wrapper
def maybe_inplace(func):
"""Handles the inplace= kwarg available in many pandas operations.
This decorator produces a new function handles the inplace kwarg. When
`inplace=False`, the new function simply yields the result of `func`
directly.
When `inplace=True`, the output of `func` is used to replace this instances
expression. The result is that any operations applied to this instance after
the inplace operation will refernce the updated expression.
For internal use only. No backwards compatibility guarantees."""
@functools.wraps(func)
def wrapper(self, inplace=False, **kwargs):
result = func(self, **kwargs)
if inplace:
self._expr = result._expr
else:
return result
return wrapper
def args_to_kwargs(base_type, removed_method=False, removed_args=None):
"""Convert all args to kwargs before calling the decorated function.
When applied to a function, this decorator creates a new function
that always calls the wrapped function with *only* keyword arguments. It
inspects the argspec for the identically-named method on `base_type` to
determine the name to use for arguments that are converted to keyword
arguments.
For internal use only. No backwards compatibility guarantees.
Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
if removed_method:
# Do no processing, let Beam function itself raise the error if called.
return func
removed_arg_names = removed_args if removed_args is not None else []
# We would need to add position only arguments if they ever become a thing
# in Pandas (as of 2.1 currently they aren't).
base_arg_spec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
base_arg_names = base_arg_spec.args
# Some arguments are keyword only and we still want to check against those.
all_possible_base_arg_names = base_arg_names + base_arg_spec.kwonlyargs
beam_arg_names = getfullargspec(func).args
if not_found := (set(beam_arg_names) - set(all_possible_base_arg_names) -
set(removed_arg_names)):
raise TypeError(
f"Beam definition of {func.__name__} has arguments that are not found"
f" in the base version of the function: {not_found}")
@functools.wraps(func)
def wrapper(*args, **kwargs):
if len(args) > len(base_arg_names):
raise TypeError(f"{func.__name__} got too many positioned arguments.")
for name, value in zip(base_arg_names, args):
if name in kwargs:
raise TypeError(
"%s() got multiple values for argument '%s'" %
(func.__name__, name))
kwargs[name] = value
# Still have to populate these for the Beam function signature.
if removed_args:
for name in removed_args:
if name not in kwargs:
kwargs[name] = None
return func(**kwargs)
return wrapper
return wrap
BEAM_SPECIFIC = "Differences from pandas"
SECTION_ORDER = [
'Parameters',
'Returns',
'Raises',
BEAM_SPECIFIC,
'See Also',
'Notes',
'Examples'
]
EXAMPLES_DISCLAIMER = (
"**NOTE:** These examples are pulled directly from the pandas "
"documentation for convenience. Usage of the Beam DataFrame API will look "
"different because it is a deferred API.")
EXAMPLES_DIFFERENCES = EXAMPLES_DISCLAIMER + (
" In addition, some arguments shown here may not be supported, see "
f"**{BEAM_SPECIFIC!r}** for details.")
def with_docs_from(base_type, name=None, removed_method=False):
"""Decorator that updates the documentation from the wrapped function to
duplicate the documentation from the identically-named method in `base_type`.
Any docstring on the original function will be included in the new function
under a "Differences from pandas" heading.
removed_method used in cases where a method has been removed in a later
version of Pandas.
"""
def wrap(func):
if removed_method:
func.__doc__ = (
"This method has been removed in the current version of Pandas.")
return func
fn_name = name or func.__name__
orig_doc = getattr(base_type, fn_name).__doc__
if orig_doc is None:
return func
orig_doc = cleandoc(orig_doc)
section_splits = re.split(r'^(.*)$\n^-+$\n', orig_doc, flags=re.MULTILINE)
intro = section_splits[0].strip()
sections = dict(zip(section_splits[1::2], section_splits[2::2]))
beam_has_differences = bool(func.__doc__)
for header, content in sections.items():
content = content.strip()
# Replace references to version numbers so its clear they reference
# *pandas* versions
content = re.sub(r'([Vv]ersion\s+[\d\.]+)', r'pandas \1', content)
if header == "Examples":
content = '\n\n'.join([
(
EXAMPLES_DIFFERENCES
if beam_has_differences else EXAMPLES_DISCLAIMER),
# Indent the examples under a doctest heading,
# add skipif option. This makes sure our doctest
# framework doesn't run these pandas tests.
(".. doctest::\n"
" :skipif: True"),
re.sub(r"^", " ", content, flags=re.MULTILINE),
])
elif "Examples" in content and ">>>" in content:
# some new examples don't have the correct heading
# this catches those examples
split_content = content.split("Examples")
content = '\n\n'.join([
split_content[0],
"Examples\n",
# Indent the code snippet under a doctest heading,
# add skipif option. This makes sure our doctest
# framework doesn't run these pandas tests.
(".. doctest::\n"
" :skipif: True"),
re.sub(r"^", " ", content, flags=re.MULTILINE),
split_content[1]
])
else:
content = content.replace('DataFrame', 'DeferredDataFrame').replace(
'Series', 'DeferredSeries')
sections[header] = content
if beam_has_differences:
sections[BEAM_SPECIFIC] = cleandoc(func.__doc__)
else:
sections[BEAM_SPECIFIC] = (
"This operation has no known divergences from the "
"pandas API.")
def format_section(header):
return '\n'.join([header, ''.join('-' for _ in header), sections[header]])
func.__doc__ = '\n\n'.join([intro] + [
format_section(header) for header in SECTION_ORDER if header in sections
])
return func
return wrap
def populate_defaults(base_type, removed_method=False, removed_args=None):
"""Populate default values for keyword arguments in decorated function.
When applied to a function, this decorator creates a new function
with default values for all keyword arguments, based on the default values
for the identically-named method on `base_type`.
For internal use only. No backwards compatibility guarantees.
Args:
base_type: The pandas type of the method that this is trying to replicate.
removed_method: Whether this method has been removed in the running
Pandas version.
removed_args: If not empty, which arguments have been dropped in the
running Pandas version.
"""
def wrap(func):
if removed_method:
return func
base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
if not base_argspec.defaults and not base_argspec.kwonlydefaults:
return func
arg_to_default = {}
if base_argspec.defaults:
arg_to_default.update(
zip(
base_argspec.args[-len(base_argspec.defaults):],
base_argspec.defaults))
if base_argspec.kwonlydefaults:
arg_to_default.update(base_argspec.kwonlydefaults)
unwrapped_func = unwrap(func)
# args that do not have defaults in func, but do have defaults in base
func_argspec = getfullargspec(unwrapped_func)
num_non_defaults = len(func_argspec.args) - len(func_argspec.defaults or ())
defaults_to_populate = set(
func_argspec.args[:num_non_defaults]).intersection(
arg_to_default.keys())
if removed_args:
defaults_to_populate -= set(removed_args)
# In pandas 2, many methods rely on the default copy=None
# to mean that copy is the value of copy_on_write. Since
# copy_on_write will always be true for Beam, just fill it
# in here. In pandas 1, the default was True anyway.
if 'copy' in arg_to_default and arg_to_default['copy'] is None:
arg_to_default['copy'] = True
@functools.wraps(func)
def wrapper(**kwargs):
for name in defaults_to_populate:
if name not in kwargs:
kwargs[name] = arg_to_default[name]
return func(**kwargs)
return wrapper
return wrap
_WONT_IMPLEMENT_REASONS = {
'order-sensitive': {
'explanation': "because it is sensitive to the order of the data.",
'url': 'https://s.apache.org/dataframe-order-sensitive-operations',
},
'non-deferred-columns': {
'explanation': (
"because the columns in the output DataFrame depend "
"on the data."),
'url': 'https://s.apache.org/dataframe-non-deferred-columns',
},
'non-deferred-result': {
'explanation': (
"because it produces an output type that is not "
"deferred."),
'url': 'https://s.apache.org/dataframe-non-deferred-result',
},
'plotting-tools': {
'explanation': "because it is a plotting tool.",
'url': 'https://s.apache.org/dataframe-plotting-tools',
},
'event-time-semantics': {
'explanation': (
"because implementing it would require integrating with Beam "
"event-time semantics"),
'url': 'https://s.apache.org/dataframe-event-time-semantics',
},
'deprecated': {
'explanation': "because it is deprecated in pandas.",
},
'experimental': {
'explanation': "because it is experimental in pandas.",
},
}
class WontImplementError(NotImplementedError):
"""An subclass of NotImplementedError to raise indicating that implementing
the given method is not planned.
Raising this error will also prevent this doctests from being validated
when run with the beam dataframe validation doctest runner.
"""
def __init__(self, msg, reason=None):
if reason is not None:
if reason not in _WONT_IMPLEMENT_REASONS:
raise AssertionError(
f"reason must be one of {list(_WONT_IMPLEMENT_REASONS.keys())}, "
f"got {reason!r}")
reason_data = _WONT_IMPLEMENT_REASONS[reason]
if 'url' in reason_data:
msg = f"{msg}\nFor more information see {reason_data['url']}."
super().__init__(msg)