
Support Pandas==2.x in Apache Beam. #27221

Closed
2 of 15 tasks
JorgeCardona opened this issue Jun 22, 2023 · 6 comments · Fixed by #28422, #28454, #28499 or #28636
Labels: done & done (issue has been reviewed after it was closed for verification, followups, etc.), new feature, P2, python

Comments

@JorgeCardona

JorgeCardona commented Jun 22, 2023

What happened?

Beam doesn't work with pandas>=2.0.0

Reproducible Example

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
pipeline = beam.Pipeline(options=options)
data = pipeline | 'CreateData' >> beam.Create(['Pandas, 2.0,2'])
data | 'PrintData' >> beam.Map(print)
pipeline.run()

Issue Description

Pipeline fails with a runtime error: `AttributeError: type object 'Series' has no attribute 'append'`. Sample stacktrace:
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[2], line 8
      4 options = PipelineOptions()
      6 pipeline = beam.Pipeline(options=options)
----> 8 data = pipeline | 'CreateData' >> beam.Create(['Pandas, 2.0,2'])
     10 data | 'PrintData' >> beam.Map(print)
     11 # Run the pipeline

File /usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:1092, in _NamedPTransform.__ror__(self, pvalueish, _unused)
   1091 def __ror__(self, pvalueish, _unused=None):
-> 1092   return self.transform.__ror__(pvalueish, self.label)

File /usr/local/lib/python3.11/site-packages/apache_beam/transforms/ptransform.py:614, in PTransform.__ror__(self, left, label)
    612 pvalueish = _SetInputPValues().visit(pvalueish, replacements)
    613 self.pipeline = p
--> 614 result = p.apply(self, pvalueish, label)
    615 if deferred:
    616   return result

File /usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py:666, in Pipeline.apply(self, transform, pvalueish, label)
    664 old_label, transform.label = transform.label, label
    665 try:
--> 666   return self.apply(transform, pvalueish)
    667 finally:
    668   transform.label = old_label

File /usr/local/lib/python3.11/site-packages/apache_beam/pipeline.py:674, in Pipeline.apply(self, transform, pvalueish, label)
    670 # Attempts to alter the label of the transform to be applied only when it's
    671 # a top-level transform so that the cell number will not be prepended to
    672 # every child transform in a composite.
    673 if self._current_transform() is self._root_transform():
--> 674   alter_label_if_ipython(transform, pvalueish)
    676 full_label = '/'.join(
    677     [self._current_transform().full_label, label or
    678      transform.label]).lstrip('/')
    679 if full_label in self.applied_labels:

File /usr/local/lib/python3.11/site-packages/apache_beam/utils/interactive_utils.py:71, in alter_label_if_ipython(transform, pvalueish)
     59 """Alters the label to an interactive label with ipython prompt metadata
     60 prefixed for the given transform if the given pvalueish belongs to a
     61 user-defined pipeline and current code execution is within an ipython kernel.
   (...)
     68 `Cell {prompt}: {original_label}`.
     69 """
     70 if is_in_ipython():
---> 71   from apache_beam.runners.interactive import interactive_environment as ie
     72   # Tracks user defined pipeline instances in watched scopes so that we only
     73   # alter labels for any transform to pvalueish belonging to those pipeline
     74   # instances, excluding any transform to be applied in other pipeline
     75   # instances the Beam SDK creates implicitly.
     76   ie.current_env().track_user_pipelines()

File /usr/local/lib/python3.11/site-packages/apache_beam/runners/interactive/interactive_environment.py:41
     39 from apache_beam.runners.direct import direct_runner
     40 from apache_beam.runners.interactive import cache_manager as cache
---> 41 from apache_beam.runners.interactive.messaging.interactive_environment_inspector import InteractiveEnvironmentInspector
     42 from apache_beam.runners.interactive.recording_manager import RecordingManager
     43 from apache_beam.runners.interactive.sql.sql_chain import SqlChain

File /usr/local/lib/python3.11/site-packages/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py:26
     23 # pytype: skip-file
     25 import apache_beam as beam
---> 26 from apache_beam.runners.interactive.utils import as_json
     27 from apache_beam.runners.interactive.utils import obfuscate
     30 class InteractiveEnvironmentInspector(object):

File /usr/local/lib/python3.11/site-packages/apache_beam/runners/interactive/utils.py:33
     30 import pandas as pd
     32 import apache_beam as beam
---> 33 from apache_beam.dataframe.convert import to_pcollection
     34 from apache_beam.dataframe.frame_base import DeferredBase
     35 from apache_beam.internal.gcp import auth

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/convert.py:33
     31 from apache_beam.dataframe import expressions
     32 from apache_beam.dataframe import frame_base
---> 33 from apache_beam.dataframe import transforms
     34 from apache_beam.dataframe.schemas import element_typehint_from_dataframe_proxy
     35 from apache_beam.dataframe.schemas import generate_proxy

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/transforms.py:33
     31 from apache_beam import transforms
     32 from apache_beam.dataframe import expressions
---> 33 from apache_beam.dataframe import frames  # pylint: disable=unused-import
     34 from apache_beam.dataframe import partitionings
     35 from apache_beam.utils import windowed_value

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/frames.py:1231
   1224       return func(*args, **kwargs)
   1226     return func(self, *args, **kwargs)
   1229 @populate_not_implemented(pd.Series)
   1230 @frame_base.DeferredFrame._register_for(pd.Series)
-> 1231 class DeferredSeries(DeferredDataFrameOrSeries):
   1232   def __repr__(self):
   1233     return (
   1234         f'DeferredSeries(name={self.name!r}, dtype={self.dtype}, '
   1235         f'{self._render_indexes()})')

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/frames.py:1338, in DeferredSeries()
   1331 transpose = frame_base._elementwise_method('transpose', base=pd.Series)
   1332 shape = property(
   1333     frame_base.wont_implement_method(
   1334         pd.Series, 'shape', reason="non-deferred-result"))
   1336 @frame_base.with_docs_from(pd.Series)
   1337 @frame_base.args_to_kwargs(pd.Series)
-> 1338 @frame_base.populate_defaults(pd.Series)
   1339 def append(self, to_append, ignore_index, verify_integrity, **kwargs):
   1340   """``ignore_index=True`` is not supported, because it requires generating an
   1341   order-sensitive index."""
   1342   if not isinstance(to_append, DeferredSeries):

File /usr/local/lib/python3.11/site-packages/apache_beam/dataframe/frame_base.py:600, in populate_defaults.<locals>.wrap(func)
    599 def wrap(func):
--> 600   base_argspec = getfullargspec(unwrap(getattr(base_type, func.__name__)))
    601   if not base_argspec.defaults:
    602     return func

AttributeError: type object 'Series' has no attribute 'append'
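
The root of the traceback can be reproduced without a pipeline: `frame_base.populate_defaults` inspects `pd.Series.append` at import time, roughly as sketched below (a simplified illustration, not Beam's exact code):

```python
import inspect
import pandas as pd

# frame_base.populate_defaults does roughly this while the
# DeferredSeries class body is being defined:
attr = getattr(pd.Series, "append", None)

if attr is None:
    # pandas >= 2.0 removed Series.append, so Beam's unguarded
    # getattr(pd.Series, "append") raises AttributeError during
    # `import apache_beam` (via the interactive-runner import chain).
    print("Series.append was removed in this pandas version")
else:
    # pandas < 2.0: the argspec lookup succeeds.
    print(inspect.getfullargspec(attr))
```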

Expected Behavior

The code should produce the same result as it does with pandas 1.5.3.

Installed Versions

Replace this line with the output of pd.show_versions()

Version that fails: pandas 2.0.2

Version that works: pandas 1.5.3

Response when I reported this issue on the pandas project:

pandas-dev/pandas#53799 (comment)

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@AnandInguva
Contributor

Yes, Apache Beam doesn't yet support pandas 2.x, since pandas 2.x removed `append` from `Series` and `DataFrame`.
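
For context, a minimal sketch of the migration pandas itself recommends for user code (plain pandas, not Beam's deferred API): `append` calls become `pd.concat`:

```python
import pandas as pd

s1 = pd.Series([1, 2])
s2 = pd.Series([3])

# pandas < 2.0 (removed in 2.0):
#   combined = s1.append(s2, ignore_index=True)

# Equivalent that works on both pandas 1.x and 2.x:
combined = pd.concat([s1, s2], ignore_index=True)
print(combined.tolist())  # [1, 2, 3]
```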

@AnandInguva AnandInguva added P2 and removed P1 labels Jun 23, 2023
@AnandInguva
Contributor

Moving this to P2 since it's a feature request rather than a bug.

@AnandInguva AnandInguva changed the title [Bug]: Incompatible apache-beam==2.48.0 with Pandas==2.0.2 Incompatible apache-beam==2.48.0 with Pandas==2.0.2 Jun 23, 2023
@tvalentyn tvalentyn changed the title Incompatible apache-beam==2.48.0 with Pandas==2.0.2 Support Pandas==2.x in Apache Beam. Jul 11, 2023
@tvalentyn
Contributor

Thanks for reporting! This is a known issue, but to my knowledge no one is actively working on it. Contributions to fix it, or further investigation into what it would take to support pandas 2.x, are welcome.

@tjni
Contributor

tjni commented Aug 6, 2023

I started working on this and could use some opinions on the best way to proceed.

  1. For methods that have been removed from `Series` or `DataFrame` in pandas, would we like to reimplement them? I presume so, but I am not sure what Beam's deprecation strategy is for these APIs.

  2. I found at least one method (`Series.mad` and `DataFrame.mad`) that I am not confident I can reimplement without missing something subtle. The migration guide is light on details and does not cover nuances like the behavior under the various argument values. I am getting closer to figuring it out from reading the source code, but I'd love to get some other unbiased opinions on how to handle this.
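
For item 2, the removed method computed the mean absolute deviation. A hypothetical reimplementation of the default case only (1-D numeric data, `skipna=True`, no `axis`/`level` handling) might look like:

```python
import pandas as pd

def mad(s: pd.Series) -> float:
    """Mean absolute deviation as Series.mad() computed it for the
    default arguments: the mean of |x - mean(x)|, skipping NaNs.

    This is a sketch only; it deliberately ignores the axis, level,
    and skipna options the original method accepted.
    """
    return (s - s.mean()).abs().mean()

print(mad(pd.Series([1.0, 2.0, 3.0, 4.0])))  # 1.0
```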

@caneff
Contributor

caneff commented Sep 12, 2023

.take-issue

Cleaning this up. The goal is that the Pandas API stays consistent with Pandas. So I'm gating removals on Pandas version.
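
A hypothetical sketch of that kind of version gating (the names and structure here are illustrative, not Beam's actual code):

```python
import pandas as pd

# Parse the installed major version once, e.g. "2.0.2" -> 2.
PD_MAJOR = int(pd.__version__.split(".")[0])

if PD_MAJOR < 2:
    # Only register deferred wrappers for APIs that still exist in
    # this pandas version, e.g. Series.append (removed in 2.0).
    def append(self, to_append, ignore_index=False, verify_integrity=False):
        ...  # wrap the underlying pd.Series.append here
```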

@tvalentyn
Contributor

Thanks a lot for stepping in to help with this effort, @caneff .
