
Use Task class instead of tuple #8797

Merged (23 commits) on Oct 24, 2024
Conversation

@fjetter (Member) commented Jul 24, 2024

This is an early version that will close dask/dask#9969

It introduces a new Task class (name is subject to change) and a couple of other related subclasses that should replace the tuple as a representation of runnable tasks.

The benefits of this are outlined in dask/dask#9969 but are primarily focused on reducing overhead during serialization and parsing of results. An important benefit is also that we can trivially cache functions (and arguments, if we wish) to avoid problems like #8767, where users erroneously provide expensive-to-pickle functions (which also happens frequently in our own code and/or downstream projects like xarray).

This approach allows us to convert the legacy dsk graph to the new representation with full backwards compatibility. Old graphs can be migrated, and new ones can be written directly in the new representation, which will ultimately reduce overhead.
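For orientation, here is a minimal, self-contained sketch of the difference between the two representations. The classes below are illustrative stand-ins only, not the real dask.task_spec implementation; the actual Task/TaskRef usage appears in the diffs further down.

from dataclasses import dataclass, field
from operator import add
from typing import Callable

# Illustrative stand-ins (NOT the real dask.task_spec classes): just enough
# structure to show what replacing encoded tuples buys us.
@dataclass(frozen=True)
class TaskRef:
    key: str

@dataclass
class Task:
    key: str
    func: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)

    @property
    def dependencies(self) -> set:
        # Dependencies are explicit attributes instead of being discovered by
        # walking nested tuples and guessing which strings are keys.
        return {a.key for a in self.args if isinstance(a, TaskRef)}

# Legacy graph: a runnable task is an encoded tuple, and a bare key ("x")
# doubles as a reference to another task's output.
dsk_legacy = {"x": 1, "y": (add, "x", 2)}

# New-style graph: callable, arguments, and dependencies are first-class,
# which also makes it straightforward to cache/serialize the function once.
dsk_new = {"x": 1, "y": Task("y", add, (TaskRef("x"), 2))}
assert dsk_new["y"].dependencies == {"x"}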

I will follow up with measurements shortly.

Sibling PR in dask dask/dask#11248

Comment on lines 942 to 938
+ from dask.task_spec import Task

  dsk.update(
      {
-         key: (apply, self.func, (tuple, list(args)), kwargs2)
+         key: Task(key, self.func, args, kwargs2)
+         # (apply, self.func, (tuple, list(args)), kwargs2)
@fjetter (Member Author) commented Jul 24, 2024:

This isn't necessary, but it's an example of what this migration would look like.

Comment on lines 775 to 793
+ prefix_name = ts.prefix.name
+ count = self.task_prefix_count[prefix_name] - 1
+ tp_count = self.task_prefix_count
+ tp_count_global = self.scheduler._task_prefix_count_global
  if count:
-     self.task_prefix_count[ts.prefix.name] = count
+     tp_count[prefix_name] = count
  else:
-     del self.task_prefix_count[ts.prefix.name]
+     del tp_count[prefix_name]

- count = self.scheduler._task_prefix_count_global[ts.prefix.name] - 1
+ count = tp_count_global[prefix_name] - 1
  if count:
-     self.scheduler._task_prefix_count_global[ts.prefix.name] = count
+     tp_count_global[prefix_name] = count
  else:
-     del self.scheduler._task_prefix_count_global[ts.prefix.name]
+     del tp_count_global[prefix_name]
Member Author:

this is an unrelated perf fix
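As an aside, the effect of hoisting repeated attribute and dict lookups into locals (the pattern in the diff above) can be gauged with a rough, illustrative micro-benchmark; none of the names below come from this PR:

import timeit

setup = """
class Prefix:
    name = "p"

class TS:
    prefix = Prefix()

class Sched:
    task_prefix_count = {"p": 3}

self_ = Sched()
ts = TS()
"""

attr_lookups = timeit.timeit(
    "self_.task_prefix_count[ts.prefix.name]; self_.task_prefix_count[ts.prefix.name]",
    setup=setup,
)
hoisted_locals = timeit.timeit(
    "tp = self_.task_prefix_count; name = ts.prefix.name; tp[name]; tp[name]",
    setup=setup,
)
print(f"repeated lookups: {attr_lookups:.3f}s  hoisted locals: {hoisted_locals:.3f}s")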

Member:

I'm curious, how noticeable is this? Apart from that, let's move this to a separate PR to keep this focused on the major change you introduce here.

dsk = convert_old_style_dsk(dsk)
# TODO: This isn't working yet as expected
dependencies = dict(DependenciesMapping(dsk))

return dsk, dependencies, annotations_by_type
Member Author:

most/all of this complexity is now either gone entirely or hidden in the class
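For context, the DependenciesMapping used above can be thought of as a lazy, read-only view over the graph. A rough sketch of the idea, assuming the new task objects expose a .dependencies attribute (as the scheduler code later in this PR relies on); this is not the actual implementation:

from collections.abc import Mapping

class DependenciesMappingSketch(Mapping):
    """Map each key to the dependencies recorded on its task object,
    without materializing a second {key: set} dictionary up front."""

    def __init__(self, dsk):
        self._dsk = dsk

    def __getitem__(self, key):
        return self._dsk[key].dependencies

    def __iter__(self):
        return iter(self._dsk)

    def __len__(self):
        return len(self._dsk)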

github-actions bot (Contributor) commented Jul 24, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    25 files  ±  0      25 suites  ±0   10h 21m 22s ⏱️ + 1m 27s
 4 123 tests  -   9   4 006 ✅  -   9    110 💤 ±0   7 ❌ +1 
47 622 runs   - 109  45 521 ✅  - 108  2 087 💤  - 8  14 ❌ +8 

For more details on these failures, see this check.

Results for commit 65ed5d5. ± Comparison against base commit 48509b3.

This pull request removes 11 and adds 2 tests. Note that renamed tests count towards both.
distributed.tests.test_client ‑ test_persist_get
distributed.tests.test_client ‑ test_recreate_error_array
distributed.tests.test_client ‑ test_recreate_error_collection
distributed.tests.test_client ‑ test_recreate_error_delayed
distributed.tests.test_client ‑ test_recreate_error_futures
distributed.tests.test_client ‑ test_recreate_task_array
distributed.tests.test_client ‑ test_recreate_task_collection
distributed.tests.test_client ‑ test_recreate_task_delayed
distributed.tests.test_client ‑ test_recreate_task_futures
distributed.tests.test_utils ‑ test_maybe_complex
…
distributed.tests.test_client ‑ test_persist_get[False]
distributed.tests.test_client ‑ test_persist_get[True]

♻️ This comment has been updated with latest results.

@jacobtomlinson (Member) left a comment:

There may be implications for some of the dashboard components, the "pew pew pew" plot comes to mind. I see this is still a draft, let me know when it's in a reviewable state and I'll look over the dashboard code to see if anything needs changing there 🙂.

@fjetter (Member Author) commented Jul 24, 2024

> There may be implications for some of the dashboard components, the "pew pew pew" plot comes to mind. I see this is still a draft, let me know when it's in a reviewable state and I'll look over the dashboard code to see if anything needs changing there 🙂.

I'd actually be surprised if that was affected since we don't change the scheduler internal metadata (like dependencies, transfers, where the tasks are executed...). But who knows. I'll probably stumble over 50 small weird things trying to get CI green :)

@fjetter force-pushed the support_task_spec branch from 99a2ec5 to bb0324a on July 29, 2024 at 13:23
@hendrikmakait self-requested a review on July 30, 2024 at 14:52
@fjetter force-pushed the support_task_spec branch 3 times, most recently from b01c705 to bb7d38e, on August 7, 2024 at 12:56
@fjetter force-pushed the support_task_spec branch 2 times, most recently from 507fcb6 to 45b42a5, on August 9, 2024 at 14:05
@fjetter force-pushed the support_task_spec branch 3 times, most recently from eb30895 to 2ecaa5c, on August 23, 2024 at 16:51
Comment on lines 4920 to 4988
            task_state_created = time()
            metrics.update(
                {
                    "start": start,
                    "duration_materialization": materialization_done - start,
                    "duration_ordering": materialization_done - ordering_done,
                    "duration_state_initialization": ordering_done - task_state_created,
                    "duration_total": task_state_created - start,
                }
            )
            evt_msg = {
                "action": "update-graph",
                "stimulus_id": stimulus_id,
                "metrics": metrics,
                "status": "OK",
            }
            self.log_event(["all", client, "update-graph"], evt_msg)
            logger.debug("Task state created. %i new tasks", len(self.tasks) - before)
        except Exception as e:
            evt_msg = {
                "action": "update-graph",
                "stimulus_id": stimulus_id,
                "status": "error",
            }
Member Author:

This is an unrelated change but it shouldn't be too disruptive for the review

Member:

Would it be useful to log (partial) metrics on exception? Also, should we add the exception to the log event?

Member Author:

I don't think this is very useful, and I don't want to add the exception to this. This is primarily supposed to be a stream of metrics, and I don't like adding Exception objects or large strings to it. I also think the logging and proper exception-handling mechanisms should be sufficient.

If this is a contentious topic, I will remove this change from the PR.

Comment on lines 4877 to 4881
logger.debug("Materialization done. Got %i tasks.", len(dsk))
dependents = reverse_dict(dependencies)
dsk = resolve_aliases(dsk, keys, dependents)
dependencies = dict(DependenciesMapping(dsk))
logger.debug("Removing aliases. %i left", len(dsk))
Member Author:

This is new... it removes all those

"key": "other-key"

references that we currently schedule as real tasks. Particularly fusion adds these kinds of redirects. I should probably factor this out to a dedicated PR. The impact can be quite substantial in graph size reduction.
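A toy before/after of the kind of redirect being removed (illustrative only; this is not the actual resolve_aliases implementation, which also handles culling and requested output keys):

from operator import add

# Linear fusion often leaves entries whose "task" is just another key:
dsk_before = {
    "a": (add, 1, 2),
    "b": "a",             # alias: scheduling "b" only forwards "a"'s result
    "c": (add, "b", 10),  # consumer of the alias
}

# After resolving aliases, the consumer points at the real task and the alias
# disappears (unless "b" itself were one of the requested output keys):
dsk_after = {
    "a": (add, 1, 2),
    "c": (add, "a", 10),
}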

@fjetter marked this pull request as ready for review on August 23, 2024 at 17:31
@fjetter force-pushed the support_task_spec branch from 919ff4f to 9974bb0 on August 26, 2024 at 11:56
@hendrikmakait self-requested a review on August 26, 2024 at 12:50
Comment on lines +9413 to +9460
# FIXME: There should be no need to fully materialize and copy this but some
# sections in the scheduler are mutating it.
dependencies = {k: set(v) for k, v in DependenciesMapping(dsk3).items()}
return dsk3, dependencies, annotations_by_type
Member Author:

see #8842

@fjetter (Member Author) left a comment:

I recommend reviewers start with the dask/dask PR.

This PR does not include (m)any intentional changes other than adjusting the code to the new spec. There are one or two things that change behavior; I flagged them explicitly.

Comment on lines +3189 to +3192
+ prefix = ts.prefix
+ duration: float = prefix.duration_average
  if duration >= 0:
      return duration

- s = self.unknown_durations.get(ts.prefix.name)
+ s = self.unknown_durations.get(prefix.name)
  if s is None:
-     self.unknown_durations[ts.prefix.name] = s = set()
+     self.unknown_durations[prefix.name] = s = set()
Member Author:

this is unrelated (but I won't create another PR for these three lines)

processes=False,
asynchronous=True,
scheduler_sync_interval="1ms",
dashboard_address=":0",
Member Author:

These dashboard changes are also unrelated, apologies. If it actually helps, I will factor them out, but those tests are typically disjoint from actual changes, so I hope the review process is not too difficult.

This change allows the tests to run in parallel

Comment on lines +9405 to +9453
# This is removing weird references like "x-foo": "foo" which often make up
# a substantial part of the graph
# This also performs culling!
dsk3 = resolve_aliases(dsk2, keys, dependents)

Member Author:

This is an actual change in behavior! This will reduce graph sizes substantially for graphs that went through linear fusion.

distributed/client.py (review comment resolved, outdated)
distributed/tests/test_client.py (review comment resolved)
@@ -8402,6 +8237,9 @@ async def test_release_persisted_collection(c, s, a, b):
await c.compute(arr)


@pytest.mark.skip(
    reason="Deadlocks likely related to future serialization and ref counting"
Member:

Should we add an issue for this?

Member Author:

that should've been fixed by #8827

I'll remove the skip

distributed/scheduler.py (two review comments resolved, outdated)
            metrics.update(
                {
                    "start": start,
                    "duration_materialization": materialization_done - start,
Member:

nit: I suggest that we start following Prometheus variable naming conventions here to make our lives easier in the future.

Member Author:

Can you suggest the appropriate names? I'm not sure what the correct way is.

Member:

Suggested change
-     "duration_materialization": materialization_done - start,
+     "materialization_duration_seconds": materialization_done - start,


"stimulus_id": stimulus_id,
"status": "error",
}
self.log_event(["all", client, "update-graph"], evt_msg)
Member:

Is there a particular reason you prefer a dedicated update-graph topic instead of something like a scheduler topic?

Member Author:

No particular reason. I used this in a test, but I will change it. I will also drop the all topic (it feels a bit like an anti-pattern).

Member:

Note that dropping the all topic is a user-facing breaking change. That being said, I'm all for redesigning our topics, etc.; this might just require some changes for downstream users, e.g., for Coiled.

Member:

Then again, we've also renamed the action, so these changes are breaking already.

Member Author:

Yeah, I think this is breaking and I'm fine with it. This is overall a pretty buried feature and I doubt (m)any users will notice.

distributed/worker.py (review comment resolved, outdated)
@fjetter (Member Author) commented Oct 15, 2024

See also dask/dask#11431.

@fjetter mentioned this pull request on Oct 16, 2024
@fjetter (Member Author) commented Oct 16, 2024

The mindeps builds are sad. Everything else seems unrelated

Comment on lines 4624 to 4644
        seen: set[Key] = set()
        sadd = seen.add
        for k in list(keys):
            work = {k}
            wpop = work.pop
            wupdate = work.update
            while work:
                d = wpop()
                if d in seen:
                    continue
                sadd(d)
                if d not in dsk:
                    if d not in self.tasks:
                        lost_keys.add(d)
                        lost_keys.add(k)
                        logger.info("User asked for computation on lost data, %s", k)
                        dependencies.pop(d, None)
                        keys.discard(k)
                    continue
                wupdate(dsk[d].dependencies)
        return lost_keys
Member Author:

I rewrote this section. I had issues with it and just barely understand the old code (and ran into multiple bugs in the recent past)

@hendrikmakait (Member) left a comment:

The code generally looks good to me. The CI test results are confusing; it seems like they're out of sync with the actual test jobs?

I've added a bunch of suggestions for Prometheus-style metric naming within update_graph.

continuous_integration/environment-mindeps.yaml (review comment resolved, outdated)
distributed/shuffle/_rechunk.py (review comment resolved)
distributed/tests/test_utils_comm.py (review comment resolved, outdated)
distributed/scheduler.py (review comment resolved, outdated)
Comment on lines 4739 to 4740
"new-tasks": len(new_tasks),
"key-collisions": colliding_task_count,
Member:

Prometheus naming prefers underscores over hyphens

Suggested change
-     "new-tasks": len(new_tasks),
-     "key-collisions": colliding_task_count,
+     "new_tasks": len(new_tasks),
+     "key_collisions": colliding_task_count,


Comment on lines 4970 to 4972
"duration_ordering": materialization_done - ordering_done,
"duration_state_initialization": ordering_done - task_state_created,
"duration_total": task_state_created - start,
Member:

Prometheus naming convention (roughly; the _total suffix is reserved for accumulating counters):

Suggested change
-     "duration_ordering": materialization_done - ordering_done,
-     "duration_state_initialization": ordering_done - task_state_created,
-     "duration_total": task_state_created - start,
+     "ordering_duration_seconds": materialization_done - ordering_done,
+     "state_initialization_duration_seconds": ordering_done - task_state_created,
+     "duration_seconds": task_state_created - start,

(I don't have a great suggestion for the e2e duration)

            task_state_created = time()
            metrics.update(
                {
                    "start": start,
Member:

Suggested change
-     "start": start,
+     "start_timestamp_seconds": start,

@hendrikmakait (Member) commented:

Update: test_merge failures seem systemic (and thus related).

@fjetter (Member Author) commented Oct 18, 2024

> Update: test_merge failures seem systemic (and thus related).

I've been trying to reproduce but without any luck so far

@fjetter (Member Author) commented Oct 18, 2024

I think the test_merge failures are actually unrelated. The exception is

  
        def validate_data(self, data: pd.DataFrame) -> None:
>           if set(data.columns) != set(self.meta.columns):
E           AttributeError: 'tuple' object has no attribute 'columns'

which indicates that data is likely a key instead of the data... I've seen this before and I think this is a dask-expr problem. I added another verification step here to confirm this. I'll keep digging

@fjetter (Member Author) commented Oct 18, 2024

Yeah, so the exception is pretty much what I expected:

            if not isinstance(data, pd.DataFrame):
>               raise TypeError(f"Expected {data=} to be a DataFrame, got {type(data)}.")
E               TypeError: Expected data=('assign-3d7cfa7cea412465799bea6cfac1b512', 1) to be a DataFrame, got <class 'tuple'>.

@fjetter (Member Author) commented Oct 18, 2024

Ah, this is the build with dask-expr disabled. Now I can reproduce!

  shuffle_transfer,
- (self.name_input, i),
+ TaskRef((self.name_input, i)),
Member Author:

The test_merge tests w/ dask-expr enabled never take this code path. That's interesting but not incredibly surprising.

@fjetter (Member Author) commented Oct 18, 2024

The distributed/shuffle/tests/test_graph.py::test_multiple_linear failure is also related; it's also a legacy-only problem.

@fjetter (Member Author) commented Oct 18, 2024

dask/dask#11445 is hopefully the last one

@hendrikmakait (Member) commented:

Is there anything left to do here?

Successfully merging this pull request may close these issues.

Abandon encoded tuples as task definition in dsk graphs
3 participants