Commit e79e28a
Improve import times (#3055)
philippjfr committed Oct 8, 2018
1 parent 6de3984 commit e79e28a
Showing 8 changed files with 124 additions and 68 deletions.
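The commit's aim is a faster `import holoviews`, achieved by deferring imports of heavy optional dependencies such as dask and xarray until they are actually used. A quick way to check the effect locally is to time the import directly; this is a sketch, and the numbers will vary with the machine and which optional packages are installed:

```python
import time

start = time.time()
import holoviews  # noqa: F401
print("import holoviews took %.2f s" % (time.time() - start))
```

On Python 3.7+, `python -X importtime -c "import holoviews"` gives a per-module breakdown of where the time goes.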
15 changes: 6 additions & 9 deletions holoviews/core/data/__init__.py
@@ -8,8 +8,11 @@
 import numpy as np
 import param

-from ..dimension import redim
-from ..util import unique_iterator
+from .. import util
+from ..dimension import redim, Dimension, process_dimensions
+from ..element import Element
+from ..ndmapping import OrderedDict
+from ..spaces import HoloMap, DynamicMap
 from .interface import Interface, iloc, ndloc
 from .array import ArrayInterface
 from .dictionary import DictInterface
@@ -34,7 +37,6 @@
                        'following error: %s' % e)

 try:
-    import xarray # noqa (Availability import)
     from .xarray import XArrayInterface # noqa (Conditional API import)
     datatypes.append('xarray')
 except ImportError:
@@ -49,11 +51,6 @@
 if 'array' not in datatypes:
     datatypes.append('array')

-from ..dimension import Dimension, process_dimensions
-from ..element import Element
-from ..ndmapping import OrderedDict
-from ..spaces import HoloMap, DynamicMap
-from .. import util


 def concat(datasets, datatype=None):
@@ -670,7 +667,7 @@ def clone(self, data=None, shared_data=True, new_type=None, *args, **overrides):
         """
         if 'datatype' not in overrides:
             datatypes = [self.interface.datatype] + self.datatype
-            overrides['datatype'] = list(unique_iterator(datatypes))
+            overrides['datatype'] = list(util.unique_iterator(datatypes))
         return super(Dataset, self).clone(data, shared_data, new_type, *args, **overrides)


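The removed `import xarray` line was a redundant availability probe: the `from .xarray import XArrayInterface` that follows already raises ImportError when xarray is missing, so probing separately only added load-time cost. A standalone sketch of the registration pattern (the list contents are illustrative):

```python
# Interface names in priority order; optional backends are appended
# only when their dependency imports cleanly.
datatypes = ['dataframe', 'dictionary', 'grid', 'array']

try:
    # This import itself fails with ImportError if xarray is not
    # installed, so no separate `import xarray` probe is needed.
    from holoviews.core.data.xarray import XArrayInterface  # noqa: F401
    datatypes.append('xarray')
except ImportError:
    pass
```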
23 changes: 19 additions & 4 deletions holoviews/core/data/dask.py
@@ -1,14 +1,13 @@
 from __future__ import absolute_import

+import sys
 try:
     import itertools.izip as zip
 except ImportError:
     pass

 import numpy as np
 import pandas as pd
-import dask.dataframe as dd
-from dask.dataframe import DataFrame, Series

 from .. import util
 from ..dimension import Dimension
@@ -37,16 +36,29 @@ class DaskInterface(PandasInterface):
     some functions applied with aggregate and reduce will not work.
     """

-    types = (DataFrame, Series)
+    types = ()

     datatype = 'dask'

     default_partitions = 100

+    @classmethod
+    def loaded(cls):
+        return 'dask' in sys.modules
+
+    @classmethod
+    def applies(cls, obj):
+        if not cls.loaded():
+            return False
+        import dask.dataframe as dd
+        return isinstance(obj, (dd.DataFrame, dd.Series))
+
     @classmethod
     def init(cls, eltype, data, kdims, vdims):
+        import dask.dataframe as dd
+
         data, dims, extra = PandasInterface.init(eltype, data, kdims, vdims)
-        if not isinstance(data, DataFrame):
+        if not isinstance(data, dd.DataFrame):
             data = dd.from_pandas(data, npartitions=cls.default_partitions, sort=False)
         kdims = [d.name if isinstance(d, Dimension) else d for d in dims['kdims']]

@@ -64,6 +76,7 @@ def shape(cls, dataset):

     @classmethod
     def range(cls, columns, dimension):
+        import dask.dataframe as dd
         column = columns.data[columns.get_dimension(dimension).name]
         if column.dtype.kind == 'O':
             column = np.sort(column[column.notnull()].compute())
@@ -211,6 +224,7 @@ def unpack_scalar(cls, columns, data):
         Given a columns object and data in the appropriate format for
         the interface, return a simple scalar.
         """
+        import dask.dataframe as dd
         if len(data.columns) > 1 or len(data) != 1:
             return data
         if isinstance(data, dd.DataFrame):
@@ -245,6 +259,7 @@ def add_dimension(cls, columns, dimension, dim_pos, values, vdim):

     @classmethod
     def concat(cls, datasets, dimensions, vdims):
+        import dask.dataframe as dd
         dataframes = []
         for key, ds in datasets:
             data = ds.data.copy()
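The new `loaded`/`applies` classmethods carry this file's main change: `dask.dataframe` is no longer imported when holoviews loads. The interface only claims a match if dask is already in `sys.modules`, and each method that needs it imports it locally, which is a cheap dictionary lookup once the module is loaded. A condensed sketch of the pattern, with an illustrative class name:

```python
import sys


class LazyDaskBackend:
    """Illustrative reimplementation of the deferred-import pattern."""

    @classmethod
    def loaded(cls):
        # True only if something else already imported dask; this check
        # deliberately avoids triggering the import itself.
        return 'dask' in sys.modules

    @classmethod
    def applies(cls, obj):
        if not cls.loaded():
            return False
        # dask is in sys.modules, so this import is effectively free.
        import dask.dataframe as dd
        return isinstance(obj, (dd.DataFrame, dd.Series))
```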
33 changes: 16 additions & 17 deletions holoviews/core/data/grid.py
@@ -7,18 +7,7 @@
 except ImportError:
     pass

-
 import numpy as np
-array_types = (np.ndarray,)
-
-try:
-    import dask.array as da
-    array_types += (da.Array,)
-except ImportError:
-    da = None
-
-def is_dask(array):
-    return da and isinstance(array, da.Array)

 from .dictionary import DictInterface
 from .interface import Interface, DataError
@@ -27,6 +16,7 @@ def is_dask(array):
 from ..dimension import OrderedDict as cyODict
 from ..ndmapping import NdMapping, item_check, sorted_context
 from .. import util
+from .interface import is_dask, dask_array_module, get_array_types



@@ -90,7 +80,7 @@ def init(cls, eltype, data, kdims, vdims):
             name = dimension_name(dim)
             if name not in data:
                 raise ValueError("Values for dimension %s not found" % dim)
-            if not isinstance(data[name], array_types):
+            if not isinstance(data[name], get_array_types()):
                 data[name] = np.array(data[name])

         kdim_names = [dimension_name(d) for d in kdims]
@@ -144,7 +134,7 @@ def concat_dim(cls, datasets, dim, vdims):
                                 'of arrays must match. %s found that arrays '
                                 'along the %s dimension do not match.' %
                                 (cls.__name__, vdim.name))
-            stack = np.stack if any(is_dask(arr) for arr in arrays) else da.stack
+            stack = np.stack if any(is_dask(arr) for arr in arrays) else dask_array_module().stack
             new_data[vdim.name] = stack(arrays, -1)
         return new_data

@@ -263,7 +253,7 @@ def canonicalize(cls, dataset, data, data_coords=None, virtual_coords=[]):

         # Transpose data
         dims = [name for name in data_coords
-                if isinstance(cls.coords(dataset, name), array_types)]
+                if isinstance(cls.coords(dataset, name), get_array_types())]
         dropped = [dims.index(d) for d in dims
                    if d not in dataset.kdims+virtual_coords]
         if dropped:
@@ -346,6 +336,7 @@ def values(cls, dataset, dim, expanded=True, flat=True, compute=True):
         if dim in dataset.vdims or dataset.data[dim.name].ndim > 1:
             data = dataset.data[dim.name]
             data = cls.canonicalize(dataset, data)
+            da = dask_array_module()
             if compute and da and isinstance(data, da.Array):
                 data = data.compute()
             return data.T.flatten() if flat else data
@@ -398,12 +389,12 @@ def groupby(cls, dataset, dim_names, container_type, group_type, **kwargs):
             else:
                 group_data = cls.select(dataset, **select)

-            if np.isscalar(group_data) or (isinstance(group_data, array_types) and group_data.shape == ()):
+            if np.isscalar(group_data) or (isinstance(group_data, get_array_types()) and group_data.shape == ()):
                 group_data = {dataset.vdims[0].name: np.atleast_1d(group_data)}
                 for dim, v in zip(dim_names, unique_key):
                     group_data[dim] = np.atleast_1d(v)
             elif not drop_dim:
-                if isinstance(group_data, array_types):
+                if isinstance(group_data, get_array_types()):
                     group_data = {dataset.vdims[0].name: group_data}
                 for vdim in dataset.vdims:
                     data = group_data[vdim.name]
@@ -423,7 +414,7 @@ def groupby(cls, dataset, dim_names, container_type, group_type, **kwargs):
     def key_select_mask(cls, dataset, values, ind):
         if isinstance(ind, tuple):
             ind = slice(*ind)
-        if isinstance(ind, array_types):
+        if isinstance(ind, get_array_types()):
             mask = ind
         elif isinstance(ind, slice):
             mask = True
@@ -511,19 +502,22 @@ def select(cls, dataset, selection_mask=None, **selection):

         for kdim in dataset.kdims:
             if cls.irregular(dataset, dim):
+                da = dask_array_module()
                 if da and isinstance(dataset.data[kdim.name], da.Array):
                     data[kdim.name] = dataset.data[kdim.name].vindex[index]
                 else:
                     data[kdim.name] = np.asarray(data[kdim.name])[index]

         for vdim in dataset.vdims:
+            da = dask_array_module()
             if da and isinstance(dataset.data[vdim.name], da.Array):
                 data[vdim.name] = dataset.data[vdim.name].vindex[index]
             else:
                 data[vdim.name] = np.asarray(dataset.data[vdim.name])[index]

         if indexed:
             if len(dataset.vdims) == 1:
+                da = dask_array_module()
                 arr = np.squeeze(data[dataset.vdims[0].name])
                 if da and isinstance(arr, da.Array):
                     arr = arr.compute()
@@ -559,6 +553,7 @@ def sample(cls, dataset, samples=[]):
             for d, arr in zip(dimensions, np.meshgrid(*sampled)):
                 data[d].append(arr)
             for vdim, array in zip(dataset.vdims, arrays):
+                da = dask_array_module()
                 flat_index = np.ravel_multi_index(tuple(int_inds)[::-1], array.shape)
                 if da and isinstance(array, da.Array):
                     data[vdim.name].append(array.flatten().vindex[tuple(flat_index)])
@@ -574,6 +569,7 @@ def aggregate(cls, dataset, kdims, function, **kwargs):
         data = {kdim: dataset.data[kdim] for kdim in kdims}
         axes = tuple(dataset.ndims-dataset.get_dimension_index(kdim)-1
                      for kdim in dataset.kdims if kdim not in kdims)
+        da = dask_array_module()
         for vdim in dataset.vdims:
             values = dataset.data[vdim.name]
             atleast_1d = da.atleast_1d if is_dask(values) else np.atleast_1d
@@ -649,6 +645,7 @@ def iloc(cls, dataset, index):
                 new_data.append(cls.values(dataset, d, compute=False)[rows])

         if scalar:
+            da = dask_array_module()
             if new_data and isinstance(new_data[0], da.Array):
                 return new_data[0].compute()[0]
             return new_data[0][0]
@@ -661,6 +658,8 @@ def range(cls, dataset, dimension):
             column = cls.coords(dataset, dimension, expanded=expanded, edges=True)
         else:
             column = cls.values(dataset, dimension, expanded=False, flat=False)
+
+        da = dask_array_module()
         if column.dtype.kind == 'M':
             dmin, dmax = column.min(), column.max()
             if da and isinstance(column, da.Array):
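Every module-level use of `da` in grid.py is replaced with a call-time `dask_array_module()` lookup, so NumPy-only users never pay for a dask import. A rough usage sketch, assuming the helpers are importable from `holoviews.core.data.interface` as this commit defines them:

```python
import numpy as np
from holoviews.core.data.interface import dask_array_module, get_array_types

values = np.arange(10)

da = dask_array_module()  # the dask.array module, or None if unavailable
if da is not None:
    values = da.from_array(values, chunks=5)

# get_array_types() reflects whichever array backends are loaded right now
assert isinstance(values, get_array_types())
```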
52 changes: 50 additions & 2 deletions holoviews/core/data/interface.py
@@ -1,11 +1,36 @@
 from __future__ import absolute_import

+import sys
 import warnings

 import param
 import numpy as np

-from .. import util
 from ..element import Element
 from ..ndmapping import OrderedDict, NdMapping
+from .. import util
+
+
+def get_array_types():
+    array_types = (np.ndarray,)
+    if 'dask' in sys.modules:
+        import dask.array as da
+        array_types += (da.Array,)
+    return array_types
+
+def dask_array_module():
+    try:
+        import dask.array as da
+        return da
+    except:
+        return None
+
+def is_dask(array):
+    if 'dask' in sys.modules:
+        import dask.array as da
+    else:
+        return False
+    return da and isinstance(array, da.Array)

class DataError(ValueError):
@@ -97,12 +122,32 @@ class Interface(param.Parameterized):

     datatype = None

+    types = ()
+
     # Denotes whether the interface expects gridded data
     gridded = False

     # Denotes whether the interface expects ragged data
     multi = False

+    @classmethod
+    def loaded(cls):
+        """
+        Indicates whether the required dependencies are loaded.
+        """
+        return True
+
+    @classmethod
+    def applies(cls, obj):
+        """
+        Indicates whether the interface is designed specifically to
+        handle the supplied object's type. By default simply checks
+        if the object is one of the types declared on the class,
+        however if the type is expensive to import at load time the
+        method may be overridden.
+        """
+        return any(isinstance(obj, t) for t in cls.types)
+
     @classmethod
     def register(cls, interface):
         cls.interfaces[interface.datatype] = interface
@@ -176,14 +221,17 @@ def initialize(cls, eltype, data, kdims, vdims, datatype=None):
         # Set interface priority order
         prioritized = [cls.interfaces[p] for p in datatype
                        if p in cls.interfaces]
-        head = [intfc for intfc in prioritized if type(data) in intfc.types]
+        head = [intfc for intfc in prioritized if intfc.applies(data)]
         if head:
             # Prioritize interfaces which have matching types
             prioritized = head + [el for el in prioritized if el != head[0]]

         # Iterate over interfaces until one can interpret the input
         priority_errors = []
         for interface in prioritized:
+            if not interface.loaded() and len(datatype) != 1:
+                # Skip interface if it is not loaded and was not explicitly requested
+                continue
             try:
                 (data, dims, extra_kws) = interface.init(eltype, data, kdims, vdims)
                 break
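Taken together, `applies` and `loaded` change how `Interface.initialize` picks a backend: unloaded interfaces are skipped during auto-detection, but a datatype requested explicitly (a single-element list) is still tried, which forces the import. A simplified sketch of that selection loop; the function name and error handling are illustrative:

```python
def select_interface(interfaces, datatype, data):
    # Respect the caller-supplied priority order.
    prioritized = [interfaces[p] for p in datatype if p in interfaces]

    # Interfaces that recognise the data's type jump the queue.
    head = [intfc for intfc in prioritized if intfc.applies(data)]
    if head:
        prioritized = head + [el for el in prioritized if el != head[0]]

    for interface in prioritized:
        # Skip backends whose dependency is not loaded, unless this exact
        # datatype was explicitly requested.
        if not interface.loaded() and len(datatype) != 1:
            continue
        return interface
    raise ValueError("None of the supplied datatypes could interpret the data")
```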