Add join method to DeviceBuffer #1035
cdef uintptr_t sp = <uintptr_t>self.c_data()
cdef size_t sdbs = self.c_size()
lots of very short and undescriptive variable names in this function.
Definitely could use clearer and more canonical names like sp -> sep, sdbs -> sep_bytes, etc.
for i in range(N - 1):
    db = L[i]
    dp = <uintptr_t>db.c_data()
    dbs = db.c_size()
    copy_device_to_ptr(dp, rp + offset, dbs, stream)
    offset += dbs
    if sdbs > 0:
        copy_device_to_ptr(sp, rp + offset, sdbs, stream)
        offset += sdbs
Perhaps a comment explaining this loop. The variable names don't help clarify it.
@jakirkham could you please provide a bit more context here? Is this to enable a specific feature or algorithm?
db = L[N - 1]
dp = <uintptr_t>db.c_data()
dbs = db.c_size()
copy_device_to_ptr(dp, rp + offset, dbs, stream)
nit: can't we merge these lines into the loop?
Yes, we would just need to change line 330 from "if sdbs > 0" to "if sdbs > 0 and i != N-1" and update the for loop on line 324 to use range(N).
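The merged loop suggested above can be sketched on the host side. This is only an illustration under stated assumptions: plain `bytes` and `bytearray` stand in for device memory, slice assignment stands in for `copy_device_to_ptr`, and the function name `join_buffers` and the renamed variables are hypothetical, not code from the PR.

```python
def join_buffers(sep: bytes, buffers: list) -> bytearray:
    """Host-side stand-in for the loop under review (no device memory involved)."""
    n = len(buffers)
    sep_bytes = len(sep)
    # Total size: every payload plus one separator between each consecutive pair.
    total = sum(len(b) for b in buffers) + sep_bytes * max(n - 1, 0)
    result = bytearray(total)
    offset = 0
    for i in range(n):
        buf = buffers[i]
        # Stand-in for copy_device_to_ptr(dp, rp + offset, dbs, stream).
        result[offset:offset + len(buf)] = buf
        offset += len(buf)
        # Skip the separator after the final buffer, so no special-cased tail is needed.
        if sep_bytes > 0 and i != n - 1:
            result[offset:offset + sep_bytes] = sep
            offset += sep_bytes
    return result
```

With the `i != n - 1` guard, the last buffer is handled by the same loop body and the separate trailing copy goes away.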
In Distributed serialization (part of spilling and communication), we often split and/or join frames as part of this process. At the moment this only happens with host-side serialization ("pickle" or "dask"). However, we may want to extend this to CUDA-based serialization to allow for compression/decompression, consolidating many small frames into one larger one (for example, before sending), etc. On the host side this is done with syntax like

This has also come up before, primarily in rapidsai/cudf#9726, though there is also overlap with ancillary issues (rapidsai/ucx-py#478, rapidsai/dask-cuda#760).
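As a rough illustration of the host-side pattern described here, the sketch below consolidates several small frames into one and splits them back out using the recorded sizes. It is not Distributed's actual implementation; the frame contents and the helper name `split_frames` are made up for the example.

```python
# Hypothetical frames; in practice these would come from a serializer.
frames = [b"header", b"payload-1", b"payload-2"]

# Record sizes so the consolidated frame can be split again later.
sizes = [len(f) for f in frames]

# Consolidate many small frames into one larger one (empty separator).
joined = b"".join(frames)


def split_frames(data: bytes, sizes: list) -> list:
    """Split a consolidated frame back into pieces of the given sizes."""
    view = memoryview(data)
    pieces, offset = [], 0
    for size in sizes:
        pieces.append(bytes(view[offset:offset + size]))
        offset += size
    return pieces
```

A device-side `join` would play the role of `b"".join(frames)` here, with the split step needing a matching device-side counterpart.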
When I first read the description of this PR I was really expecting something more like a classmethod factory that just concatenated buffers together. I did not expect this to be a method using self as the separator for the purpose of concatenation. I see where the API is coming from though. It feels a little out of scope for RMM to me, but I also don't really see a better place for this feature to exist, so I'm OK with pushing forward if nobody else objects. The code needs some better variable naming/comments, but otherwise mostly looks fine.
L : ``list`` of ``DeviceBuffer``s
stream : CUDA stream to use for copying, default the default stream
Suggested change:
L : list
    The ``DeviceBuffer``s to concatenate.
stream : Stream
    The stream to use for copying. Defaults to the default stream.
Can we also name the parameter buffers instead?
@@ -285,6 +285,58 @@ cdef class DeviceBuffer:

        return b

    cpdef DeviceBuffer join(self, list L, Stream stream=DEFAULT_STREAM):
        """Joins a sequence of ``DeviceBuffer``s with ``self`` inbetween.
Suggested change:
"""Joins a sequence of ``DeviceBuffer``s with ``self`` in between.
I think this would be clearer if you actually copied more of the docstring from help(b''.join). Specifically, it's not obvious that "in between" means that self is inserted between every consecutive element. Use of the word "separator" to describe self might also help.
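For reference, the `bytes.join` behaviour that the docstring would be modelled on looks like this. It is a short standard-library illustration only; the proposed `DeviceBuffer.join` is assumed to mirror these semantics, which the PR itself would need to confirm.

```python
sep = b", "

# The separator (the object whose method is called) goes between each
# consecutive pair of elements, never before the first or after the last.
joined = sep.join([b"a", b"b", b"c"])

# With a single element there is no pair, so no separator is inserted.
single = sep.join([b"a"])

# An empty sequence yields an empty result.
empty = sep.join([])
```

Spelling out the one-element and empty cases in the docstring would remove the ambiguity around "in between".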
cdef DeviceBuffer rdb = DeviceBuffer(size=s, stream=stream)
cdef uintptr_t rp = <uintptr_t>rdb.c_data()

cdef uintptr_t dp, offset = 0
Why maintain an offset when you can just increment rp directly?
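One way to picture the "increment the pointer directly" variant, again on the host: a `memoryview` over the destination plays the role of `rp` and is advanced after each copy, so no separate offset is tracked. The function name `join_advancing` is hypothetical, and this is a sketch of the idea rather than the Cython change itself.

```python
def join_advancing(sep: bytes, buffers: list) -> bytes:
    """Join buffers by advancing a destination view instead of tracking an offset."""
    n = len(buffers)
    total = sum(len(b) for b in buffers) + len(sep) * max(n - 1, 0)
    result = bytearray(total)
    dest = memoryview(result)  # stands in for the destination pointer rp
    for i, buf in enumerate(buffers):
        dest[:len(buf)] = buf
        dest = dest[len(buf):]  # advance the "pointer" past the copied bytes
        if sep and i != n - 1:
            dest[:len(sep)] = sep
            dest = dest[len(sep):]
    # Every byte of the destination should have been written exactly once.
    assert len(dest) == 0
    return bytes(result)
```

In the Cython version the equivalent would be adding the copied size to `rp` after each `copy_device_to_ptr` call.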
Moving to 22.08
@jakirkham @shwina @vyasr so what do you think, is this out of scope for RMM? Is this PR dead?
Going to close for now. Can reopen later as needed.
Adds join method to DeviceBuffer analogous to bytes.join(...). Provides a simple way to concatenate buffers together into a single one.