Skip to content

Commit

Permalink
Use multiple queues
Browse files Browse the repository at this point in the history
Signed-off-by: Hao Chen <chenh1024@gmail.com>

fix

Signed-off-by: Hao Chen <chenh1024@gmail.com>

separate queues

Signed-off-by: Hao Chen <chenh1024@gmail.com>

debug

Signed-off-by: Hao Chen <chenh1024@gmail.com>

fix

Signed-off-by: Hao Chen <chenh1024@gmail.com>

fix

Signed-off-by: Hao Chen <chenh1024@gmail.com>

debug

Signed-off-by: Hao Chen <chenh1024@gmail.com>

refine

Signed-off-by: Hao Chen <chenh1024@gmail.com>

fix

Signed-off-by: Hao Chen <chenh1024@gmail.com>

Revert "fix"

This reverts commit c63f8b71f150b0dc0add60b2817ce2241abd41ac.

Revert "refine"

This reverts commit 225db8279d128e1d00a359b42a5b7b5b93c57cfb.

fix

Signed-off-by: Hao Chen <chenh1024@gmail.com>
  • Loading branch information
raulchen committed Jan 23, 2024
1 parent abcb083 commit d9aeb87
Showing 1 changed file with 15 additions and 43 deletions.
58 changes: 15 additions & 43 deletions python/ray/data/_internal/execution/streaming_executor_state.py
Original file line number Diff line number Diff line change
@@ -106,12 +106,13 @@ class DownstreamMemoryInfo:
object_store_memory: float


class RefBundleDeque(deque):
class RefBundleDeque:
"""Thread-safe wrapper around collections.deque that stores current stats."""

def __init__(self):
self._memory_usage = 0
self._num_blocks = 0
self._queues = defaultdict(lambda: deque())
self._lock = threading.Lock()
super().__init__()

@@ -125,44 +126,27 @@ def num_blocks(self) -> int:
with self._lock:
return self._num_blocks

def append(self, ref: RefBundle):
def __len__(self):
return self.num_blocks

def has_next(self, output_split_idx: Optional[int]=None) -> bool:
with self._lock:
self._memory_usage += ref.size_bytes()
self._num_blocks += len(ref.blocks)
super().append(ref)
return len(self._queues[output_split_idx]) > 0

def appendleft(self, ref: RefBundle):
def append(self, ref: RefBundle):
with self._lock:
self._memory_usage += ref.size_bytes()
self._num_blocks += len(ref.blocks)
super().appendleft(ref)

def pop(self) -> RefBundle:
ref = super().pop()
with self._lock:
self._memory_usage -= ref.size_bytes()
self._num_blocks -= len(ref.blocks)
return ref
idx = ref.output_split_idx
self._queues[idx].append(ref)

def popleft(self) -> RefBundle:
ref = super().popleft()
def pop(self, output_split_idx=None) -> RefBundle:
ref = self._queues[output_split_idx].popleft()
with self._lock:
self._memory_usage -= ref.size_bytes()
self._num_blocks -= len(ref.blocks)
return ref

def remove(self, ref: RefBundle):
super().remove(ref)
with self._lock:
self._memory_usage -= ref.size_bytes()
self._num_blocks -= len(ref.blocks)

def clear(self):
super().clear()
with self._lock:
self._memory_usage = 0
self._num_blocks = 0


class OpState:
"""The execution state tracked for each PhysicalOperator.
@@ -267,7 +251,7 @@ def dispatch_next_task(self) -> None:
"""Move a bundle from the operator inqueue to the operator itself."""
for i, inqueue in enumerate(self.inqueues):
if inqueue:
self.op.add_input(inqueue.popleft(), input_index=i)
self.op.add_input(inqueue.pop(), input_index=i)
return
assert False, "Nothing to dispatch"

@@ -285,22 +269,10 @@ def get_output_blocking(self, output_split_idx: Optional[int]) -> RefBundle:
# Check if StreamingExecutor has caught an exception or is done execution.
if self._exception is not None:
raise self._exception
elif self._finished and len(self.outqueue) == 0:
elif self._finished and not self.outqueue.has_next(output_split_idx):
raise StopIteration()
try:
# Non-split output case.
if output_split_idx is None:
return self.outqueue.popleft()

# Scan the queue and look for outputs tagged for the given index.
for i in range(len(self.outqueue)):
bundle = self.outqueue[i]
if bundle.output_split_idx == output_split_idx:
self.outqueue.remove(bundle)
return bundle

# Didn't find any outputs matching this index, repeat the loop until
# we find one or hit a None.
return self.outqueue.pop(output_split_idx)
except IndexError:
pass
time.sleep(0.01)
Expand Down

0 comments on commit d9aeb87

Please sign in to comment.