Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Annotate Task* objects for Cythonization #4302

Merged
merged 31 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
6483235
Use `tsp` variable name for `TaskStreamPlugin`s
jakirkham Dec 7, 2020
68081a7
Use `dts` for iterated `TaskState` variables
jakirkham Dec 7, 2020
6440551
Create `list` from generator
jakirkham Dec 7, 2020
e9e8731
Use `-1` as `TaskState.nbytes` default
jakirkham Dec 7, 2020
ec36415
Assign `TaskState` instances to variables
jakirkham Dec 7, 2020
a949641
Annotate `TaskState` for Cythonization
jakirkham Dec 7, 2020
111f73b
Annotate all `TaskState` variables
jakirkham Dec 7, 2020
a85b85e
Use closure to access `TaskState.priority`
jakirkham Dec 7, 2020
3c25b24
Add `_` before all `TaskState` attributes
jakirkham Dec 7, 2020
78eb88e
Use `_` prefixed `TaskState` attributes throughout
jakirkham Dec 7, 2020
7f38dd6
Add Python-level `property`s for attributes
jakirkham Dec 7, 2020
0c35af0
Add some `property.setter`s
jakirkham Dec 7, 2020
af9461f
Drop recently added `TaskState.priority` closures
jakirkham Dec 7, 2020
e834d88
Swap assignment order with `TaskPrefix`
jakirkham Dec 7, 2020
be1957c
Assign `dask.config` query to a variable
jakirkham Dec 7, 2020
f5fdf67
Set `TaskPrefix.duration_average` to `-1`
jakirkham Dec 7, 2020
3b4b76e
Annotate `TaskPrefix` for Cythonization
jakirkham Dec 7, 2020
cabe00d
Annotate `TaskPrefix` constructor as well
jakirkham Dec 7, 2020
47d77b1
Annotate all `TaskPrefix` variables
jakirkham Dec 7, 2020
9ff0dbb
Add `_` before all `TaskPrefix` attributes
jakirkham Dec 7, 2020
9593fb0
Add Python-level `property`s for attributes
jakirkham Dec 7, 2020
5a76c8f
Use `Py_ssize_t` in `set_nbytes`
jakirkham Dec 7, 2020
5af80ca
Use `tg` for `TaskGroup` variables
jakirkham Dec 7, 2020
9802c86
Swap assignment order with `TaskGroup`
jakirkham Dec 7, 2020
53fd21e
Create `list` from generator
jakirkham Dec 7, 2020
1414ff5
Annotate `TaskGroup` for Cythonization
jakirkham Dec 7, 2020
9e2aa4f
Annotate `TaskGroup` constructor as well
jakirkham Dec 7, 2020
cc0dbb1
Set `TaskGroup.prefix` to `None` initially
jakirkham Dec 7, 2020
1f50eb2
Annotate all `TaskGroup` variables
jakirkham Dec 7, 2020
9336f2a
Add `_` before all `TaskGroup` attributes
jakirkham Dec 7, 2020
4aa4cbe
Add Python-level `property`s for attributes
jakirkham Dec 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions distributed/diagnostics/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ def __init__(self, scheduler):
prefix = ts.prefix.name
self.all[prefix].add(key)
self.state[ts.state][prefix].add(key)
if ts.nbytes is not None:
if ts.nbytes >= 0:
self.nbytes[prefix] += ts.nbytes

scheduler.add_plugin(self)
Expand All @@ -264,11 +264,11 @@ def transition(self, key, start, finish, *args, **kwargs):
except KeyError: # TODO: remove me once we have a new or clean state
pass

if start == "memory":
if start == "memory" and ts.nbytes >= 0:
# XXX why not respect DEFAULT_DATA_SIZE?
self.nbytes[prefix] -= ts.nbytes or 0
if finish == "memory":
self.nbytes[prefix] += ts.nbytes or 0
self.nbytes[prefix] -= ts.nbytes
if finish == "memory" and ts.nbytes >= 0:
self.nbytes[prefix] += ts.nbytes

if finish != "forgotten":
self.state[finish][prefix].add(key)
Expand Down Expand Up @@ -304,7 +304,7 @@ def __init__(self, scheduler):
self.create(key, k)
self.keys[k].add(key)
self.groups[k][ts.state] += 1
if ts.state == "memory" and ts.nbytes is not None:
if ts.state == "memory" and ts.nbytes >= 0:
self.nbytes[k] += ts.nbytes

scheduler.add_plugin(self)
Expand Down Expand Up @@ -347,9 +347,9 @@ def transition(self, key, start, finish, *args, **kwargs):
for dep in self.dependencies.pop(k):
self.dependents[key_split_group(dep)].remove(k)

if start == "memory" and ts.nbytes is not None:
if start == "memory" and ts.nbytes >= 0:
self.nbytes[k] -= ts.nbytes
if finish == "memory" and ts.nbytes is not None:
if finish == "memory" and ts.nbytes >= 0:
self.nbytes[k] += ts.nbytes

def restart(self, scheduler):
Expand Down
Loading