Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Dask resource #2811

Merged
merged 27 commits into from
Sep 18, 2020
Merged

Create Dask resource #2811

merged 27 commits into from
Sep 18, 2020

Conversation

kinghuang
Copy link
Contributor

@kinghuang kinghuang commented Aug 11, 2020

This PR creates a Dask resource, which manages a Dask client and optional cluster. The resource is to be used by solids and the Dask DataFrame type's materializer to compute Dask graphs.

The resource can be configured to connect to a pre-existing cluster by its scheduler address, or create a cluster on demand.

Here are example resource configs for typical use cases.

Connect to an existing Dask cluster via its scheduler:

resources:
  dask:
    config:
      client:
        name: my-dagster-pipeline
        address: tcp://dask-scheduler-here:8786

Create a local cluster with 4 workers, and 1 thread per worker:

The resource will create a Cluster object, and pass it as the address option to the client.

resources:
  dask:
    config:
      client:
        name: my-dagster-pipeline
      cluster:
        local:
          n_workers: 4
          threads_per_worker: 1

The DataFrame type does not require a Dask resource. But, if a resource is provided under the dask key, the type's materializer (if configured) will run with the resource's client as the current client. Otherwise, the global client and scheduler will apply.

Related to the above, the compute field is removed from all the materialization options. Providing a way to not compute a Dask DataFrame materialization does not make sense, as the future is never returned in the pipeline and cannot be computed at a later time. Materializations must be computed if specified.

Solids using the Dask resource for computation should consider making the resource client the current client by using the as_current() resource manager.

@solid(…)
def some_solid(context):
  with context.resources.dask.client.as_current():
    …

This PR carries the commits from #2821, and is meant to be merged after it.

@alangenfeld
Copy link
Member

can you add some basic tests to make sure this doesn't get broken over time?

@alangenfeld
Copy link
Member

other than that I think this is fine, we just need to settle out the schema changes on the PR this depends on

@kinghuang kinghuang marked this pull request as draft August 16, 2020 19:00
@kinghuang kinghuang force-pushed the dask-resource branch 2 times, most recently from 1249860 to aa75183 Compare September 1, 2020 22:06
@kinghuang
Copy link
Contributor Author

Added a couple of tests for local clusters created via the resource.

I've also changed the implementation to not require a Dask resource on the DataFrame type so that it is possible to simply run with Dask's global config (as before). And, I'm thinking of making set_as_default an option instead of forcing it to be False.

@kinghuang kinghuang marked this pull request as ready for review September 7, 2020 18:48
@kinghuang kinghuang marked this pull request as draft September 7, 2020 19:48
@kinghuang kinghuang marked this pull request as ready for review September 7, 2020 21:46
@kinghuang
Copy link
Contributor Author

@alangenfeld This is ready for review. Thanks!

@kinghuang
Copy link
Contributor Author

kinghuang commented Sep 8, 2020

Found an error with Client creation. Putting this back on draft for now.

@kinghuang kinghuang marked this pull request as draft September 8, 2020 17:54
@kinghuang kinghuang marked this pull request as ready for review September 8, 2020 18:56
@kinghuang
Copy link
Contributor Author

Updated. The code formatting issues identified by Black in dagster_dask/data_frame.py have been fixed in #2888.

@@ -288,8 +287,7 @@
"options": {
"path": (Any, True, "Path to a target filename."),
"key": (String, True, "Datapath within the files."),
"compute": (Bool, False, "Whether or not to execute immediately."),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you mean to remove this? just concerned about breaking changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all the compute options have been removed, because they don't make sense in the context of materialization configs. Setting compute=False will cause Dask to return a future, which in the materialization flow will never be computed. This would result in AssetMaterialization objects being yielded for assets that don't actually exist.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a note to Changes.md at the repo root - something like

## 0.9.6 (Upcoming)
**Breaking Changes**
* [dagster-dask] removed the `compute` options key which would result in un-executed futures if used 

but add some more context on where that key is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a note to the changelog.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rebased and updated for post-0.9.6.

raise ValueError(f"Unknown cluster type “{cluster_type}”.")

# Import the cluster module by name, and get the cluster Class.
cluster_module = import_module(cluster_meta["module"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a bit concerned about the late import here, and how missing dependency failures will manifest - but I can't come up with a solution that really solves or that. This same problem exists in the executor as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few ideas:

  1. Each cluster type's field description could mention their required module.
  2. Is there a hook for validating resource configs? That might be a good time to check if a module exists.
  3. The dagster_dask module could declare extra_requires for the modules. Though, I'm not sure if this is something this module wants to take on, considering Dask's distributed module doesn't already do it, and it might not be obvious to users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an aside, I'm aware that the resource and executor will have separate implementations for configuring and creating Dask clients and clusters with this PR. I'm thinking of following up with another PR later to bring them together under a common implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added the module name to each cluster config's description. For example, the kube type's description now says:

Kubernetes cluster config. Requires dask_kubernetes.

@alangenfeld
Copy link
Member

Great, those further tweaks all sound great. Will get this merged once I get a clean buildkite run.

An aside - not sure if you caught this yet, but you can run make black and make isort at the repo root to catch formatting issues. Also make pylint but that is very very slow, so maybe just reference the cli args and point at the files you changed.

@kinghuang
Copy link
Contributor Author

Ah, I've been running those manually, and obviously getting it wrong compared to the automated checks. I'll give that a try!

Create a Dask resource to represent a Dask client and optional cluster.
The resource may create a client that connects to an existing cluster
or create a new cluster, depending on the resource config.
Providing a way to not compute a Dask DataFrame materialization does
not make sense, as the future is never returned and cannot be computed
at a later time. Materializations must be computed if specified.
Require a dask resource to be specified to use a Dask DataFrame.
Materializations will be performed using the client in the resource
instead of whatever the default client happens to be.
The passthrough_df solid simplify returns the input dataframe.
The dask_pipeline pipeline simply calls the passthrough_df solid. It is
configured with a dask_resource.
Run a pipeline with a custom Dask local cluster config and test that
the client’s scheduler has the configured numbered of workers and
threads per worker.
The scheduler config is really the client config, and is not exclusive
to specifying a cluster config. Rename scheduler to client, and also
add the missing options from the Client initializer.
Reduce the resource requirements to run the unit tests on the Dask
resource.
Directly use Dagster config types instead of inferring from native
Python types
Explicitly set the Dask resource's client as the current client using a
context manager, then test using the current client.
While the distributed module is technically the same as
dask.distributed, the latter is the canonical way of accessing it.
Only one cluster type may be configured. It is not valid to create
multiple clusters for a client.
If no cluster configuration is provided, set _cluster to None.
Use the cluster property to check and get the cluster object, instead
of directly accessing the underlying attribute.
@kinghuang
Copy link
Contributor Author

@alangenfeld Do I need some sort of config files to run black and isort? I just rebased and ran make black on the project root and it reformatted 348 files across the project. Similar for make isort.

@kinghuang
Copy link
Contributor Author

I've discarded all the changes outside of dagster_dask and pushed an update in the meantime.

@alangenfeld
Copy link
Member

ah shoot, ya you can see what versions of black and isort we are set to here https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dev-requirements.txt should have mentioned that

@kinghuang
Copy link
Contributor Author

Ah, much better. Unfortunately, it's undone some of the changes the other run of black applied. Updated! 😅

@alangenfeld alangenfeld merged commit bb63b6c into dagster-io:master Sep 18, 2020
@kinghuang kinghuang deleted the dask-resource branch September 18, 2020 18:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants