Skip to content

Commit

Permalink
Concat now working with new architecture (modin-project#9)
Browse files Browse the repository at this point in the history
* concat now working with new architecture

* fixing functionality for pandas Series

* updated append_list_of_data_managers function for concat

* minor stylistic fix

* remove unused append_data_manager function

* fixed join

* removed axis arg from join function
  • Loading branch information
osalpekar authored and devin-petersohn committed Sep 13, 2018
1 parent 8cc142d commit 05759d9
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 128 deletions.
66 changes: 26 additions & 40 deletions modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,49 +169,44 @@ def _join_index_objects(self, axis, other_index, how, sort=True):
else:
return self.index.join(other_index, how=how, sort=sort)

def concat(self, axis, other, **kwargs):
ignore_index = kwargs.get("ignore_index", False)
if axis == 0:
if isinstance(other, list):
return self._append_list_of_managers(other, ignore_index)
else:
return self._append_data_manager(other, ignore_index)
def join(self, other, **kwargs):
if isinstance(other, list):
return self._join_list_of_managers(other, **kwargs)
else:
if isinstance(other, list):
return self._join_list_of_managers(other, **kwargs)
else:
return self._join_data_manager(other, **kwargs)

def _append_data_manager(self, other, ignore_index):
assert isinstance(other, type(self)), \
"This method is for data manager objects only"
cls = type(self)
return self._join_data_manager(other, **kwargs)

joined_columns = self._join_index_objects(0, other.columns, 'outer')
to_append = other.reindex(1, joined_columns).data
new_self = self.reindex(1, joined_columns).data

new_data = new_self.concat(0, to_append)
new_index = self.index.append(other.index) if not ignore_index else pandas.RangeIndex(len(self.index) + len(other.index))

return cls(new_data, new_index, joined_columns)
def concat(self, axis, other, **kwargs):
return self._append_list_of_managers(other, axis, **kwargs)

def _append_list_of_managers(self, others, ignore_index):
def _append_list_of_managers(self, others, axis, **kwargs):
assert isinstance(others, list), \
"This method is for lists of DataManager objects only"
assert all(isinstance(other, type(self)) for other in others), \
"Different Manager objects are being used. This is not allowed"
cls = type(self)

joined_columns = self._join_index_objects(0, [other.columns for other in others], 'outer')
sort = kwargs.get("sort", None)
join = kwargs.get("join", "outer")
ignore_index = kwargs.get("ignore_index", False)

joined_axis = self._join_index_objects(axis, [other.columns if axis == 0
else other.index for other in others], join, sort=sort)

to_append = [other.reindex(axis ^ 1, joined_axis).data for other in others]
new_self = self.reindex(axis ^ 1, joined_axis).data
new_data = new_self.concat(axis, to_append)

to_append = [other.reindex(1, joined_columns).data for other in others]
new_self = self.reindex(1, joined_columns).data
if axis == 0:
new_index = self.index.append([other.index for other in others]) if not ignore_index else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others]))

return cls(new_data, new_index, joined_axis)
else:
self_proxy_columns = pandas.DataFrame(columns=self.columns).columns
others_proxy_columns = [pandas.DataFrame(columns=other.columns).columns for other in others]
new_columns = self_proxy_columns.append(others_proxy_columns)

new_data = new_self.concat(0, to_append)
new_index = self.index.append([other.index for other in others]) if not ignore_index else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others]))
return cls(new_data, joined_axis, new_columns)

return cls(new_data, new_index, joined_columns)

def _join_data_manager(self, other, **kwargs):
assert isinstance(other, type(self)), \
Expand Down Expand Up @@ -252,12 +247,6 @@ def _join_list_of_managers(self, others, **kwargs):
lsuffix = kwargs.get("lsuffix", "")
rsuffix = kwargs.get("rsuffix", "")

assert isinstance(others, list), \
"This method is for lists of DataManager objects only"
assert all(isinstance(other, type(self)) for other in others), \
"Different Manager objects are being used. This is not allowed"
cls = type(self)

joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort)

to_join = [other.reindex(0, joined_index).data for other in others]
Expand Down Expand Up @@ -1069,9 +1058,6 @@ def from_pandas(cls, df, block_partitions_cls):
new_columns = df.columns
new_dtypes = df.dtypes

# Set the columns to RangeIndex for memory efficiency
df.index = pandas.RangeIndex(len(df.index))
df.columns = pandas.RangeIndex(len(df.columns))
new_data = block_partitions_cls.from_pandas(df)

return cls(new_data, new_index, new_columns, dtypes=new_dtypes)
Expand Down
103 changes: 17 additions & 86 deletions modin/pandas/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,89 +64,20 @@ def concat(objs,
raise ValueError("Only can inner (intersect) or outer (union) join the"
" other axis")

# We need this in a list because we use it later.
all_index, all_columns = list(
zip(*[(obj.index, obj.columns) for obj in objs]))

def series_to_df(series, columns):
df = pandas.DataFrame(series)
df.columns = columns
return DataFrame(df)

# Pandas puts all of the Series in a single column named 0. This is
# true regardless of the existence of another column named 0 in the
# concat.
if axis == 0:
objs = [
series_to_df(obj, [0]) if isinstance(obj, pandas.Series) else obj
for obj in objs
]
else:
# Pandas starts the count at 0 so this will increment the names as
# long as there's a new nameless Series being added.
def name_incrementer(i):
val = i[0]
i[0] += 1
return val

i = [0]
objs = [
series_to_df(
obj, obj.name if obj.name is not None else name_incrementer(i))
if isinstance(obj, pandas.Series) else obj for obj in objs
]

# Using concat on the columns and index is fast because they're empty,
# and it forces the error checking. It also puts the columns in the
# correct order for us.
final_index = \
pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
axis=axis, join=join, join_axes=join_axes,
ignore_index=ignore_index, keys=keys, levels=levels,
names=names, verify_integrity=verify_integrity,
copy=False).index
final_columns = \
pandas.concat([pandas.DataFrame(columns=col)
for col in all_columns],
axis=axis, join=join, join_axes=join_axes,
ignore_index=ignore_index, keys=keys, levels=levels,
names=names, verify_integrity=verify_integrity,
copy=False).columns

# Put all of the DataFrames into Ray format
# TODO just partition the DataFrames instead of building a new Ray DF.
objs = [
DataFrame(obj)
if isinstance(obj, (pandas.DataFrame, pandas.Series)) else obj
for obj in objs
]

# Here we reuse all_columns/index so we don't have to materialize objects
# from remote memory built in the previous line. In the future, we won't be
# building new DataFrames, rather just partitioning the DataFrames.
if axis == 0:
new_blocks = np.array([
_reindex_helper._submit(
args=tuple([
all_columns[i], final_columns, axis,
len(objs[0]._block_partitions)
] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions))
for i in range(len(objs)) for part in objs[i]._block_partitions
])
else:
# Transposing the columns is necessary because the remote task treats
# everything like rows and returns in row-major format. Luckily, this
# operation is cheap in numpy.
new_blocks = np.array([
_reindex_helper._submit(
args=tuple([
all_index[i], final_index, axis,
len(objs[0]._block_partitions.T)
] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions.T))
for i in range(len(objs)) for part in objs[i]._block_partitions.T
]).T

return DataFrame(
block_partitions=new_blocks, columns=final_columns, index=final_index)
# We have the weird Series and axis check because, when concatenating a
# dataframe to a series on axis=0, pandas ignores the name of the series,
# and this check aims to mirror that (possibly buggy) functionality
objs = [obj if isinstance(obj, DataFrame) else DataFrame(obj.rename()) if
isinstance(obj, pandas.Series) and axis == 0 else DataFrame(obj)
for obj in objs]
df = objs[0]
objs = [obj._data_manager for obj in objs]
new_manager = df._data_manager.concat(axis, objs[1:], join=join,
join_axes=None,
ignore_index=False,
keys=None,
levels=None,
names=None,
verify_integrity=False,
copy=True)
return DataFrame(data_manager=new_manager)
4 changes: 2 additions & 2 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,7 +2011,7 @@ def join(self,
# would otherwise require a lot more logic.
pandas.DataFrame(columns=self.columns).join(pandas.DataFrame(columns=other.columns), lsuffix=lsuffix, rsuffix=rsuffix).columns

return DataFrame(data_manager=self._data_manager.concat(1, other._data_manager, how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
return DataFrame(data_manager=self._data_manager.join(other._data_manager, how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
else:
# This constraint carried over from Pandas.
if on is not None:
Expand All @@ -2024,7 +2024,7 @@ def join(self,
lsuffix=lsuffix,
rsuffix=rsuffix).columns

return DataFrame(data_manager=self._data_manager.concat(1, [obj._data_manager for obj in other], how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
return DataFrame(data_manager=self._data_manager.join([obj._data_manager for obj in other], how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))

def kurt(self,
axis=None,
Expand Down
12 changes: 12 additions & 0 deletions modin/pandas/test/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ def test_ray_concat():
assert ray_df_equals_pandas(
pd.concat([ray_df, ray_df2]), pandas.concat([df, df2]))

def test_ray_concat_with_series():
df, df2 = generate_dfs()
ray_df, ray_df2 = from_pandas(df), from_pandas(df2)
pandas_series = pandas.Series([1,2,3,4], name="new_col")

assert ray_df_equals_pandas(
pd.concat([ray_df, ray_df2, pandas_series], axis=0), pandas.concat([df,
df2, pandas_series], axis=0))

assert ray_df_equals_pandas(
pd.concat([ray_df, ray_df2, pandas_series], axis=1), pandas.concat([df,
df2, pandas_series], axis=1))

def test_ray_concat_on_index():
df, df2 = generate_dfs()
Expand Down

0 comments on commit 05759d9

Please sign in to comment.