[FEAT] Ray shuffle experiment #2883

Open
wants to merge 16 commits into main
5 changes: 5 additions & 0 deletions daft/context.py
@@ -122,6 +122,11 @@ def runner(self) -> Runner:
with self._lock:
return self._get_runner()

def shuffle_service_factory(self):
from daft.runners.ray_runner import RayShuffleServiceFactory

return RayShuffleServiceFactory()

@property
def daft_execution_config(self) -> PyDaftExecutionConfig:
with self._lock:
191 changes: 190 additions & 1 deletion daft/execution/physical_plan.py
@@ -57,8 +57,9 @@
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties

from daft.daft import FileFormat, IOConfig, JoinType
from daft.daft import FileFormat, IOConfig, JoinType, PyExpr
from daft.logical.schema import Schema
from daft.runners.partitioning import PartialPartitionMetadata


# A PhysicalPlan that is still being built - may yield both PartitionTaskBuilders and PartitionTasks.
Expand Down Expand Up @@ -1615,6 +1616,194 @@ def fanout_random(child_plan: InProgressPhysicalPlan[PartitionT], num_partitions
seed += 1


def fully_materializing_push_exchange_op(
child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
) -> InProgressPhysicalPlan[PartitionT]:
from daft.expressions import Expression

# Step 1: Naively materialize all child partitions
stage_id_children = next(stage_id_counter)
materialized_partitions: list[SingleOutputPartitionTask] = []
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
task = step.finalize_partition_task_single_output(stage_id=stage_id_children)
materialized_partitions.append(task)
yield task
elif isinstance(step, PartitionTask):
yield step
elif step is None:
yield None
else:
yield step

# Step 2: Wait for all partitions to be done
while any(not p.done() for p in materialized_partitions):
yield None

with get_context().shuffle_service_factory().push_based_shuffle_service_context(
num_partitions, partition_by=ExpressionsProjection([Expression._from_pyexpr(e) for e in partition_by])
) as shuffle_service:
results = shuffle_service.run([p.partition() for p in materialized_partitions])

for reduced_data in results:
reduce_task = PartitionTaskBuilder(
inputs=[reduced_data],
partial_metadatas=None,
resource_request=ResourceRequest(),
)
yield reduce_task
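
For context, the only surface this plan node needs from the object yielded by `push_based_shuffle_service_context` is a `run` method that consumes the fully materialized input partitions and returns one reduced partition per output. A minimal sketch of that assumed contract (the protocol name below is made up; the method name and call shape come from the call site above):

from typing import Any, List, Protocol


class PushBasedShuffleServiceLike(Protocol):
    # What `fully_materializing_push_exchange_op` relies on: hash-partition the
    # materialized inputs by the configured keys and hand back `num_partitions`
    # reduced outputs, one per target partition.
    def run(self, materialized_partitions: List[Any]) -> List[Any]: ...

Anything satisfying this shape (for example a single-process implementation for tests) could in principle be swapped in behind the factory; the Ray-backed service added in this PR is the real implementation.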


def fully_materializing_exchange_op(
child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
) -> InProgressPhysicalPlan[PartitionT]:
from daft.expressions import Expression

# Step 1: Naively materialize all child partitions
stage_id_children = next(stage_id_counter)
materialized_partitions: list[SingleOutputPartitionTask] = []
for step in child_plan:
if isinstance(step, PartitionTaskBuilder):
task = step.finalize_partition_task_single_output(stage_id=stage_id_children)
materialized_partitions.append(task)
yield task
elif isinstance(step, PartitionTask):
yield step
elif step is None:
yield None
else:
yield step

# Step 2: Wait for all partitions to be done
while any(not p.done() for p in materialized_partitions):
yield None

# Step 3: Yield the map tasks
stage_id_map_tasks = next(stage_id_counter)
materialized_map_partitions: list[MultiOutputPartitionTask] = []
while materialized_partitions:
materialized_child_partition = materialized_partitions.pop(0)
map_task = (
PartitionTaskBuilder(
inputs=[materialized_child_partition.partition()],
partial_metadatas=materialized_child_partition.partial_metadatas,
resource_request=ResourceRequest(),
)
.add_instruction(
execution_step.FanoutHash(
_num_outputs=num_partitions,
partition_by=ExpressionsProjection([Expression._from_pyexpr(expr) for expr in partition_by]),
),
ResourceRequest(),
)
.finalize_partition_task_multi_output(stage_id=stage_id_map_tasks)
)
materialized_map_partitions.append(map_task)
yield map_task

# Step 4: Wait on all the map tasks to complete
while any(not p.done() for p in materialized_map_partitions):
yield None

# Step 5: "Transpose the results" and run reduce tasks
transposed_results: list[list[tuple[PartitionT, PartialPartitionMetadata]]] = [[] for _ in range(num_partitions)]
for map_task in materialized_map_partitions:
partitions = map_task.partitions()
partition_metadatas = map_task.partial_metadatas
for i, (partition, meta) in enumerate(zip(partitions, partition_metadatas)):
transposed_results[i].append((partition, meta))

for i, partitions in enumerate(transposed_results):
reduce_task = PartitionTaskBuilder(
inputs=[p for p, _ in partitions],
partial_metadatas=[m for _, m in partitions],
resource_request=ResourceRequest(),
).add_instruction(
instruction=execution_step.ReduceMerge(),
)
yield reduce_task
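
The "transpose" in Step 5 is the usual map/reduce pivot: each map task emits `num_partitions` outputs, and reduce task `i` must gather the `i`-th output from every map task. A tiny self-contained illustration of that reshaping, with plain strings standing in for partitions:

# 3 map tasks, each producing num_partitions = 2 outputs.
map_outputs = [
    ["m0p0", "m0p1"],
    ["m1p0", "m1p1"],
    ["m2p0", "m2p1"],
]

num_partitions = 2
transposed = [[] for _ in range(num_partitions)]
for outputs in map_outputs:
    for i, out in enumerate(outputs):
        transposed[i].append(out)

# Reduce task 0 merges every map's 0th output; reduce task 1 merges every map's 1st output.
assert transposed == [["m0p0", "m1p0", "m2p0"], ["m0p1", "m1p1", "m2p1"]]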


# This was the more complicated variant: it streams child outputs into the shuffle service as they finish (kept for reference).
#
# def fully_materializing_exchange_op(
# child_plan: InProgressPhysicalPlan[PartitionT], partition_by: list[PyExpr], num_partitions: int
# ) -> InProgressPhysicalPlan[PartitionT]:
# from daft.execution.physical_plan_shuffles import HashPartitionRequest

# prior_stage_id = next(stage_id_counter)

# # Yield children stuff and avoid creating the shuffle service until we start running
# # tasks in the stage directly prior to this
# child_task = None
# while child_task is None:
# step = next(child_plan)
# if step is None:
# yield None
# continue
# elif isinstance(step, PartitionTask):
# yield step
# continue
# else:
# assert isinstance(step, PartitionTaskBuilder)
# child_task = step.finalize_partition_task_single_output(prior_stage_id)
# break

# materializations: deque[SingleOutputPartitionTask] = deque()
# materializations.append(child_task)
# yield child_task

# MAX_NUM_CHILD_INFLIGHT_TASKS_BEFORE_INGESTION = 128

# # Create the shuffle service and start materializing children and sending data to the service
# with get_context().shuffle_service_factory().fully_materializing_shuffle_service_context(
# num_partitions,
# [c.name() for c in partition_by], # TODO: Assume no-op here for now, YOLO!
# ) as shuffle_service:
# child_plan_exhausted = False
# while not child_plan_exhausted or len(materializations) > 0:
# # Ingest as many materialized results as possible
# materialized: list[SingleOutputPartitionTask] = []
# while len(materializations) > 0 and materializations[0].done():
# materialized.append(materializations.popleft())
# results = [done_task.result() for done_task in materialized]
# if len(results) > 0:
# _ingest_results = shuffle_service.ingest([r.partition for r in results])

# # Keep pulling steps from children until either:
# # 1. We hit `MAX_NUM_CHILD_INFLIGHT_TASKS_BEFORE_INGESTION` and want to chill a bit to ingest the data
# # 2. We exhaust the child plan
# while len(materializations) < MAX_NUM_CHILD_INFLIGHT_TASKS_BEFORE_INGESTION:
# try:
# step = next(child_plan)
# except StopIteration:
# child_plan_exhausted = True
# break
# if step is None:
# yield step
# elif isinstance(step, PartitionTask):
# yield step
# else:
# assert isinstance(step, PartitionTaskBuilder)
# child_task = step.finalize_partition_task_single_output(prior_stage_id)
# materializations.append(child_task)
# yield child_task

# # Read from the shuffle service in chunks of 1GB
# MAX_SIZE_BYTES = 1024 * 1024 * 1024
# partition_requests = [
# list(shuffle_service.read(HashPartitionRequest(type_="hash", bucket=i), MAX_SIZE_BYTES))
# for i in range(num_partitions)
# ]

# for partition_chunks in partition_requests:
# yield PartitionTaskBuilder[PartitionT](
# inputs=partition_chunks,
# partial_metadatas=None,
# resource_request=ResourceRequest(),
# )


def _best_effort_next_step(
stage_id: int, child_plan: InProgressPhysicalPlan[PartitionT]
) -> tuple[PartitionTask[PartitionT] | None, bool]:
119 changes: 119 additions & 0 deletions daft/execution/physical_plan_shuffles.py
@@ -0,0 +1,119 @@
from __future__ import annotations

import dataclasses
from typing import Any, Iterator, Literal, Protocol, TypeVar

ShuffleData = TypeVar("ShuffleData")
IngestResult = TypeVar("IngestResult")


@dataclasses.dataclass(frozen=True)
class PartitioningSpec:
type_: Literal["hash"] | Literal["range"]

def to_hash_pspec(self) -> HashPartitioningSpec:
assert self.type_ == "hash" and isinstance(self, HashPartitioningSpec)
return self

def to_range_pspec(self) -> RangePartitioningSpec:
assert self.type_ == "range" and isinstance(self, RangePartitioningSpec)
return self


@dataclasses.dataclass(frozen=True)
class HashPartitioningSpec(PartitioningSpec):
num_partitions: int
columns: list[str]


@dataclasses.dataclass(frozen=True)
class RangePartitioningSpec(PartitioningSpec):
boundaries: list[Any]
columns: list[str]


@dataclasses.dataclass(frozen=True)
class PartitionRequest:
type_: Literal["hash"] | Literal["range"]

def to_hash_request(self) -> HashPartitionRequest:
assert self.type_ == "hash" and isinstance(self, HashPartitionRequest)
return self

def to_range_request(self) -> RangePartitionRequest:
assert self.type_ == "range" and isinstance(self, RangePartitionRequest)
return self


@dataclasses.dataclass(frozen=True)
class HashPartitionRequest(PartitionRequest):
bucket: int


@dataclasses.dataclass(frozen=True)
class RangePartitionRequest(PartitionRequest):
start_end_values: list[tuple[Any, Any]]
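
For illustration, the intended way to construct these specs and requests and narrow them back to their concrete types (the column name and numbers are placeholders):

spec: PartitioningSpec = HashPartitioningSpec(type_="hash", num_partitions=8, columns=["user_id"])
hash_spec = spec.to_hash_pspec()  # runtime-checked narrowing back to HashPartitioningSpec
assert hash_spec.num_partitions == 8

request: PartitionRequest = HashPartitionRequest(type_="hash", bucket=3)
hash_request = request.to_hash_request()  # same pattern for read requests
assert hash_request.bucket == 3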


class ShuffleServiceInterface(Protocol[ShuffleData, IngestResult]):
"""An interface to a ShuffleService

The job of a shuffle service is to `.ingest` results from the previous stage, perform partitioning on the data,
and then expose a `.read` to consumers of the "shuffled" data.

NOTE: `.read` should throw an error if called before the ShuffleService has been informed of the target partitioning. This
is because the ShuffleService needs to know how to partition the data before it can emit results.

See BigQuery/Dremel video from CMU: https://www.youtube.com/watch?v=JxeITDS-xh0&ab_channel=CMUDatabaseGroup
"""

def teardown(self) -> None: ...

###
# INGESTION:
# These endpoints allow the ShuffleService to ingest data from the previous stage of the query
###

def ingest(self, data: Iterator[ShuffleData]) -> list[IngestResult]:
"""Receive some data.

NOTE: This will throw an error if called after `.set_input_stage_completed` has been called.
"""
...

def set_input_stage_completed(self) -> None:
"""Inform the ShuffleService that all data from the previous stage has been ingested"""
...

def is_input_stage_completed(self) -> bool:
"""Query whether or not the previous stage has completed ingestion"""
...

###
# READ:
# These endpoints allow clients to request data from the ShuffleService
###

def read(self, request: PartitionRequest, max_size_bytes: int) -> Iterator[ShuffleData]:
"""Retrieves ShuffleData from the shuffle service for the specified partition.

This returns an iterator of ShuffleData for the requested partition.

The iterator raises StopIteration once all data for the given request is guaranteed to be
exhausted.
"""
...

# TODO: Dynamic Partitioning
#
# We could have the ShuffleService expose running statistics (as data is being collected)
# so that the coordinator can dynamically decide on an appropriate output partitioning scheme
# before it attempts to request for output ShuffleData
#
# def get_current_statistics(self) -> ShuffleStatistics:
# """Retrieves the current statistics from the ShuffleService's currently ingested data"""
# ...
#
# def set_output_partitioning(self, spec: PartitioningSpec) -> None:
# """Sets the intended output partitioning scheme that should be emitted"""
# ...
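
To make the intended call sequence concrete, here is a sketch of the driver loop a plan could run against any implementation of this interface: ingest data from the previous stage as it materializes, mark the input stage complete, then read each output partition. Only the method names and `HashPartitionRequest` come from the file above; the partition count, chunk size, helper name, and `shuffle_service` object are assumptions.

NUM_PARTITIONS = 8                   # hypothetical target partitioning
MAX_SIZE_BYTES = 1024 * 1024 * 1024  # read back in ~1GB chunks, as in the commented-out variant

def drive_shuffle(shuffle_service, materialized_inputs):
    # Push data from the previous stage into the service as it becomes available.
    for chunk in materialized_inputs:
        shuffle_service.ingest(iter([chunk]))

    # No more input is coming, so reads can now be satisfied completely.
    shuffle_service.set_input_stage_completed()
    assert shuffle_service.is_input_stage_completed()

    try:
        # Pull each output partition back out, one hash bucket at a time.
        outputs = []
        for bucket in range(NUM_PARTITIONS):
            request = HashPartitionRequest(type_="hash", bucket=bucket)
            outputs.append(list(shuffle_service.read(request, MAX_SIZE_BYTES)))
        return outputs
    finally:
        shuffle_service.teardown()
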
6 changes: 6 additions & 0 deletions daft/runners/pyrunner.py
@@ -361,6 +361,12 @@ def actor_pool_context(
self._actor_pools[actor_pool_id].teardown()
del self._actor_pools[actor_pool_id]

@contextlib.contextmanager
def shuffle_service_context(
self,
) -> Iterator[str]:
raise NotImplementedError("shuffle_service_context not yet implemented in PyRunner")

def _physical_plan_to_partitions(
self,
execution_id: str,