Skip to content

Commit

Permalink
Wrap use of cuda mutex in where reductions
Browse files Browse the repository at this point in the history
  • Loading branch information
ianthomas23 committed May 18, 2023
1 parent 7a97e97 commit c6ad0e4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 29 deletions.
31 changes: 24 additions & 7 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
from .reductions import by, category_codes, summary, where
from .utils import ngjit

try:
from datashader.transfer_functions._cuda_utils import cuda_mutex_lock, cuda_mutex_unlock
except ImportError:
cuda_mutex_lock, cuda_mutex_unlock = None, None


__all__ = ['compile_components']

Expand Down Expand Up @@ -166,21 +171,24 @@ def info(df, canvas_shape):
def make_append(bases, cols, calls, glyph, categorical, antialias):
names = ('_{0}'.format(i) for i in count())
inputs = list(bases) + list(cols)
namespace = {}
any_uses_cuda_mutex = any(call[6] for call in calls)
if any_uses_cuda_mutex:
# This adds an argument to the append() function that is the cuda mutex
# generated in make_info.
inputs += ["_cuda_mutex"]
namespace["cuda_mutex_lock"] = cuda_mutex_lock
namespace["cuda_mutex_unlock"] = cuda_mutex_unlock
signature = [next(names) for i in inputs]
arg_lk = dict(zip(inputs, signature))
local_lk = {}
namespace = {}
body = []
ndims = glyph.ndims
if ndims is not None:
subscript = ', '.join(['i' + str(n) for n in range(ndims)])
else:
subscript = None
prev_cuda_mutex = False

for func, bases, cols, nan_check_column, temps, _, uses_cuda_mutex in calls:
local_lk.update(zip(temps, (next(names) for i in temps)))
Expand All @@ -202,8 +210,9 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
if antialias:
args.append("aa_factor")

if uses_cuda_mutex:
args.append(arg_lk["_cuda_mutex"])
if uses_cuda_mutex and prev_cuda_mutex:
# Avoid unnecessary mutex unlock and lock cycle
body.pop()

where_reduction = len(bases) == 1 and isinstance(bases[0], where)
if where_reduction:
Expand All @@ -212,7 +221,10 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):

# where reduction needs access to the return of the contained
# reduction, which is the preceding one here.
body[-1] = f'{update_index_arg_name} = {body[-1]}'
prev_body = body.pop()
if uses_cuda_mutex and not prev_cuda_mutex:
body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))')
body.append(f'{update_index_arg_name} = {prev_body}')

# If nan_check_column is defined then need to check if value of
# correct row in that column is NaN and if so do nothing. This
Expand All @@ -227,11 +239,16 @@ def make_append(bases, cols, calls, glyph, categorical, antialias):
whitespace = ' '

body.append(f'{whitespace}if {update_index_arg_name} >= 0:')
call = f' {whitespace}{func_name}(x, y, {", ".join(args)})'
body.append(f' {whitespace}{func_name}(x, y, {", ".join(args)})')
else:
call = f'{func_name}(x, y, {", ".join(args)})'
if uses_cuda_mutex and not prev_cuda_mutex:
body.append(f'cuda_mutex_lock({arg_lk["_cuda_mutex"]}, (y, x))')
body.append(f'{func_name}(x, y, {", ".join(args)})')

if uses_cuda_mutex:
body.append(f'cuda_mutex_unlock({arg_lk["_cuda_mutex"]}, (y, x))')

body.append(call)
prev_cuda_mutex = uses_cuda_mutex

body = ['{0} = {1}[y, x]'.format(name, arg_lk[agg])
for agg, name in local_lk.items()] + body
Expand Down
59 changes: 37 additions & 22 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

try:
    from datashader.transfer_functions._cuda_utils import (
        cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args,
        cuda_nanmax_n_in_place, cuda_nanmin_n_in_place)
except ImportError:
    # CUDA stack unavailable: bind every name to None so the module still
    # imports and CPU-only code paths keep working.
    # Fix: the fallback previously assigned to the misspelled name
    # 'cuda_atomic_nanmmax', which left 'cuda_atomic_nanmax' undefined
    # (NameError) on systems without CUDA.
    (cuda_atomic_nanmin, cuda_atomic_nanmax, cuda_args, cuda_nanmax_n_in_place,
     cuda_nanmin_n_in_place) = None, None, None, None, None

try:
import cudf
Expand Down Expand Up @@ -520,12 +521,20 @@ def _append_no_field_antialias_not_self_intersect(x, y, agg, aa_factor):
@staticmethod
@nb_cuda.jit(device=True)
def _append_antialias_cuda(x, y, agg, field, aa_factor):
    # Antialiased GPU append for a max-style reduction: atomically raise
    # agg[y, x] to field*aa_factor.
    # Returns 0 if this call updated the stored value, -1 otherwise (the
    # generated append() treats >= 0 as "updated" for where reductions).
    value = field*aa_factor
    if not isnull(value):
        # cuda_atomic_nanmax returns the previously stored value; this call
        # won the update iff that old value was NaN or smaller than ours.
        # (The old one-line form compared the atomic's return against the new
        # value, which misreported NaN and equal-value cases.)
        old = cuda_atomic_nanmax(agg, (y, x), value)
        if isnull(old) or old < value:
            return 0
    return -1

@staticmethod
@nb_cuda.jit(device=True)
def _append_no_field_antialias_cuda_not_self_intersect(x, y, agg, aa_factor):
    # No-field antialiased GPU append (non-self-intersecting glyphs):
    # atomically raise agg[y, x] to aa_factor.
    # Returns 0 if this call updated the stored value, -1 otherwise.
    if not isnull(aa_factor):
        # The atomic returns the previous value; success means the previous
        # value was NaN or smaller than aa_factor.
        old = cuda_atomic_nanmax(agg, (y, x), aa_factor)
        if isnull(old) or old < aa_factor:
            return 0
    return -1

@staticmethod
@nb_cuda.jit(device=True)
Expand All @@ -538,7 +547,11 @@ def _append_cuda(x, y, agg, field):
@staticmethod
@nb_cuda.jit(device=True)
def _append_no_field_antialias_cuda(x, y, agg, aa_factor):
    # No-field antialiased GPU append: atomically raise agg[y, x] to
    # aa_factor. Returns 0 if this call updated the stored value, -1 otherwise.
    if not isnull(aa_factor):
        # The atomic returns the previous value; success means the previous
        # value was NaN or smaller than aa_factor.
        old = cuda_atomic_nanmax(agg, (y, x), aa_factor)
        if isnull(old) or old < aa_factor:
            return 0
    return -1

@staticmethod
@nb_cuda.jit(device=True)
Expand Down Expand Up @@ -1014,7 +1027,11 @@ def _append_antialias(x, y, agg, field, aa_factor):
@staticmethod
@nb_cuda.jit(device=True)
def _append_cuda(x, y, agg, field):
    # GPU append for a min reduction: atomically lower agg[y, x] to field.
    # Returns 0 if this call updated the stored value, -1 otherwise.
    if not isnull(field):
        # cuda_atomic_nanmin returns the previously stored value; this call
        # won the update iff that old value was NaN or greater than field.
        old = cuda_atomic_nanmin(agg, (y, x), field)
        if isnull(old) or old > field:
            return 0
    return -1

@staticmethod
def _combine(aggs):
Expand Down Expand Up @@ -1056,12 +1073,20 @@ def _append_antialias(x, y, agg, field, aa_factor):
@nb_cuda.jit(device=True)
def _append_antialias_cuda(x, y, agg, field, aa_factor):
    # Antialiased GPU append for a max reduction: atomically raise
    # agg[y, x] to field*aa_factor.
    # Returns 0 if this call updated the stored value, -1 otherwise.
    value = field*aa_factor
    if not isnull(value):
        # The atomic returns the previous value; success means the previous
        # value was NaN or smaller than ours.
        old = cuda_atomic_nanmax(agg, (y, x), value)
        if isnull(old) or old < value:
            return 0
    return -1

@staticmethod
@nb_cuda.jit(device=True)
def _append_cuda(x, y, agg, field):
    # GPU append for a max reduction: atomically raise agg[y, x] to field.
    # Returns 0 if this call updated the stored value, -1 otherwise.
    if not isnull(field):
        # The atomic returns the previous value; success means the previous
        # value was NaN or smaller than field.
        old = cuda_atomic_nanmax(agg, (y, x), field)
        if isnull(old) or old < field:
            return 0
    return -1

@staticmethod
def _combine(aggs):
Expand Down Expand Up @@ -1422,23 +1447,18 @@ def _append(x, y, agg, field):
# GPU append functions
@staticmethod
@nb_cuda.jit(device=True)
def _append_cuda(x, y, agg, field):
    # GPU append for max_n: insert field into the descending-sorted slice
    # agg[y, x, :], shifting smaller values along to make room.
    # Mutual exclusion is no longer handled here: the generated append()
    # wrapper (see compiler.make_append) takes the per-pixel CUDA mutex
    # around this call, so the old 'mutex' parameter and the lock/unlock
    # pair are gone. The unused 'index = (y, x)' local that only fed the
    # mutex calls is removed as well.
    # Returns the insertion index, or -1 if field was NaN or too small.
    if not isnull(field):
        # Linear walk along stored values.
        # Could do binary search instead but not expecting n to be large.
        n = agg.shape[2]
        for i in range(n):
            if isnull(agg[y, x, i]) or field > agg[y, x, i]:
                # Bump previous values along to make room for new value.
                for j in range(n-1, i, -1):
                    agg[y, x, j] = agg[y, x, j-1]
                agg[y, x, i] = field
                return i
    return -1

def _build_combine(self, dshape, antialias, cuda, partitioned):
Expand Down Expand Up @@ -1493,23 +1513,18 @@ def _append(x, y, agg, field):
# GPU append functions
@staticmethod
@nb_cuda.jit(device=True)
def _append_cuda(x, y, agg, field):
    # GPU append for min_n: insert field into the ascending-sorted slice
    # agg[y, x, :], shifting larger values along to make room.
    # Mutual exclusion is no longer handled here: the generated append()
    # wrapper (see compiler.make_append) takes the per-pixel CUDA mutex
    # around this call, so the old 'mutex' parameter and the lock/unlock
    # pair are gone. The unused 'index = (y, x)' local that only fed the
    # mutex calls is removed as well.
    # Returns the insertion index, or -1 if field was NaN or too large.
    if not isnull(field):
        # Linear walk along stored values.
        # Could do binary search instead but not expecting n to be large.
        n = agg.shape[2]
        for i in range(n):
            if isnull(agg[y, x, i]) or field < agg[y, x, i]:
                # Bump previous values along to make room for new value.
                for j in range(n-1, i, -1):
                    agg[y, x, j] = agg[y, x, j-1]
                agg[y, x, i] = field
                return i
    return -1

def _build_combine(self, dshape, antialias, cuda, partitioned):
Expand Down Expand Up @@ -1622,7 +1637,7 @@ def out_dshape(self, input_dshape, antialias, cuda, partitioned):
return dshape(ct.float64)

def uses_cuda_mutex(self):
    # A where reduction now always requires the CUDA mutex, regardless of
    # whether its selector does (previously this delegated to
    # self.selector.uses_cuda_mutex()): the generated append() must update
    # the selector and lookup aggs together under one lock.
    return True

def uses_row_index(self, cuda, partitioned):
    # Row indices are tracked when no lookup column was given, or when the
    # selector is a first/last-style reduction.
    if self.column is None:
        return True
    return isinstance(self.selector, (_first_or_last, _first_n_or_last_n))
Expand Down Expand Up @@ -1702,7 +1717,7 @@ def _build_bases(self, cuda, partitioned):
return selector._build_bases(cuda, partitioned) + super()._build_bases(cuda, partitioned)

def _build_combine(self, dshape, antialias, cuda, partitioned):
if cuda and self.uses_cuda_mutex():
if cuda and self.selector.uses_cuda_mutex():
raise NotImplementedError(
"'where' reduction does not support a selector that uses a CUDA mutex such as 'max_n'")

Expand Down

0 comments on commit c6ad0e4

Please sign in to comment.