read_csv changes and improvements in performance (modin-project#10)
* Test changes to io

* Update io changes

* Fix performance bug

* Debugging performance

* Debugging performance on large IO

* Making some performance tuning changes

* Cleaning up and adding performance improvements

* Cleaning up

* Addressing comments

* Addressing comments
devin-petersohn authored Sep 13, 2018
1 parent 05759d9 commit 5f2a307
Showing 4 changed files with 123 additions and 115 deletions.
45 changes: 25 additions & 20 deletions modin/data_management/data_manager.py
@@ -8,6 +8,7 @@
 from pandas.compat import string_types
 from pandas.core.dtypes.cast import find_common_type
 from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like)
+from pandas.core.index import _ensure_index

 from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions
 from .partitioning.remote_partition import RayRemotePartition
@@ -26,7 +27,7 @@ def __init__(self, block_partitions_object, index, columns, dtypes=None):
         if dtypes is not None:
             self._dtype_cache = dtypes

-    # dtypes
+    # Index, columns and dtypes objects
     _dtype_cache = None

     def _get_dtype(self):
@@ -45,37 +46,41 @@ def _set_dtype(self, dtypes):

     dtypes = property(_get_dtype, _set_dtype)

-    # Index and columns objects
-    # These objects are currently not distributed.
-    # Note: These are more performant as pandas.Series objects than they are as
-    # pandas.DataFrame objects.
-    #
-    # _index_cache is a pandas.Series that holds the index
     _index_cache = None
-    # _columns_cache is a pandas.Series that holds the columns
     _columns_cache = None

     def _get_index(self):
-        return self._index_cache.index
+        return self._index_cache

     def _get_columns(self):
-        return self._columns_cache.index
+        return self._columns_cache

+    def _validate_set_axis(self, new_labels, old_labels):
+        new_labels = _ensure_index(new_labels)
+        old_len = len(old_labels)
+        new_len = len(new_labels)
+        if old_len != new_len:
+            raise ValueError('Length mismatch: Expected axis has %d elements, '
+                             'new values have %d elements' % (old_len, new_len))
+        return new_labels
+
     def _set_index(self, new_index):
-        if self._index_cache is not None:
-            self._index_cache.index = new_index
+        if self._index_cache is None:
+            self._index_cache = _ensure_index(new_index)
         else:
-            self._index_cache = pandas.Series(index=new_index)
+            new_index = self._validate_set_axis(new_index, self._index_cache)
+            self._index_cache = new_index

     def _set_columns(self, new_columns):
-        if self._columns_cache is not None:
-            self._columns_cache.index = new_columns
+        if self._columns_cache is None:
+            self._columns_cache = _ensure_index(new_columns)
         else:
-            self._columns_cache = pandas.Series(index=new_columns)
+            new_columns = self._validate_set_axis(new_columns, self._columns_cache)
+            self._columns_cache = new_columns

     columns = property(_get_columns, _set_columns)
     index = property(_get_index, _set_index)

+    # END Index, columns, and dtypes objects

     def compute_index(self, axis, data_object, compute_diff=True):
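The new `_validate_set_axis` mirrors pandas' own length check, so assigning mismatched labels fails loudly instead of silently desynchronizing the cached axes from the underlying partitions. A quick illustration of the contract being matched, using plain pandas rather than modin internals:

```python
import pandas

df = pandas.DataFrame({"a": [1, 2, 3]})

# Assigning an axis with the wrong number of labels raises, which is the
# same contract _validate_set_axis enforces for the cached axes.
try:
    df.index = [0, 1]  # two labels for three rows
except ValueError as err:
    print(err)  # Length mismatch: Expected axis has 3 elements, new values have 2 elements
```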
@@ -1000,20 +1005,20 @@ def head(self, n):
             # ensure that we extract the correct data on each node. The index
             # on a transposed manager is already set to the correct value, so
             # we need to only take the head of that instead of re-transposing.
-            result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self.dtypes)
+            result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self._dtype_cache)
             result._is_transposed = True
         else:
-            result = cls(self.data.take(0, n), self.index[:n], self.columns, self.dtypes)
+            result = cls(self.data.take(0, n), self.index[:n], self.columns, self._dtype_cache)
         return result

     def tail(self, n):
         cls = type(self)
         # See head for an explanation of the transposed behavior
         if self._is_transposed:
-            result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self.dtypes)
+            result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self._dtype_cache)
             result._is_transposed = True
         else:
-            result = cls(self.data.take(0, -n), self.index[-n:], self.columns, self.dtypes)
+            result = cls(self.data.take(0, -n), self.index[-n:], self.columns, self._dtype_cache)
         return result

     def front(self, n):
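Passing `self._dtype_cache` rather than `self.dtypes` into the new manager matters for performance: `dtypes` is a property that forces the dtype computation when the cache is empty, while `_dtype_cache` forwards whatever is cached (possibly `None`) and keeps the result lazy. A minimal sketch of this pattern; the class and `_compute_dtypes` are illustrative stand-ins, not the real modin internals:

```python
import pandas

class LazyDtypes:
    """Sketch of the dtype-cache pattern above (names are illustrative)."""

    def __init__(self, dtypes=None):
        # May be None; computed only when the `dtypes` property is read.
        self._dtype_cache = dtypes

    def _compute_dtypes(self):
        # Stand-in for the real (expensive, distributed) computation.
        return pandas.Series({"a": "int64", "b": "float64"})

    @property
    def dtypes(self):
        # Reading the property forces the computation.
        if self._dtype_cache is None:
            self._dtype_cache = self._compute_dtypes()
        return self._dtype_cache

    def head(self, n):
        # Passing self._dtype_cache (not self.dtypes) forwards the cache
        # as-is: if dtypes were never computed, the child stays lazy too.
        return LazyDtypes(self._dtype_cache)
```

Here `head` hands the (possibly empty) cache to its result, so the dtype computation runs at most once, and only for a consumer that actually reads `dtypes`.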
45 changes: 43 additions & 2 deletions modin/data_management/partitioning/partition_collections.py
@@ -3,6 +3,7 @@
 from __future__ import print_function

 import numpy as np
+import ray
 import pandas

 from .remote_partition import RayRemotePartition
@@ -99,7 +100,7 @@ def block_lengths(self):
             # The first column will have the correct lengths. We have an
             # invariant that requires that all blocks be the same length in a
             # row of blocks.
-            self._lengths_cache = [obj.length for obj in self.partitions.T[0]]
+            self._lengths_cache = [obj.length().get() for obj in self.partitions.T[0]]
         return self._lengths_cache

     # Widths of the blocks
@@ -116,7 +117,7 @@ def block_widths(self):
             # The first row will have the correct widths. We have an
             # invariant that requires that all blocks be the same width in a
             # column of blocks.
-            self._widths_cache = [obj.width for obj in self.partitions[0]]
+            self._widths_cache = [obj.width().get() for obj in self.partitions[0]]
         return self._widths_cache

     def full_reduce(self, map_func, reduce_func, axis):
@@ -672,6 +673,9 @@ def __getitem__(self, key):
         cls = type(self)
         return cls(self.partitions[key])

+    def __len__(self):
+        return sum(self.block_lengths)
+

 class RayBlockPartitions(BlockPartitions):
     """This class implements the interface in `BlockPartitions`."""
@@ -682,6 +686,43 @@ class RayBlockPartitions(BlockPartitions):
     def __init__(self, partitions):
         self.partitions = partitions

+    # We override these for performance reasons.
+    # Lengths of the blocks
+    _lengths_cache = None
+
+    # These are set up as properties so that we only use them when we need
+    # them. We also do not want to trigger this computation on object creation.
+    @property
+    def block_lengths(self):
+        """Gets the lengths of the blocks.
+
+        Note: This works with the property structure `_lengths_cache` to avoid
+            having to recompute these values each time they are needed.
+        """
+        if self._lengths_cache is None:
+            # The first column will have the correct lengths. We have an
+            # invariant that requires that all blocks be the same length in a
+            # row of blocks.
+            self._lengths_cache = ray.get([obj.length().oid for obj in self.partitions.T[0]])
+        return self._lengths_cache
+
+    # Widths of the blocks
+    _widths_cache = None
+
+    @property
+    def block_widths(self):
+        """Gets the widths of the blocks.
+
+        Note: This works with the property structure `_widths_cache` to avoid
+            having to recompute these values each time they are needed.
+        """
+        if self._widths_cache is None:
+            # The first row will have the correct widths. We have an
+            # invariant that requires that all blocks be the same width in a
+            # column of blocks.
+            self._widths_cache = ray.get([obj.width().oid for obj in self.partitions[0]])
+        return self._widths_cache
+
     @property
     def column_partitions(self):
         """A list of `RayColumnPartition` objects."""
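This Ray-specific override is the heart of the speedup: the generic `block_lengths` blocks once per partition (`obj.length().get()` in a loop), while `RayBlockPartitions` submits all the length tasks and resolves them with a single `ray.get` over the object IDs, letting Ray fetch the results in parallel. A standalone sketch of the two access patterns, using plain Ray tasks rather than modin's partition wrappers:

```python
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def block_length(i):
    # Stand-in for the remote "compute the length of one partition" task.
    return 1000 + i

futures = [block_length.remote(i) for i in range(16)]

# Generic pattern: one blocking round trip per partition.
lengths_one_by_one = [ray.get(f) for f in futures]

# Ray-optimized pattern (as in the override above): a single batched
# ray.get resolves all results in parallel.
lengths_batched = ray.get(futures)

assert lengths_one_by_one == lengths_batched
```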
6 changes: 2 additions & 4 deletions modin/data_management/partitioning/remote_partition.py
@@ -110,24 +110,22 @@ def width_extraction_fn(cls):
     _length_cache = None
     _width_cache = None

-    @property
     def length(self):
         if self._length_cache is None:
             cls = type(self)
             func = cls.length_extraction_fn()
             preprocessed_func = cls.preprocess_func(func)

-            self._length_cache = self.apply(preprocessed_func).get()
+            self._length_cache = self.apply(preprocessed_func)
         return self._length_cache

-    @property
     def width(self):
         if self._width_cache is None:
             cls = type(self)
             func = cls.width_extraction_fn()
             preprocessed_func = cls.preprocess_func(func)

-            self._width_cache = self.apply(preprocessed_func).get()
+            self._width_cache = self.apply(preprocessed_func)
         return self._width_cache
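Dropping `@property` and the eager `.get()` turns `length` and `width` into methods that return an unresolved future: the cache now holds the future itself, so callers choose between resolving one value (`.get()`) and batch-resolving many at once (the `ray.get([... .oid ...])` override above). A toy sketch of that contract; `FutureResult` and its `oid` attribute are illustrative stand-ins for the real partition wrapper, not the actual modin API:

```python
class FutureResult:
    """Toy stand-in for the object returned by a partition's apply()."""

    def __init__(self, value):
        self.oid = value  # the real class carries a Ray ObjectID here

    def get(self):
        return self.oid


class Partition:
    _length_cache = None

    def apply(self, func):
        return FutureResult(func())

    def length(self):
        # Note: no @property and no .get() here. The cached value is the
        # *future*, so callers can batch-resolve many partitions at once.
        if self._length_cache is None:
            self._length_cache = self.apply(lambda: 1000)
        return self._length_cache


p = Partition()
future = p.length()  # cheap: nothing is resolved yet
print(future.get())  # resolve only when the value is needed
```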