Failing: MUR SST #15

Closed
3 of 8 tasks
ranchodeluxe opened this issue Jan 4, 2024 · 16 comments
Labels: documentation (Improvements or additions to documentation)


@ranchodeluxe
Contributor

ranchodeluxe commented Jan 4, 2024

kerchunk: https://github.com/developmentseed/pangeo-forge-staging/tree/mursst-kerchunk

Runs on LocalDirectBakery using prune option

  • kerchunk runs on LocalDirectBakery using prune option

local-runner-config.py is at the bottom

pip install git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@main

EARTHDATA_USERNAME=blah EARTHDATA_PASSWORD="blah" EARTHDATA_PROTOCOL=https pangeo-forge-runner bake \
    --repo=https://github.com/developmentseed/pangeo-forge-staging \
    --ref="mursst-kerchunk" \
    -f local-runner-config.py \
    --feedstock-subdir="recipes/mursst" \
    --Bake.recipe_id=MUR-JPL-L4-GLOB-v4.1 --Bake.job_name=local_test \
    --prune
  • zarr with lazy input runs on LocalDirectBakery using prune option
pangeo-forge-runner bake \
  --repo=https://github.com/ranchodeluxe/mursst-example \
  --ref="main" \
  -f config.py

Runs on LocalDirectBakery for all timesteps

  • kerchunk on LocalDirectBakery for all timesteps

Same as above, without --prune

  • zarr with lazy input runs on LocalDirectBakery for all timesteps
pangeo-forge-runner bake \
  --repo=https://github.com/ranchodeluxe/mursst-example \
  --ref="main" \
  -f config.py

Runs on FlinkOperatorBakery for prune option

  • Kerchunk runs on FlinkOperatorBakery for prune option

This has not been attempted yet because it depends on #28 (for the protocol parameter) and #25 (for passing Earthdata username and password)

curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
-H "Authorization: token blahblah" \
https://api.github.com/repos/NASA-IMPACT/veda-pforge-job-runner/actions/workflows/job-runner.yaml/dispatches \
-d '{"ref":"main", "inputs":{"protocol": "s3", "repo":"https://github.com/developmentseed/pangeo-forge-staging","ref":"mursst-kerchunk","prune":"1"}}'
  • Zarr with lazy input runs on FlinkOperatorBakery for prune option
curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
-H "Authorization: token blahblah" \
https://api.github.com/repos/NASA-IMPACT/veda-pforge-job-runner/actions/workflows/job-runner.yaml/dispatches \
-d '{"ref":"main", "inputs":{"repo":"https://github.com/ranchodeluxe/mursst-example","ref":"main","prune":"1"}}'

Runs on FlinkOperatorBakery for all timesteps

  • kerchunk

Not yet tested

  • Zarr with lazy input runs on FlinkOperatorBakery for all timesteps
curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
-H "Authorization: token blahblah" \
https://api.github.com/repos/NASA-IMPACT/veda-pforge-job-runner/actions/workflows/job-runner.yaml/dispatches \
-d '{"ref":"main", "inputs":{"repo":"https://github.com/ranchodeluxe/mursst-example","ref":"main","prune":"0"}}'
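The three `curl` calls above all hit GitHub's workflow-dispatch endpoint with the same headers and differ only in `inputs`. As a hedged sketch, the same request can be built from Python with only the standard library. The function name is hypothetical, and the `GITHUB_TOKEN` environment variable is an assumption standing in for the `token blahblah` placeholder.

```python
import json
import os
import urllib.request

GITHUB_API = "https://api.github.com"


def build_dispatch_request(owner, repo, workflow_file, ref, inputs, token):
    """Build (but do not send) a POST to GitHub's workflow_dispatch endpoint."""
    url = f"{GITHUB_API}/repos/{owner}/{repo}/actions/workflows/{workflow_file}/dispatches"
    # Mirror the curl examples, which pass every input as a string (e.g. "prune": "1")
    body = json.dumps({"ref": ref, "inputs": inputs}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
    )


if __name__ == "__main__":
    request = build_dispatch_request(
        "NASA-IMPACT",
        "veda-pforge-job-runner",
        "job-runner.yaml",
        "main",
        {"repo": "https://github.com/ranchodeluxe/mursst-example", "ref": "main", "prune": "1"},
        token=os.environ["GITHUB_TOKEN"],  # hypothetical variable holding the token
    )
    with urllib.request.urlopen(request) as response:
        print(response.status)  # GitHub replies 204 No Content when the dispatch is accepted
```

Switching between the prune and full runs is then just a matter of changing the `inputs` dictionary.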

local-runner-config.py

c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"

c.MetadataCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
# Metadata cache should be per `{{job_name}}`, as kwargs changing can change metadata
c.MetadataCacheStorage.root_path = "./metadata"

c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = "./target"

c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.InputCacheStorage.root_path = "./cache"
ranchodeluxe added the documentation (Improvements or additions to documentation) label Jan 4, 2024
ranchodeluxe changed the title from "Working: MUR SST" to "Failing: MUR SST" Jan 4, 2024
@ranchodeluxe
Contributor Author

ranchodeluxe commented Jan 4, 2024

The way Quinlan wrote this, the data is downloaded on the host during task kickoff, so we should rewrite it as I mentioned in this Slack thread: https://developmentseed.slack.com/archives/C04PY5QRHCM/p1698890912079009

It needs to be rewritten to be lazy anyhow, like this: https://github.com/ranchodeluxe/mursst-example/blob/main/feedstock/recipe.py

@ranchodeluxe
Contributor Author

Local runs fail and the hunch is that the parent process is being killed during rechunking b/c it's taking too much memory. Running on Flink big box to gauge $

@ranchodeluxe
Contributor Author

Flink run fails here and it seems there's something wrong with the data/logic: https://github.com/NASA-IMPACT/veda-pforge-job-runner/actions/runs/7469136161

@abarciauskas-bgse
Collaborator

@ranchodeluxe should I create a new issue in this repo for the kerchunk recipe? I modified the kerchunk recipe in pangeo-forge/staged-recipes#259 to use FilePattern in place of GranuleQuery and also the WriteCombinedReference from pangeo-forge/pangeo-forge-recipes#660. It works locally for 30 timesteps.

However, it works locally because I am using HTTPS, so I would like to try running it on Flink, where it can use direct S3 access. Right now, the protocol is hard-coded as HTTPS, but I think we should make it configurable, either as an environment variable or (ideally, I think) as part of the configuration. Do you know if there is any documentation on how to pass configuration parameters, either via the command line or a config file?

@ranchodeluxe
Contributor Author

ranchodeluxe commented Jan 12, 2024

@ranchodeluxe should I create a new issue in this repo for the kerchunk recipe? I modified the kerchunk recipe in pangeo-forge/staged-recipes#259 to use FilePattern in place of GranuleQuery and also the WriteCombinedReference from pangeo-forge/pangeo-forge-recipes#660. It works locally for 30 timesteps.

Nice! You could just edit the existing issue above and point the examples to the right repo and ref; that's about the same thing. But it's your call.

However, it works locally because I am using HTTPS, so I would like to try running it on Flink, where it can use direct S3 access. Right now, the protocol is hard-coded as HTTPS, but I think we should make it configurable, either as an environment variable or (ideally, I think) as part of the configuration. Do you know if there is any documentation on how to pass configuration parameters, either via the command line or a config file?

For the time being, we can just add a global input here and add it as an OS environment variable here (the same way the secrets are being handled for now). Then your recipe can conditionally check os.environ.get('WHATEVER'). Let me know if that makes sense. There still might be a use case for a traitlet option to pump arbitrary key/values per recipe into the OS environment, but we can sip on that idea for now like a fine wine.
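A minimal sketch of the recipe side of that suggestion, assuming the runner exports the protocol as an environment variable named `PROTOCOL`. The name is an assumption: the bake command later in this thread passes `PROTOCOL=s3`, while the first comment's command uses `EARTHDATA_PROTOCOL`. The base URLs are placeholders, not the real MUR SST endpoints, and both helper names are hypothetical.

```python
import os

# Hypothetical variable name; match whatever the job runner actually exports.
PROTOCOL_ENV_VAR = "PROTOCOL"
SUPPORTED_PROTOCOLS = ("https", "s3")

# Placeholder locations only; the real MUR SST endpoints are not given here.
BASE_URLS = {
    "https": "https://example.com/mursst",
    "s3": "s3://example-bucket/mursst",
}


def get_protocol(default="https"):
    """Read the access protocol from the environment, defaulting to HTTPS."""
    protocol = os.environ.get(PROTOCOL_ENV_VAR, default).strip().lower()
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(f"unsupported protocol {protocol!r}; expected one of {SUPPORTED_PROTOCOLS}")
    return protocol


def make_url(filename):
    """Build the input URL for one granule using the configured protocol."""
    return f"{BASE_URLS[get_protocol()]}/{filename}"
```

The recipe's format function (the thread mentions switching to `FilePattern`) could then call something like `make_url` instead of hard-coding HTTPS, and failing loudly on an unknown value keeps a typo in the workflow input from silently falling back to HTTPS.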

@abarciauskas-bgse
Collaborator

Thanks @ranchodeluxe, I:

  1. Updated the issue above to include the local run and future Flink run for kerchunk. One question I have: does it make sense to include "Runs on LocalDirectBakery for all timesteps"? Do we need this test if it's working for a few files? I feel like it makes sense to move to Flink at that point. Testing more than 2 files perhaps makes sense, but testing the entire archive seems like exactly what Flink is designed to handle.
  2. I opened Ab/add protocol #28 to pass the protocol configuration option (and updated the mursst-kerchunk branch)

As noted in the issue text, I think we need to merge #25 and #28 before we can run this with the flink cluster hooked up to this repo, is that right?

@ranchodeluxe
Contributor Author

ranchodeluxe commented Jan 16, 2024

As noted in the issue text, I think we need to merge #25 and #28 before we can run this with the flink cluster hooked up to this repo, is that right?

Yep, merged these blocking PRs.

1. Updated the issue above to include the local run and future Flink run for kerchunk. One question I have: does it make sense to include "Runs on `LocalDirectBakery` for all timesteps"? Do we need this test if it's working for a few files? I feel like it makes sense to move to Flink at that point. Testing more than 2 files perhaps makes sense, but testing the entire archive seems like exactly what Flink is designed to handle.

I mean you're right that we shouldn't need to be running all timesteps on the LocalDirectBakery. The reason I had that listed is b/c this whole stack is so new/flakey and LocalDirectBakery is really the only way to get accurate feedback about whether something is working or not. I'll probably keep running things in full on there (EC2 machine with 8 cores so it's real fast 😉) until I know that the current blockers (below) to ETL'ing full datasets are resolved. The first one is the most important blocker:

  1. Graceful Openers for Connection Timeouts? pangeo-forge/pangeo-forge-recipes#667

  2. pangeo-forge-recipes only has the changes we need on main and a release needs to be cut for these changes. That cut should also include this pending bug fix IMHO

  3. Flink Swallows Pipeline Errors #27
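The first blocker is about openers failing on connection timeouts. Purely as an illustration of the general idea, and not what pangeo-forge/pangeo-forge-recipes#667 proposes or implements, a retry-with-exponential-backoff wrapper around an open call could look like this. The decorator name is hypothetical, and the exception types are assumptions: the errors an fsspec backend actually raises on a timeout may need to be added to `exceptions`.

```python
import functools
import time


def retry_on_timeout(max_attempts=3, base_delay=1.0,
                     exceptions=(TimeoutError, ConnectionError), sleep=time.sleep):
    """Retry the wrapped call on transient errors, doubling the delay after each failure."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the original error
                    # Delays of base_delay, 2 * base_delay, 4 * base_delay, ...
                    sleep(base_delay * 2 ** (attempt - 1))
        return wrapper
    return decorator
```

Injecting `sleep` keeps the wrapper testable without real waits. Re-raising after the last attempt matters here given #27: a pipeline should still fail visibly rather than swallow the error.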

@abarciauskas-bgse
Collaborator

Thanks @ranchodeluxe

this whole stack is so new/flakey and LocalDirectBakery is really the only way to get accurate feedback about whether something is working or not

That makes sense, and it seems like something we need to resolve to make Flink useful; otherwise, why wouldn't we just use the output from "tests" for all timesteps on EC2? (Reuse and reproducibility, of course.) Thanks for opening #27 to address this.

@ranchodeluxe
Contributor Author

ranchodeluxe commented Jan 16, 2024

otherwise why wouldn't we just use the output from "tests" for all timesteps on EC2?

@abarciauskas-bgse: that's the backup plan 😉 Maybe we use AWS Batch. And when I say the "whole stack being new/flakey", I'm also talking about pangeo-forge-recipes, not just the Flink runner. That said, we still haven't seen anything that runs locally fail on Flink, so as long as we get good error feedback, there's nothing wrong with Flink at the moment.

@abarciauskas-bgse
Collaborator

I did some testing on EC2 to try and figure out, if we did use a local runner, what type of resources we might need to generate the kerchunk reference in a reasonable amount of time and without error.

1. Getting set up

sudo apt-get update
sudo apt-get install -y python3-pip
python3 -m pip install --upgrade pip

pip install \
 fsspec \
 s3fs \
 boto3 \
 requests \
 apache-beam==2.52.0 \
 'pangeo-forge-runner>=0.9.1' \
 git+https://github.com/pangeo-forge/pangeo-forge-recipes.git@main

export PATH=$PATH:/home/ubuntu/.local/bin

1b. Add local-runner-config.py

# vi local-runner-config.py
c.Bake.bakery_class = "pangeo_forge_runner.bakery.local.LocalDirectBakery"

c.MetadataCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
# Metadata cache should be per `{{job_name}}`, as kwargs changing can change metadata
c.MetadataCacheStorage.root_path = "./metadata"

c.TargetStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.TargetStorage.root_path = "./target"

c.InputCacheStorage.fsspec_class = "fsspec.implementations.local.LocalFileSystem"
c.InputCacheStorage.root_path = "./cache"

2. Updating the temporal range

Modify the recipe on GitHub for different temporal ranges in order to evaluate the duration of the recipe run and the size of the output.

git clone https://github.com/developmentseed/pangeo-forge-staging
cd pangeo-forge-staging
git checkout mursst-kerchunk
cd ..
vi pangeo-forge-staging/recipes/mursst/recipe.py
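How the recipe's date list is built is not shown in this thread. As a hypothetical helper (standard library only; the function name and start date are illustrative, not taken from the recipe), generating exactly N consecutive days keeps each run aligned with the "days" column in the notes:

```python
from datetime import date, timedelta


def daily_dates(start, n_days):
    """Return n_days consecutive calendar days, starting at (and including) start."""
    return [start + timedelta(days=i) for i in range(n_days)]


if __name__ == "__main__":
    dates = daily_dates(date(2002, 6, 1), 61)  # illustrative start date
    print(dates[0], "->", dates[-1], f"({len(dates)} days)")
```

Counting days directly avoids the off-by-one that comes from mixing up inclusive and exclusive end dates.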

3. Run the recipe

export EDU=aimeeb
export EDP="XXX"

time EARTHDATA_USERNAME=$EDU EARTHDATA_PASSWORD=$EDP PROTOCOL=s3 pangeo-forge-runner bake \
 --repo=./pangeo-forge-staging \
 --ref="mursst-kerchunk" \
 -f local-runner-config.py \
 --feedstock-subdir="recipes/mursst" \
 --Bake.recipe_id=MUR-JPL-L4-GLOB-v4.1 --Bake.job_name=local_test
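The notes below record wall-clock time (from `time`) and output size for each run. This hedged sketch measures the size column, assuming the output lands under `./target` as set in `local-runner-config.py`. It sums apparent file sizes in decimal megabytes, so it can differ somewhat from `du`, which reports disk usage in blocks. The function name is hypothetical.

```python
import os


def dir_size_mb(root):
    """Total size of all regular files under root, in megabytes (10**6 bytes)."""
    total = 0
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            if not os.path.islink(path):  # skip symlinks so nothing is double-counted
                total += os.path.getsize(path)
    return total / 1_000_000


if __name__ == "__main__":
    print(f"./target: {dir_size_mb('./target'):.1f} MB")
```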

notes:

Test 1: t3.medium (2 vCPU, 4 GB RAM)

| Days | Time (seconds) | Size (MB) |
|------|----------------|-----------|
| 30 | 71 | 22 |
| 61 | 145 | 42 |
| 91 | 238 | 62 |
| 122 | 334 | "Channel closed prematurely" error (see full traceback below) |

Test 2: t3.xlarge (4 vCPU, 16 GB RAM)

| Days | Time (seconds) | Size (MB) |
|------|----------------|-----------|
| 30 | 37 | 14 |
| 61 | 68 | 25 |
| 91 | 101 | 34 |
| 122 | 137 | 44 |
| 153 | 170 | 54 |
| 183 | 161 | 64 |

Test 3: t3.2xlarge (8 vCPU, 32 GB RAM)

| Days | Time (seconds) | Size (MB) |
|------|----------------|-----------|
| 30 | 27 | 14 |
| 61 | 47 | 19 |
| 91 | 64 | 24 |
| 122 | 77 | 29 |
| 153 | 92 | 35 |
| 183 | 118 | 39 |

Final kerchunk size estimate

Every month adds at least 5 MB to the reference file data; estimating about 70 MB per year gives about 1400 MB for 20 years.
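The per-month figure can be checked against the Test 3 (t3.2xlarge) numbers with an ordinary least-squares line through size versus days. This sketch uses only that one table. Tests 1 and 2 report larger sizes for the same day counts, so the projection is closer to a lower bound than a firm estimate.

```python
# Test 3 (t3.2xlarge) measurements from the notes: days processed -> output size in MB
DAYS = [30, 61, 91, 122, 153, 183]
SIZE_MB = [14, 19, 24, 29, 35, 39]


def linear_fit(xs, ys):
    """Ordinary least-squares fit y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


slope, intercept = linear_fit(DAYS, SIZE_MB)
mb_per_30_days = slope * 30
mb_per_year = slope * 365.25
projected_20_years_mb = intercept + slope * 365.25 * 20

if __name__ == "__main__":
    print(f"{slope:.3f} MB/day, {mb_per_30_days:.1f} MB per 30 days, {mb_per_year:.0f} MB/year")
    print(f"projected size after 20 years: {projected_20_years_mb:.0f} MB")
```

With the Test 3 data, the slope comes out at roughly 0.17 MB per day (about 5 MB per 30 days), and the 20-year projection is roughly 1.2 GB, somewhat under the 1400 MB estimate.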

Warnings and Errors

Concatenated coordinate time contains less than...

I saw quite a few of these:

/home/ubuntu/.local/lib/python3.10/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [676285200]

which I'm concerned has to do with the different sizes
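The numbers in these warnings look like raw values of the `time` coordinate. Assuming MUR SST v4.1 encodes time as seconds since 1981-01-01 00:00:00 UTC (the usual GHRSST L4 convention; check the variable's `units` attribute to confirm), they can be decoded to see which daily granules trigger the warning:

```python
from datetime import datetime, timedelta, timezone

# Assumption: time values are "seconds since 1981-01-01 00:00:00 UTC"
EPOCH = datetime(1981, 1, 1, tzinfo=timezone.utc)


def decode_time(seconds):
    """Convert a raw time value from the warning into a UTC datetime."""
    return EPOCH + timedelta(seconds=seconds)


if __name__ == "__main__":
    # Values copied from the warnings in this issue
    for value in (675766800, 675853200, 676285200):
        print(value, decode_time(value).isoformat())
```

Under that assumption, 675766800, 675853200 and 676285200 decode to 2002-06-01, 2002-06-02 and 2002-06-07, each at 09:00 UTC. That would point at specific early-June 2002 granules worth inspecting individually.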

Unconsolidated metadata

Metadata does not appear to be consolidated when I open the dataset
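One quick diagnostic, assuming the combined reference is written as a single kerchunk JSON file (not Parquet): check whether its reference mapping contains a `.zmetadata` key, which is where Zarr v2 keeps consolidated metadata. The function name and any path passed to it are hypothetical.

```python
import json


def has_consolidated_metadata(reference_path):
    """Return True if a kerchunk JSON reference file contains a .zmetadata key."""
    with open(reference_path) as f:
        references = json.load(f)
    # Version 1 references nest the key/value mapping under "refs";
    # version 0 references are the flat mapping itself.
    mapping = references.get("refs", references)
    return ".zmetadata" in mapping
```

If the key is missing, readers have to fetch each `.zgroup`, `.zattrs` and `.zarray` entry individually, which matches the behaviour described above.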

Channel closed prematurely
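The traceback below ends with `Killed` and `RuntimeError: Worker subprocess exited with return code 137`. By the usual shell convention, a status above 128 means the process was terminated by signal number (status minus 128), so 137 corresponds to signal 9, `SIGKILL`. On Linux that is most often the kernel's out-of-memory killer, which fits the 4 GB t3.medium run failing at 122 days and the earlier hunch about memory; an out-of-memory entry in `dmesg` would confirm it. A small hypothetical helper to decode such statuses (standard library only):

```python
import signal


def describe_exit_status(status):
    """Interpret a process exit status, including ones caused by signals."""
    if status < 0:
        # Python's subprocess convention: -N means "terminated by signal N"
        signum = -status
    elif status > 128:
        # Shell convention: 128 + N means "terminated by signal N"
        signum = status - 128
    else:
        return f"exited with status {status}"
    try:
        name = signal.Signals(signum).name
    except ValueError:
        name = f"unknown signal {signum}"
    return f"terminated by {name}"
```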

Full traceback
Target Storage is FSSpecTarget(LocalFileSystem(, root_path="./target")

Input Cache Storage is CacheFSSpecTarget(LocalFileSystem(, root_path="./cache")

Metadata Cache Storage is MetadataTarget(LocalFileSystem(, root_path="./metadata")

Picked Git content provider.

Cloning into '/tmp/tmpth2qfipk'...

HEAD is now at df6ec0e Update recipe.py

Parsing recipes...
Baking only recipe_id='MUR-JPL-L4-GLOB-v4.1'
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection[zarr.storage.FSStore]"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Using Any for unsupported type: typing.MutableMapping
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection"
Converting string literal type hint to Any: "beam.PCollection[zarr.storage.FSStore]"
Running job for recipe MUR-JPL-L4-GLOB-v4.1

==================== <function annotate_downstream_side_inputs at 0x7fd4394ef490> ====================
==================== <function fix_side_input_pcoll_coders at 0x7fd4394ef5b0> ====================
==================== <function pack_combiners at 0x7fd4394efac0> ====================
==================== <function lift_combiners at 0x7fd4394efb50> ====================
==================== <function expand_sdf at 0x7fd4394efd00> ====================
==================== <function expand_gbk at 0x7fd4394efd90> ====================
==================== <function sink_flattens at 0x7fd4394efeb0> ====================
==================== <function greedily_fuse at 0x7fd4394eff40> ====================
==================== <function read_to_impulse at 0x7fd4394f0040> ====================
==================== <function impulse_to_input at 0x7fd4394f00d0> ====================
==================== <function sort_stages at 0x7fd4394f0310> ====================
==================== <function add_impulse_to_dangling_transforms at 0x7fd4394f0430> ====================
==================== <function setup_timer_mapping at 0x7fd4394f0280> ====================
==================== <function populate_data_channel_coders at 0x7fd4394f03a0> ====================
starting control server on port 45411
starting data server on port 36633
starting state server on port 44875
starting logging server on port 44713
Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.SubprocessSdkWorkerHandler object at 0x7fd4392f2950> for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python3 -m apache_beam.runners.worker.sdk_worker_main')
Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.SubprocessSdkWorkerHandler object at 0x7fd4392f3220> for environment ref_Environment_default_environment_1 (beam:env:harness_subprocess_python:v1, b'/usr/bin/python3 -m apache_beam.runners.worker.sdk_worker_main')
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 858491182 } message: "semi_persistent_directory: None" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111" thread: "MainThread"
Worker: severity: WARN timestamp {   seconds: 1706148549   nanos: 865040779 } message: "Discarding unparseable args: [\'--direct_runner_use_stacked_bundle\', \'--pipeline_type_check\']" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/options/pipeline_options.py:367" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 869067668 } message: "Pipeline_options: {\'runner\': \'DirectRunner\', \'direct_num_workers\': 0, \'direct_running_mode\': \'multi_processing\', \'gcp_oauth_scopes\': [\'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\', \'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\'], \'requirements_file\': \'/tmp/tmpth2qfipk/recipes/mursst/requirements.txt\', \'pickle_library\': \'cloudpickle\', \'save_main_session\': True, \'sdk_worker_parallelism\': \'1\', \'environment_cache_millis\': \'0\'}" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 879352331 } message: "Creating state cache with size 104857600" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/statecache.py:234" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 879851341 } message: "Creating insecure control channel for localhost:45411." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:187" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 883674860 } message: "Control channel established." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:195" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 861497402 } message: "semi_persistent_directory: None" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:111" thread: "MainThread"
Worker: severity: WARN timestamp {   seconds: 1706148549   nanos: 868108987 } message: "Discarding unparseable args: [\'--direct_runner_use_stacked_bundle\', \'--pipeline_type_check\']" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/options/pipeline_options.py:367" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 872321844 } message: "Pipeline_options: {\'runner\': \'DirectRunner\', \'direct_num_workers\': 0, \'direct_running_mode\': \'multi_processing\', \'gcp_oauth_scopes\': [\'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\', \'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\'], \'requirements_file\': \'/tmp/tmpth2qfipk/recipes/mursst/requirements.txt\', \'pickle_library\': \'cloudpickle\', \'save_main_session\': True, \'sdk_worker_parallelism\': \'1\', \'environment_cache_millis\': \'0\'}" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:135" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 884671449 } message: "Creating state cache with size 104857600" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/statecache.py:234" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 885025024 } message: "Creating insecure control channel for localhost:45411." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:187" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 885364055 } message: "Initializing SDKHarness with unbounded number of workers." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:243" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 888333559 } message: "Control channel established." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:195" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 890057802 } message: "Python sdk harness starting." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:211" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 889965534 } message: "Initializing SDKHarness with unbounded number of workers." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:243" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 893246889 } message: "Python sdk harness starting." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:211" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 898192644 } message: "Creating insecure state channel for localhost:44875." instruction_id: "bundle_2" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:885" thread: "Thread-11"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 898584365 } message: "State channel established." instruction_id: "bundle_2" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:892" thread: "Thread-11"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 900778770 } message: "Creating client data channel for localhost:36633" instruction_id: "bundle_2" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py:770" thread: "Thread-11"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 903797388 } message: "Creating insecure state channel for localhost:44875." instruction_id: "bundle_1" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:885" thread: "Thread-12"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 904160976 } message: "State channel established." instruction_id: "bundle_1" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:892" thread: "Thread-12"
Worker: severity: INFO timestamp {   seconds: 1706148549   nanos: 907867431 } message: "Creating client data channel for localhost:36633" instruction_id: "bundle_1" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py:770" thread: "Thread-12"
/home/ubuntu/.local/lib/python3.10/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [675766800]
  warnings.warn(
/home/ubuntu/.local/lib/python3.10/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [675853200]
  warnings.warn(
Killed
Exception in thread beam_control_read:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
Exception in thread run_worker:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/local_job_service.py", line 227, in run
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 120, in _read
Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
    for elements in elements_iterator:
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 488, in __next__
    return self._next()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 480, in _next
    request = self._look_for_request()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 462, in _look_for_request
    _raise_rpc_error(self._state)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 162, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 669, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
    raise RuntimeError(
    for data in self._input:
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 488, in __next__
RuntimeError: Worker subprocess exited with return code 137
    return self._next()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 480, in _next
    request = self._look_for_request()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 462, in _look_for_request
    _raise_rpc_error(self._state)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 162, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
    for elements in elements_iterator:
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 488, in __next__
    return self._next()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 480, in _next
    request = self._look_for_request()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 462, in _look_for_request
    _raise_rpc_error(self._state)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/grpc/_server.py", line 162, in _raise_rpc_error
    raise rpc_error
grpc.RpcError
Worker: severity: INFO timestamp {   seconds: 1706148873   nanos: 758861303 } message: "No more requests from control plane" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:274" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148873   nanos: 759168624 } message: "SDK Harness waiting for in-flight requests to complete" log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:275" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148873   nanos: 759240150 } message: "Closing all cached grpc data channels." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py:803" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148873   nanos: 759358882 } message: "Closing all cached gRPC state handlers." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:904" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148873   nanos: 796697378 } message: "Done consuming work." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py:287" thread: "MainThread"
Worker: severity: INFO timestamp {   seconds: 1706148873   nanos: 796885013 } message: "Python sdk harness exiting." log_location: "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker_main.py:213" thread: "MainThread"
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 505, in input_elements
    element = received.get(timeout=1)
  File "/usr/lib/python3.10/queue.py", line 179, in get
    raise Empty
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ubuntu/.local/bin/pangeo-forge-runner", line 8, in <module>
    sys.exit(main())
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pangeo_forge_runner/cli.py", line 28, in main
    app.start()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pangeo_forge_runner/cli.py", line 23, in start
    super().start()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/traitlets/config/application.py", line 474, in start
    return self.subapp.start()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pangeo_forge_runner/commands/bake.py", line 326, in start
    pipeline.run()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/pipeline.py", line 585, in run
    return self.runner.run_pipeline(self, self._options)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
    bundle_results = self._execute_bundle(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
    self._run_bundle(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1020, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1462, in process_bundle
    for result, split_result in executor.map(execute, zip(part_inputs,  # pylint: disable=bad-option-value
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 621, in result_iterator
    yield _result_or_cancel(fs.pop())
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 319, in _result_or_cancel
    return fut.result(timeout)
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/utils/thread_pool_executor.py", line 37, in run
    self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1454, in execute
    return bundle_manager.process_bundle(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1372, in process_bundle
    for output in self._worker_handler.data_conn.input_elements(
  File "/home/ubuntu/.local/lib/python3.10/site-packages/apache_beam/runners/worker/data_plane.py", line 508, in input_elements
    raise RuntimeError('Channel closed prematurely.')
RuntimeError: Channel closed prematurely.

Next steps

  • Try to run a full year.
  • Try to merge kerchunk reference files. If we can generate a full year, we should be able to run each year separately and join the results.
  • Make sure metadata can be consolidated. Not sure why it isn't yet, since the recipe already includes a consolidate-metadata step.
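Running each year separately and joining the results starts with splitting the daily inputs into per-year batches. Below is a minimal stdlib sketch, assuming one input file per day. The helper names `daily_dates` and `batch_by_year` are hypothetical and are not part of pangeo-forge-recipes or kerchunk, and the step that joins the per-year references afterwards is not shown.

```python
from collections import defaultdict
from datetime import date, timedelta


def daily_dates(start: date, end: date) -> list[date]:
    """Every day from start to end, inclusive (assumes one file per day)."""
    return [start + timedelta(days=i) for i in range((end - start).days + 1)]


def batch_by_year(days: list[date]) -> dict[int, list[date]]:
    """Group days by calendar year so each year can be baked as its own job."""
    batches: dict[int, list[date]] = defaultdict(list)
    for d in days:
        batches[d.year].append(d)
    return dict(batches)


if __name__ == "__main__":
    # Hypothetical range spanning a year boundary, purely for illustration.
    days = daily_dates(date(2021, 12, 30), date(2022, 1, 2))
    for year, batch in batch_by_year(days).items():
        print(year, batch[0], batch[-1], len(batch))
```

Each batch would then drive one bake, and the resulting per-year references would be combined in a final step.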

@abarciauskas-bgse
Collaborator

abarciauskas-bgse commented Jan 31, 2024

@ranchodeluxe @norlandrhagen last Friday I mentioned I was running into this error: https://github.com/fsspec/kerchunk/blob/063684618c053e93e3f1f25c4688ec2765c0d962/kerchunk/combine.py#L501-L506

It does appear there are a few days in 2023 where the MUR SST NetCDF data is chunked differently from all the other days. I went down a 🐇 🕳️ of kerchunk and pangeo-forge-recipes, which turned out to be a goose chase: I could see the different chunk shapes simply by updating my version of xarray or by using h5py or h5netcdf directly (see pydata/xarray#8691).

Unfortunately, the different chunk shape is not limited to a few files. All of the data has chunk shape (1, 1023, 2047) except for the following date ranges:

  • 2023-02-24 to 2023-02-28
  • 2023-04-22
  • 2023-09-04 to 2023-12-31
  • Also, all of 2024 so far
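To split inputs before building references, the ranges above can be encoded as a simple date predicate. This is a sketch, assuming the ranges listed in this comment are complete as of Jan 31, 2024, and treating the 2024 range as open-ended. `NEW_CHUNKING_RANGES` and `has_new_chunk_shape` are hypothetical names.

```python
from datetime import date

# Ranges reported in this thread where analysed_sst is NOT chunked as
# (1, 1023, 2047). Assumption: the 2024 range continues indefinitely.
NEW_CHUNKING_RANGES = [
    (date(2023, 2, 24), date(2023, 2, 28)),
    (date(2023, 4, 22), date(2023, 4, 22)),
    (date(2023, 9, 4), date(2023, 12, 31)),
    (date(2024, 1, 1), date.max),
]


def has_new_chunk_shape(day: date) -> bool:
    """True if `day` falls inside one of the differently chunked ranges."""
    return any(start <= day <= end for start, end in NEW_CHUNKING_RANGES)


if __name__ == "__main__":
    for d in (date(2023, 2, 23), date(2023, 2, 24), date(2023, 4, 23)):
        print(d, has_new_chunk_shape(d))
```

A hard-coded list goes stale as new files arrive, so checking the chunk shape of each file directly is more robust if the reader can open them.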

My understanding is that we cannot create a single kerchunk reference for data with variable chunk shapes, which I think is the motivation for the Variable Chunking Zarr Enhancement Proposal (ZEP).

In lieu of support for variable chunking in Zarr, there are 2 resolutions I can think of:

  1. Create new versions of this data with the preferred chunk shape. The data type for analysed_sst is int16, so I believe this would result in roughly 4 MB (uncompressed) chunks for shape (1, 1023, 2047) and 51.8 MB chunks for shape (1, 3600, 7200). But this would negate the reason we are using kerchunk (not creating copies of the data).
  2. Create a kerchunk reference for each chunk shape. The second kerchunk reference would be a lot smaller than the first.
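The chunk sizes in option 1 follow from the element count times 2 bytes per int16 value, and option 2 amounts to bucketing input files by chunk shape and building one reference per bucket. A sketch of both, assuming uncompressed sizes (on-disk chunks are compressed and smaller). `chunk_nbytes`, `group_by_chunk_shape`, and `read_chunk_shape` are hypothetical helpers; the last one relies on h5py's `Dataset.chunks` attribute, which returns the chunk shape or `None` for contiguous storage.

```python
import math
from collections import defaultdict

INT16_ITEMSIZE = 2  # bytes per analysed_sst value (int16)


def chunk_nbytes(shape: tuple[int, ...], itemsize: int = INT16_ITEMSIZE) -> int:
    """Uncompressed size in bytes of one chunk with the given shape."""
    return math.prod(shape) * itemsize


def group_by_chunk_shape(
    shapes: dict[str, tuple[int, ...]],
) -> dict[tuple[int, ...], list[str]]:
    """Map each chunk shape to its (sorted) files: one kerchunk reference per key."""
    groups: dict[tuple[int, ...], list[str]] = defaultdict(list)
    for path, shape in sorted(shapes.items()):
        groups[shape].append(path)
    return dict(groups)


def read_chunk_shape(path: str, var: str = "analysed_sst"):
    """Read a variable's HDF5 chunk shape (None if contiguous); requires h5py."""
    import h5py  # imported lazily so the pure helpers above work without h5py

    with h5py.File(path, "r") as f:
        return f[var].chunks


if __name__ == "__main__":
    print(chunk_nbytes((1, 1023, 2047)))  # 4188162 bytes, about 4.2 MB
    print(chunk_nbytes((1, 3600, 7200)))  # 51840000 bytes, about 51.8 MB
```

In practice `shapes` would be built by calling `read_chunk_shape` on each local file (remote files would need an fsspec file object instead of a path).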

Curious what you think, and also @sharkinsspatial

@abarciauskas-bgse abarciauskas-bgse self-assigned this Jan 31, 2024
@norlandrhagen
Collaborator

Thanks for the investigation @abarciauskas-bgse! Happy to chat about this tomorrow.

@norlandrhagen
Collaborator

I wonder if there is a third option: reaching out to the data provider to see if we can get any clarification on why this is happening.

@abarciauskas-bgse
Collaborator

@norlandrhagen Totally agree. I was thinking of reaching out to PO.DAAC to see if there are plans to backprocess the whole dataset with the new chunk shape, or otherwise deliver a consistently chunked version.

@norlandrhagen
Collaborator

Seems like that would be the best way forward, but having the variable chunking ZEP in place would be super helpful for cases like this.

@abarciauskas-bgse
Collaborator

abarciauskas-bgse commented Feb 2, 2024

To recap from our meeting today, I think the next steps will be:

  • Reach out to PO.DAAC to ask about the missing dates and inconsistent chunk shapes
  • Run the full temporal extent up to when the new chunk shape is introduced (Feb 24, 2023)
  • Create a new ticket to implement variable chunking
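For the second step, the inputs are every day up to, but not including, Feb 24, 2023. A minimal sketch: the dataset's start date is not stated in this thread, so it is left as a parameter, and the list does not skip any missing dates PO.DAAC may confirm. `dates_before_rechunk` is a hypothetical helper; its output could feed a recipe's time dimension (for example a pangeo-forge-recipes `ConcatDim("time", ...)`), though that wiring is not shown.

```python
from datetime import date, timedelta

# First day with the new chunk shape, per this thread.
NEW_CHUNK_SHAPE_START = date(2023, 2, 24)


def dates_before_rechunk(start: date, cutoff: date = NEW_CHUNK_SHAPE_START) -> list[date]:
    """All days from `start` up to, but not including, `cutoff` (empty if start >= cutoff)."""
    return [start + timedelta(days=i) for i in range((cutoff - start).days)]


if __name__ == "__main__":
    days = dates_before_rechunk(date(2023, 2, 20))  # hypothetical start date
    print(days[0], days[-1], len(days))
```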
