INV: Real-time resource recording, usage and feedback mechanism #5

Open

cpelley opened this issue Feb 23, 2024 · 1 comment

cpelley commented Feb 23, 2024

Dask has the means to manage resources at run-time and autoscale. However, it is unknown how effective and efficient this will be in a real-world scenario with large workflows. To that end, we have a fall-back plan which intends to instruct Dask accurately with the memory usage of each processing step, while maintaining total separation from the configuration/recipe it is executing.

The new framework will provide a means to instruct the Python graph of each step's resource requirement (memory footprint), and also to record each step's footprint during execution via the logging and monitoring capability (#4) to a database or similar, enabling more accurate estimates for execution in the next cycle. That is, a feedback mechanism which allows memory footprint requirements to evolve to reflect changing circumstances based on the weather (see the sketch after the list below).

  • Get an estimate of each plugin's memory footprint.
  • Utilise the logging and monitoring framework to record the memory footprint, enabling more accurate future estimations.
  • Instruct the dask graph of each node's memory footprint.
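
As a minimal sketch of the feedback loop, something like the following could back the recording and estimation steps. The FootprintStore name, schema and API here are assumptions for illustration, not the project's actual design:

import sqlite3


class FootprintStore:
    """Hypothetical SQLite-backed store for observed plugin footprints."""

    def __init__(self, path="footprints.db"):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS footprint ("
            "plugin TEXT, nbytes INTEGER, "
            "recorded TEXT DEFAULT CURRENT_TIMESTAMP)"
        )

    def record(self, plugin, nbytes):
        # Called via the logging and monitoring hooks (#4) after each step.
        self._conn.execute(
            "INSERT INTO footprint (plugin, nbytes) VALUES (?, ?)",
            (plugin, nbytes),
        )
        self._conn.commit()

    def estimate(self, plugin, default=0):
        # Use the most recent observation as next cycle's estimate.
        row = self._conn.execute(
            "SELECT nbytes FROM footprint WHERE plugin = ? "
            "ORDER BY recorded DESC LIMIT 1",
            (plugin,),
        ).fetchone()
        return row[0] if row else default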

sizeof

Register a custom sizeof, for example, for cubes:

import sys

import dask.array as da
from dask.sizeof import sizeof
from iris.cube import Cube


@sizeof.register(Cube)
def sizeof_cube(cube):
    # Report the size of the data payload rather than the small wrapper object.
    return cube.lazy_data().nbytes


# e.g. a cube wrapping a lazy 1000 x 1000 float64 array (8,000,000 bytes):
cube = Cube(da.zeros((1000, 1000), dtype="float64"))
print(sys.getsizeof(cube), sizeof(cube))
# 56 8000000

Providing an estimate of the operation's (plugin) footprint

Ideally, Dask will handle this without too much trouble via spill-over to disk; however, it might not be effective at this.
If Dask turns out not to be effective enough, we can look at passing a dummy object to a task that represents the expected memory consumption of the operation to be performed.

Using a "dummy" parameter to represent the anticipated memory usage of an operation will influence Dask's scheduling decisions indirectly. This method would involve attaching a proxy or placeholder object that simulates the memory footprint required by the operation and registering a custom sizeof function for that object.

from dask.sizeof import sizeof


class MemoryProxy:
    """Placeholder whose reported size stands in for an operation's footprint."""

    def __init__(self, estimated_size):
        self.estimated_size = estimated_size


@sizeof.register(MemoryProxy)
def sizeof_memory_proxy(proxy):
    return proxy.estimated_size
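
One way the proxy could be attached (an assumption about the wiring, not a confirmed mechanism, reusing MemoryProxy from above) is to return it alongside the real result, since Dask applies sizeof to task outputs held in worker memory and sizeof of a tuple sums its members:

import dask


def with_footprint(func, estimated_size):
    # Pair the plugin's output with a MemoryProxy so the stored task result
    # "weighs" at least estimated_size in the scheduler's accounting.
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs), MemoryProxy(estimated_size)

    return wrapper


task = dask.delayed(with_footprint(lambda x: x * 2, 8_000_000))(21)
result, _proxy = task.compute()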

However, it is not certain whether this might negatively impact Dask's ability to autoscale with spill-over. That is, perhaps this would only be appropriate if estimates are accurate, meaning there is no spill-over to disk.

One possible partial mitigation is simply stopping Dask from being able to spill the proxy object to disk:

class MemoryProxy:
    def __init__(self, estimated_size):
        self.estimated_size = estimated_size

    # Prevent serialization, thus making it non-spillable
    def __reduce__(self):
        raise TypeError("MemoryProxy objects should not be serialized or spilled to disk.")

However, Dask will still treat the memory represented by the MemoryProxy object as consumed.

Alternative avenue

It appears that dask delayed (dask.delayed) objects can have set_resources(memory=...) called on them.
Could we do something like this:

tasks = [
    obj.set_resources(memory=req)
    for obj, req in zip(delayed_objects, memory_requirements)
]
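
For comparison, dask.distributed also documents resource annotations on tasks; this is a sketch of that route, assuming workers are started with a matching --resources flag (e.g. dask-worker <scheduler> --resources "MEMORY=16e9"):

import dask
from dask import delayed

# Resource annotations are only honoured by the distributed scheduler, and
# only on workers advertising matching resources.
with dask.annotate(resources={"MEMORY": 8e6}):
    task = delayed(lambda x: x * 2)(21)

# The scheduler will only place the task on workers advertising at least
# 8e6 units of "MEMORY".
result = task.compute()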

Issues

Dependencies

Background reading

cpelley commented Jul 15, 2024

Recording the memory footprint of plugin execution is handled by #5.
What remains from this issue is the feedback mechanism referenced above: reading from the SQLite database and wrapping execution in objects whose 'size' reflects their likely footprint.
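
A rough sketch of that remaining glue, reusing the hypothetical FootprintStore, MemoryProxy and with_footprint pieces from the description above (all names assumed, not settled design):

import dask


def sized_delayed(store, plugin_name, func, *args, default=1_000_000):
    # Read last cycle's observed footprint back from the SQLite store and
    # attach it to the task as its expected size.
    estimate = store.estimate(plugin_name, default=default)
    return dask.delayed(with_footprint(func, estimate))(*args)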
