diff --git a/datashader/compiler.py b/datashader/compiler.py
index cedab4f9f..7d18cab59 100644
--- a/datashader/compiler.py
+++ b/datashader/compiler.py
@@ -9,6 +9,11 @@
 from .reductions import by, category_codes, summary, where
 from .utils import ngjit
 
+try:
+    from datashader.transfer_functions._cuda_utils import cuda_mutex_lock, cuda_mutex_unlock
+except ImportError:
+    cuda_mutex_lock, cuda_mutex_unlock = None, None
+
 __all__ = ['compile_components']
 
 
@@ -166,21 +171,24 @@ def info(df, canvas_shape):
 def make_append(bases, cols, calls, glyph, categorical, antialias):
     names = ('_{0}'.format(i) for i in count())
     inputs = list(bases) + list(cols)
+    namespace = {}
     any_uses_cuda_mutex = any(call[6] for call in calls)
     if any_uses_cuda_mutex:
         # This adds an argument to the append() function that is the cuda mutex
         # generated in make_info.
         inputs += ["_cuda_mutex"]
+        namespace["cuda_mutex_lock"] = cuda_mutex_lock
+        namespace["cuda_mutex_unlock"] = cuda_mutex_unlock
     signature = [next(names) for i in inputs]
     arg_lk = dict(zip(inputs, signature))
     local_lk = {}
-    namespace = {}
     body = []
     ndims = glyph.ndims
     if ndims is not None:
         subscript = ', '.join(['i' + str(n) for n in range(ndims)])
     else:
         subscript = None
+    prev_cuda_mutex = False
 
     for func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex in calls:
         local_lk.update(zip(temps, (next(names) for i in temps)))
@@ -202,8 +210,9 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
         if antialias:
             args.append("aa_factor")
 
-        if uses_cuda_mutex:
-            args.append(arg_lk["_cuda_mutex"])
+        if uses_cuda_mutex and prev_cuda_mutex:
+            # Avoid unnecessary mutex unlock and lock cycle
+            body.pop()
 
         where_reduction = len(bases) == 1 and isinstance(bases[0], where)
         if where_reduction:
@@ -212,7 +221,10 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
             # where reduction needs access to the return of the contained
             # reduction, which is the preceding one here.
-            body[-1] = f'{update_index_arg_name} = {body[-1]}'
+            prev_body = body.pop()
+            if uses_cuda_mutex and not prev_cuda_mutex:
+                body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))')
+            body.append(f'{update_index_arg_name} = {prev_body}')
 
             # If nan_check_column is defined then need to check if value of
             # correct row in that column is NaN and if so do nothing. This
@@ -227,11 +239,16 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
                 whitespace = '    '
 
             body.append(f'{whitespace}if {update_index_arg_name} >= 0:')
-            call = f'    {whitespace}{func_name}(x, y, {", ".join(args)})'
+            body.append(f'    {whitespace}{func_name}(x, y, {", ".join(args)})')
         else:
-            call = f'{func_name}(x, y, {", ".join(args)})'
+            if uses_cuda_mutex and not prev_cuda_mutex:
+                body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))')
+            body.append(f'{func_name}(x, y, {", ".join(args)})')
+
+        if uses_cuda_mutex:
+            body.append(f'cuda_mutex_unlock({arg_lk["_cuda_mutex"]}, (y, x))')
 
-        body.append(call)
+        prev_cuda_mutex = uses_cuda_mutex
 
     body = ['{0} = {1}[y, x]'.format(name, arg_lk[agg]) for agg, name in local_lk.items()] + body
diff --git a/datashader/reductions.py b/datashader/reductions.py
index 530f2398c..23c33853e 100644
--- a/datashader/reductions.py
+++ b/datashader/reductions.py
@@ -13,10 +13,11 @@
 
 try:
     from datashader.transfer_functions._cuda_utils import (
-        cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args, cuda_mutex_lock, cuda_mutex_unlock,
+        cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args,
         cuda_nanmax_n_in_place, cuda_nanmin_n_in_place)
 except ImportError:
-    cuda_atomic_nanmin, cuda_atomic_nanmmax, cuda_args = None, None, None
+    (cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args, cuda_nanmax_n_in_place,
+     cuda_nanmin_n_in_place) = None, None, None, None, None
 
 try:
     import cudf
@@ -520,12 +521,20 @@ def _append_no_field_antialias_not_self_intersect(x, y, agg, aa_factor):
     @nb_cuda.jit(device=True)
     def _append_antialias_cuda(x, y, agg, field, aa_factor):
         value = field*aa_factor
-        return 0 if cuda_atomic_nanmax(agg, (y, x), value) != value else -1
+        if not isnull(value):
+            old = cuda_atomic_nanmax(agg, (y, x), value)
+            if isnull(old) or old < value:
+                return 0
+        return -1
 
     @staticmethod
     @nb_cuda.jit(device=True)
     def _append_no_field_antialias_cuda_not_self_intersect(x, y, agg, aa_factor):
-        return 0 if cuda_atomic_nanmax(agg, (y, x), aa_factor) != aa_factor else -1
+        if not isnull(aa_factor):
+            old = cuda_atomic_nanmax(agg, (y, x), aa_factor)
+            if isnull(old) or old < aa_factor:
+                return 0
+        return -1
 
     @staticmethod
     @nb_cuda.jit(device=True)
@@ -538,7 +547,11 @@ def _append_cuda(x, y, agg, field):
     @staticmethod
     @nb_cuda.jit(device=True)
     def _append_no_field_antialias_cuda(x, y, agg, aa_factor):
-        return 0 if cuda_atomic_nanmax(agg, (y, x), aa_factor) != aa_factor else -1
+        if not isnull(aa_factor):
+            old = cuda_atomic_nanmax(agg, (y, x), aa_factor)
+            if isnull(old) or old < aa_factor:
+                return 0
+        return -1
 
     @staticmethod
     @nb_cuda.jit(device=True)
@@ -1014,7 +1027,11 @@ def _append_antialias(x, y, agg, field, aa_factor):
     @staticmethod
     @nb_cuda.jit(device=True)
     def _append_cuda(x, y, agg, field):
-        return 0 if cuda_atomic_nanmin(agg, (y, x), field) != field else -1
+        if not isnull(field):
+            old = cuda_atomic_nanmin(agg, (y, x), field)
+            if isnull(old) or old > field:
+                return 0
+        return -1
 
     @staticmethod
     def _combine(aggs):
@@ -1056,12 +1073,20 @@ def _append_antialias(x, y, agg, field, aa_factor):
     @nb_cuda.jit(device=True)
     def _append_antialias_cuda(x, y, agg, field, aa_factor):
         value = field*aa_factor
-        return 0 if cuda_atomic_nanmax(agg, (y, x), value) != value else -1
+        if not isnull(value):
+            old = cuda_atomic_nanmax(agg, (y, x), value)
+            if isnull(old) or old < value:
+                return 0
+        return -1
 
     @staticmethod
     @nb_cuda.jit(device=True)
     def _append_cuda(x, y, agg, field):
-        return 0 if cuda_atomic_nanmax(agg, (y, x), field) != field else -1
+        if not isnull(field):
+            old = cuda_atomic_nanmax(agg, (y, x), field)
+            if isnull(old) or old < field:
+                return 0
+        return -1
 
     @staticmethod
     def _combine(aggs):
@@ -1422,23 +1447,18 @@ def _append(x, y, agg, field):
     # GPU append functions
     @staticmethod
     @nb_cuda.jit(device=True)
-    def _append_cuda(x, y, agg, field, mutex):
+    def _append_cuda(x, y, agg, field):
         if not isnull(field):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
-            index = (y, x)
-            cuda_mutex_lock(mutex, index)
            for i in range(n):
                if isnull(agg[y, x, i]) or field > agg[y, x, i]:
                    # Bump previous values along to make room for new value.
                    for j in range(n-1, i, -1):
                        agg[y, x, j] = agg[y, x, j-1]
                    agg[y, x, i] = field
-
-                    cuda_mutex_unlock(mutex, index)
                    return i
-            cuda_mutex_unlock(mutex, index)
        return -1
 
     def _build_combine(self, dshape, antialias, cuda, partitioned):
@@ -1493,23 +1513,18 @@ def _append(x, y, agg, field):
     # GPU append functions
     @staticmethod
     @nb_cuda.jit(device=True)
-    def _append_cuda(x, y, agg, field, mutex):
+    def _append_cuda(x, y, agg, field):
         if not isnull(field):
            # Linear walk along stored values.
            # Could do binary search instead but not expecting n to be large.
            n = agg.shape[2]
-            index = (y, x)
-            cuda_mutex_lock(mutex, index)
            for i in range(n):
                if isnull(agg[y, x, i]) or field < agg[y, x, i]:
                    # Bump previous values along to make room for new value.
                    for j in range(n-1, i, -1):
                        agg[y, x, j] = agg[y, x, j-1]
                    agg[y, x, i] = field
-
-                    cuda_mutex_unlock(mutex, index)
                    return i
-            cuda_mutex_unlock(mutex, index)
        return -1
 
     def _build_combine(self, dshape, antialias, cuda, partitioned):
@@ -1622,7 +1637,7 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned):
         return dshape(ct.float64)
 
     def uses_cuda_mutex(self):
-        return self.selector.uses_cuda_mutex()
+        return True
 
     def uses_row_index(self, cuda, partitioned):
         return self.column is None or isinstance(self.selector, (_first_or_last, _first_n_or_last_n))
@@ -1702,7 +1717,7 @@ def _build_bases(self, cuda, partitioned):
         return selector._build_bases(cuda, partitioned) + super()._build_bases(cuda, partitioned)
 
     def _build_combine(self, dshape, antialias, cuda, partitioned):
-        if cuda and self.uses_cuda_mutex():
+        if cuda and self.selector.uses_cuda_mutex():
             raise NotImplementedError(
                 "'where' reduction does not support a selector that uses a CUDA mutex such as 'max_n'")
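
For reference, the `cuda_mutex_lock`/`cuda_mutex_unlock` helpers that the generated `append` source now calls are per-pixel spin locks provided by `datashader/transfer_functions/_cuda_utils.py`. A minimal sketch of that kind of device function, assuming a `uint32` mutex array with one word per canvas pixel (not necessarily the exact upstream implementation):

```python
from numba import cuda


@cuda.jit(device=True)
def cuda_mutex_lock(mutex, index):
    # Spin until this thread swaps the pixel's word from 0 (unlocked)
    # to 1 (locked); cuda.atomic.cas returns the value it found.
    while cuda.atomic.cas(mutex, index, 0, 1) != 0:
        pass
    # Make the previous lock holder's writes visible to this thread.
    cuda.threadfence()


@cuda.jit(device=True)
def cuda_mutex_unlock(mutex, index):
    # Make this thread's writes visible before releasing the lock.
    cuda.threadfence()
    cuda.atomic.exch(mutex, index, 0)
```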
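The `prev_cuda_mutex` flag exists so that consecutive mutex-using reductions share a single critical section: when the previous call already holds the pixel's lock, `body.pop()` removes its trailing unlock rather than emitting an unlock immediately followed by a re-lock of the same pixel. A hypothetical rendering of the source `make_append` would then generate for two mutex-using reductions (the `_append_cuda_*` names and argument layout are illustrative, not the actual `next(names)` identifiers):

```python
def append(i0, x, y, _0, _1, _2, _cuda_mutex):
    # One lock/unlock pair wraps both calls; the intermediate
    # unlock+lock cycle between them was elided by body.pop().
    cuda_mutex_lock(_cuda_mutex, (y, x))
    _append_cuda_max_n(x, y, _0, _2[i0])
    _append_cuda_min_n(x, y, _1, _2[i0])
    cuda_mutex_unlock(_cuda_mutex, (y, x))
```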