
Commit

add complete disk implementation
mfbalin committed Jul 18, 2024
1 parent 79e4e2f commit 5b2bf55
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion python/dgl/graphbolt/impl/torch_based_feature_store.py
@@ -396,7 +396,43 @@ def read_async(self, ids: torch.Tensor = None):
             The read feature future.
         """
         assert torch.ops.graphbolt.detect_io_uring()
-        yield self._ondisk_npy_array.index_select(ids)
+        if ids.is_cuda:
+            ids_device = ids.device
+            current_stream = torch.cuda.current_stream()
+            device_to_host_stream = get_device_to_host_uva_stream()
+            device_to_host_stream.wait_stream(current_stream)
+            with torch.cuda.stream(device_to_host_stream):
+                ids.record_stream(torch.cuda.current_stream())
+                ids = ids.to(self._tensor.device, non_blocking=True)
+                ids_copy_event = torch.cuda.Event()
+                ids_copy_event.record()
+
+            yield  # first stage is done.
+
+            ids_copy_event.synchronize()
+            values = self._ondisk_npy_array.index_select(ids)
+            yield
+
+            host_to_device_stream = get_host_to_device_uva_stream()
+            with torch.cuda.stream(host_to_device_stream):
+                values_cuda = values.wait().to(ids_device, non_blocking=True)
+                values_cuda.record_stream(current_stream)
+                values_copy_event = torch.cuda.Event()
+                values_copy_event.record()
+
+            class _Waiter:
+                @staticmethod
+                def wait():
+                    values_copy_event.wait()
+                    return values_cuda
+
+            yield _Waiter()
+        else:
+            yield self._ondisk_npy_array.index_select(ids)
+
+    def read_async_num_stages(self, ids_device: torch.device):
+        """The number of stages of the read_async operation"""
+        return 3 if ids_device.type == "cuda" else 1
 
     def size(self):
         """Get the size of the feature.
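For context, a minimal sketch of how a caller drives this staged read, assuming an already-constructed graphbolt Feature instance named feature (for the disk-backed path above, also an io_uring-capable system and a CUDA device): the generator yields once per stage, and only the object yielded by the last stage carries the result.

import torch

# `feature` is assumed: any graphbolt Feature whose read_async follows this
# staged protocol, e.g. the disk-based feature in this diff.
ids = torch.tensor([0, 2], device="cuda")

for stage, future in enumerate(feature.read_async(ids)):
    pass  # in a real pipeline, other work overlaps with each stage here

# The disk-backed CUDA path above reports 3 stages:
# id copy to host, io_uring disk read, value copy back to device.
assert stage + 1 == feature.read_async_num_stages(ids.device)

values = future.wait()  # synchronize only when the values are needed

Splitting the CUDA path into three yields lets the device-to-host id copy, the io_uring disk read, and the host-to-device value copy each overlap with other stages of a data loading pipeline; the record_stream calls and CUDA events keep the side streams ordered against the caller's current stream without blocking it.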
