Dask improvements #208

Closed
rosepearson opened this issue Oct 11, 2023 · 13 comments · Fixed by #224

@rosepearson (Owner)

This is an issue for an upcoming NeSI consultancy with @jennan. The focus will be on improving the performance and stability of GeoFabrics for larger scale problems.

The focus is on making better use of Dask throughout the GeoFabrics stages. Two identified areas are:

  • RasterArray.interpolate_na for 2D interpolation to remove NaN values (see the sketch after this list)
  • RasterArray.clip in series for identifying the lowest value along a waterway
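
For reference, a minimal sketch of the interpolate_na call in question, assuming RasterArray here is rioxarray's .rio accessor class; the file name and variable name are placeholders:

```python
import rioxarray  # noqa: F401 - registers the .rio accessor (RasterArray)
import xarray as xr

# Placeholder inputs: "dem.nc" and the variable name "z" are illustrative only.
dem = xr.open_dataset("dem.nc")["z"]

# interpolate_na fills NaN cells by 2D interpolation; under the hood it uses
# scipy.interpolate.griddata, so it pulls the whole array into memory - one
# reason it can become a pinch point at larger scales.
filled = dem.rio.interpolate_na(method="linear")
```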

RasterArray.interpolate_na
Profiling has shown that the pinch points are quite different for larger scale problems than for smaller ones. Compare the two profiles below.

  1. A 6 min problem with all geofabric stages (small_2m_res.html).
    [profile screenshot]

  2. A 4 hr problem with all geofabric stages. The only difference from 1 is that it is 1 m instead of 2 m resolution (small_1m_res.html).
    [profile screenshot]

RasterArray.clip
Another area of focus (although it hasn't shown up as an issue in the 1 m profiling, it is visible in the 2 m) is shown below:
[profile screenshot]
This could make use of either pandas or dask-geopandas. A weak attempt that I didn't get off the ground can be seen as a comment in processor.py:
[screenshot]

Also worth noting that we may be able to do this more directly using a rolling().min() call on the xarray with an appropriately sized window; a sketch follows.
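
A minimal sketch of that idea, with a toy array standing in for the clipped raster; the 5x5 window size is a placeholder:

```python
import numpy as np
import xarray as xr

# Toy DEM; in practice this would be the raster along the waterway.
dem_array = xr.DataArray(np.random.rand(100, 100), dims=("y", "x"))

# Lowest value within a moving window. The window size would need tuning
# to the waterway width in cells.
lowest = dem_array.rolling(x=5, y=5, center=True, min_periods=1).min()
```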

@rosepearson (Owner)

Have also noticed some unexpected failures with Dask TimeOut/Heartbeat errors. In some instances this is actually after the final chunk appears to have been written to file.

@rosepearson (Owner)

job.err.txt
Attached Cylc job.err file illustrating the Dask TimeOut/Heartbeat errors.

@jennan (Collaborator) commented Oct 13, 2023

@rosepearson is it making the whole code crash? Dask can be a bit verbose with errors during the teardown of the Dask cluster... although in my experience this doesn't cause any real issue (apart from the logs reporting errors).

@rosepearson (Owner)

@jennan it is making the whole Cylc task crash. I have another one below where we hit the time limit - but again the file had been completely written out. It could be a coincidence that the file just finished writing before the time limit was reached and it didn't have time to execute the print "Job Succeeded"... but it seems to me like there might be something slightly fishy. Just attaching as a reference for now - not expecting any action on this ticket right now :)

@rosepearson (Owner)

job.err.txt
Attached another Cylc job.err file. Again, lots of verbose task errors. The task ultimately failed after hitting the time limit - however, it had written out a complete file prior to that.

@rosepearson (Owner)

Noting another repeated error that occurs for a particular roughness run. It repeatedly fails when launched on Maui through Cylc, but completes and produces the expected output, i.e. it appears to be a false failure. I have checked running on the NIWA Maui nodes without SLURM (accessed through https://jupyter.maui.niwa.co.nz/) and it runs fine there. This might be a good first place to look regarding seemingly random errors after the job has run successfully.
[screenshot]
job.err.txt

Tile 2728, roughness stage, takes ~15 min.

@rosepearson (Owner) commented Oct 27, 2023

One more note about the failures I'm experiencing: they are often related to "Sending large graph of size XX.XX MiB" warnings.

One is copied below. I am wondering if we should restructure how we deal with upsampling the coarse DEM to do that directly, instead of breaking it into chunks and doing each chunk individually. This would mean far fewer explicit dask delayed calls and would leave it up to Dask exactly how it manages the compute load. It would also mean the upsampling is all done with linear interpolation - but that seems sensible anyway.

/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/client.py:3160: UserWarning: Sending large graph of size 12.85 MiB.
This may cause some slowdown.
Consider scattering data ahead of time and using futures.
  warnings.warn(
2023-10-14 11:35:39,162 - distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/protocol/core.py", line 109, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/msgpack/__init__.py", line 38, in packb
    return Packer(**kwargs).pack(o)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 272, in msgpack._cmsgpack.Packer._pack
ValueError: memoryview is too large
2023-10-14 11:35:39,163 - distributed.comm.utils - ERROR - memoryview is too large
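
For reference, a minimal sketch of what the warning suggests (scattering data ahead of time and using futures), with placeholder names and a toy per-chunk function:

```python
import numpy as np
from distributed import Client

client = Client()  # assumes an appropriately configured cluster

# Ship the large array to the workers once, rather than embedding it in
# every task's graph (which is what inflates the graph size).
coarse = np.random.rand(4000, 4000)  # stand-in for the coarse DEM
[coarse_future] = client.scatter([coarse], broadcast=True)

def lowest_in_rows(coarse_dem, start, stop):
    # Placeholder per-chunk work on a slice of the shared array.
    return coarse_dem[start:stop].min()

futures = [
    client.submit(lowest_in_rows, coarse_future, i, i + 500)
    for i in range(0, 4000, 500)
]
results = client.gather(futures)
client.close()
```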

@rosepearson (Owner) commented Nov 1, 2023

SUMMARY

Looking back through my notes/comments:

  • Tile 2728, roughness stage: taking ~15 min then failing even though the file completed.
  • Tile 2627, roughness stage: failed with a memory error. See the high transfer load in the comment below.
  • A ValueError: memoryview is too large error occurs in the waterway stage for tiles 3128, 3130, 3131, 3532.
  • A ValueError: 3615135876 exceeds max_bin_len(2147483647) error occurs in the waterway stage for 3431.

I've also been thinking about how I break up the coarse DEM stage into a bunch of explicit chunks, and wonder if that would be best left up to Dask.

  • Previously: load the same coarse .tif tile many times, clipping it to a very small size each time and interpolating points pulled from the xarray.
  • I could/should instead load with chunking set to true and use the xarray interp function to upsample with linear interpolation. I then have a raster mask I would use to decide where to replace the DEM with these new values using xarray.where() - see the sketch below.
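
A minimal sketch of that approach, with toy arrays standing in for the real coarse DEM, fine DEM, and raster mask:

```python
import numpy as np
import xarray as xr

# Toy coarse DEM (in memory) and fine DEM (chunked, full of NaN).
coarse = xr.DataArray(
    np.random.rand(50, 50),
    dims=("y", "x"),
    coords={"y": np.linspace(0, 400, 50), "x": np.linspace(0, 400, 50)},
)
fine = xr.DataArray(
    np.full((400, 400), np.nan),
    dims=("y", "x"),
    coords={"y": np.linspace(0, 400, 400), "x": np.linspace(0, 400, 400)},
).chunk({"x": 100, "y": 100})

# Upsample the coarse DEM onto the fine grid in one interp call, leaving
# the compute scheduling to Dask instead of clipping chunk by chunk.
upsampled = coarse.interp(x=fine.x, y=fine.y, method="linear")

# Replace fine-DEM values only where the mask says to (here: wherever NaN).
dem = xr.where(fine.isnull(), upsampled, fine)
```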

Old relevant issues

@rosepearson rosepearson linked a pull request Nov 2, 2023 that will close this issue
@rosepearson (Owner)

Tile 2627 roughness
[dashboard screenshots]

@rosepearson (Owner) commented Nov 6, 2023

Update after Consultancy 1

Most recent commit associated with this comment: ae30061.
Have made some of the agreed changes:

  • Save out a LiDAR-only temp netCDF
  • Optionally save the no_data_mask to the netCDF
  • Load in the coarse DEM without chunking

I've been looking into xarray.interp with chunking. I've come across the following cases:

  1. Closed - Feature request: Implement interp for interpolating between chunks of data (dask) - pydata/xarray#4078
  2. Closed - Implement interp for interpolating between chunks of data (dask) - pydata/xarray#4155
  3. Open - dask implementation of np.interp - dask/dask#6474

I've also tried various ways to force/encourage dask in the interp call:

  • coarse_dem = coarse_dem.interp(x=dask.array.from_array(self._dem.x, 1000), y=dask.array.from_array(self._dem.y), method="linear")

    • Error - in _validate_interp_indexers raise TypeError(type(v))
  • coarse_dem.chunk(1000).interp(x=self._dem.x, y=self._dem.y, method="linear")

    • No error, but the result has only one chunk. Get memory errors... but it runs for a bit before failing.
      [dashboard screenshots]
  • I tried with both the normal chunk size and also one 8x smaller, given the coarse resolution is 8x bigger.

  • It started saving out a file, but did not succeed in completely writing the file.

Questions

  • Is xarray.interp implemented with Dask support? Judging by the dashboard screen captures, it seems to run across cores.
  • Should we chunk the coarse_dem after all, as chunking the x/y dims on their own doesn't seem to work? See the sketch below.
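
A small experiment relevant to that question: since pydata/xarray#4155, interp can run lazily when the source array is chunked, while the destination coordinates stay as plain arrays. Sizes below are placeholders:

```python
import numpy as np
import xarray as xr

coarse = xr.DataArray(
    np.random.rand(800, 800),
    dims=("y", "x"),
    coords={"y": np.arange(800.0), "x": np.arange(800.0)},
).chunk({"x": 200, "y": 200})

fine_x = np.linspace(0.0, 799.0, 6400)
fine_y = np.linspace(0.0, 799.0, 6400)

upsampled = coarse.interp(x=fine_x, y=fine_y, method="linear")

# The result is lazy, but the chunk layout along interpolated dims depends
# on the xarray version - older versions collapse them into a single chunk,
# consistent with the "result has only one chunk" observation above.
print(upsampled.chunks)
```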

@rosepearson (Owner) commented Nov 8, 2023

@jennan thanks for your notebook. I've implemented it with a few minor tweaks to deal with the standard map layout (yx with y decreasing). There is also a seemingly odd issue where dask.array.map_blocks needs the second array to have a chunk of its length or smaller - odd, as the first array is automatically given a chunk of its size if the specified chunk size is greater. A toy version of the pattern is sketched below the screenshot.
[screenshot]
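
A toy version of the map_blocks pattern (the real notebook code may differ); passing the known samples as plain NumPy arrays sidesteps the chunk-alignment quirk, since only the new locations are chunked:

```python
import dask.array as da
import numpy as np

xp = np.linspace(0.0, 1.0, 101)  # known sample locations
fp = np.sin(xp)                  # known sample values

# Only the query locations are a dask array; xp/fp are passed whole to
# every block, avoiding any second-array chunk-size constraint.
new_x = da.linspace(0.0, 1.0, 10_000, chunks=1_000)

result = da.map_blocks(np.interp, new_x, xp, fp, dtype=fp.dtype)
print(result.compute()[:5])
```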

No action required - just an update

Test profiling for tests/test_dem_generation_westport_4/test_dem_generation_westport_4.py
Top: chunking to 10 - gives 4x5 chunks (1m54s)
[profile screenshot]
Bottom: chunking to 300 - gives 1x1 chunks (1m15s)
[profile screenshot]

Testing on a previously failing larger example.

It ran successfully and produced the netCDF file, which is new. It did fail with a TimeOut error shortly afterwards.

2023-11-10 02:59:37,360 - distributed.nanny - WARNING - Worker process still alive after 3.199999694824219 seconds, killing
2023-11-10 02:59:38,137 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x2aad57c94d90>>, <Task finished name='Task-180509187' coro=<SpecCluster._correct_state_internal() done, defined at /nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/deploy/spec.py:346> exception=TimeoutError()>)
Traceback (most recent call last):
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/utils.py", line 1922, in wait_for
    return await fut
           ^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/tornado/ioloop.py", line 738, in _run_callback
    ret = callback()
          ^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/tornado/ioloop.py", line 762, in _discard_future_result
    future.result()
TimeoutError
Traceback (most recent call last):
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/utils.py", line 1922, in wait_for
    return await fut
           ^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/__main__.py", line 53, in <module>
    cli_run_from_file()
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/__main__.py", line 47, in cli_run_from_file
    runner.from_instructions_file(instructions_path=args.instructions)
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/runner.py", line 211, in from_instructions_file
    from_instructions_dict(instructions=instructions)
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/runner.py", line 153, in from_instructions_dict
    run_processor_class(
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/runner.py", line 48, in run_processor_class
    runner.run()
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/processor.py", line 2999, in run
    dem = self.create_dem(waterways=waterways)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/processor.py", line 2861, in create_dem
    runner.run()
  File "/scale_wlg_persistent/filesets/project/niwa03440/geofabrics/GeoFabrics/src/geofabrics/processor.py", line 910, in run
    with cluster, distributed.Client(cluster) as client:
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/deploy/cluster.py", line 540, in __exit__
    aw = self.close()
         ^^^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/deploy/spec.py", line 293, in close
    aw = super().close(timeout)
         ^^^^^^^^^^^^^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/deploy/cluster.py", line 226, in close
    return self.sync(self._close, callback_timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/utils.py", line 359, in sync
    return sync(
           ^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/utils.py", line 426, in sync
    raise exc.with_traceback(tb)
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/utils.py", line 399, in f
    result = yield future
             ^^^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
            ^^^^^^^^^^^^^^^
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/deploy/spec.py", line 446, in _close
    await self._correct_state()
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/deploy/spec.py", line 359, in _correct_state_internal
    await asyncio.gather(*tasks)
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/nanny.py", line 595, in close
    await self.kill(timeout=timeout, reason=reason)
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/nanny.py", line 380, in kill
    await self.process.kill(reason=reason, timeout=0.8 * (deadline - time()))
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/nanny.py", line 843, in kill
    await process.join(max(0, deadline - time()))
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/process.py", line 330, in join
    await wait_for(asyncio.shield(self._exit_future), timeout)
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/site-packages/distributed/utils.py", line 1921, in wait_for
    async with asyncio.timeout(timeout):
  File "/nesi/project/niwa03440/conda/envs/geofabrics/lib/python3.11/asyncio/timeouts.py", line 111, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

Dashboard during LiDAR write:
[dashboard screenshot]
Dashboard at end of execution:
[dashboard screenshot]

@rosepearson rosepearson removed a link to a pull request Nov 12, 2023
@rosepearson (Owner) commented Nov 13, 2023

Update from Consultancy 2

  • Pulled changes from the first consultancy into main - see Issue 219
  • Running the Cylc suite over ~40 tiles (of 425)
  • Recorded success/failure and run time for successful executions, compared to last time:
    timing_comparison.xlsx

Identified test cases

  1. Time-limit/large graph size - waterways 2827, 3432
    i. I'll rerun this without a time limit and see how long it takes, or whether we get a different error
  2. Failure to close the Dask cluster after completing the file write - waterways 2113, 2423, 2728, 3232; lidar 2828
    i. Example error log at: /home/pearsonra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id3232/01/job.err

Separate bugfix for regression errors

  • Issue 220
  • Fixes lidar 2112
  • Fixes tiles 1514, 1614, 1615, 2827, 3027, 3331

Next consultancy

  • Would be great to discuss how to address the failure-at-client-closure errors
  • 45 min test case ending in a client close error: /nesi/project/niwa03440/Cylc-Workflow-Outputs/geofabrics-tiles/tiles/2828/instruction_lidar.json
  • @jennan let me know if there is anything related to this I should prepare prior to Monday

Error logs

  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/lidar_tile_id2828/01/job.err (45 min run to crash)
  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id3532/01/job.err
  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id3232/01/job.err
  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id3138/01/job.err
  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id2728/01/job.err
  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id2423/01/job.err
  • File write completed, Dask timeout - /home/personra/cylc-run/cylc-geofabrics/run16/log/job/1/waterways_tile_id2113/01/job.err

@rosepearson (Owner) commented Nov 19, 2023

Consultancy 3

Tasks for Rose

  • Collect logs into a folder under niwa03440
  • Check if increasing the retry count helps - export DASK_DISTRIBUTED__COMM__RETRY__COUNT=3
  • Find where the Dask folder (dask-worker-space) is created - found in tmp/dask-worker-space
  • Update processor.py to nest contexts for the cluster and then the client. Tried - still an error.
  • Update processor.py cluster variable with dashboard_address: None. Tried - still an error.
  • Update processor.py cluster variable with "local_directory": "path_to_folder_with_results". This produces output in the folder - a global.lock and a purge.lock file are created and not deleted prior to the crash. A combined sketch follows this list.
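
A combined sketch of those cluster tweaks, assuming a distributed.LocalCluster-style setup; the path and worker count are placeholders:

```python
import distributed

# Nested contexts (cluster, then client), dashboard disabled, and an
# explicit local_directory for worker scratch files.
with distributed.LocalCluster(
    n_workers=4,
    dashboard_address=None,
    local_directory="/path/to/folder_with_results",
) as cluster:
    with distributed.Client(cluster) as client:
        ...  # run the GeoFabrics stage here
```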

@rosepearson rosepearson linked a pull request Nov 30, 2023 that will close this issue
@rosepearson rosepearson removed a link to a pull request Nov 30, 2023
@rosepearson rosepearson added this to the 1.1.6 milestone Jan 4, 2024
@rosepearson rosepearson linked a pull request Jan 4, 2024 that will close this issue