
Step level memoization #944

Closed
vicaire opened this issue Aug 9, 2018 · 25 comments
Labels: type/feature Feature request

@vicaire commented Aug 9, 2018

FEATURE REQUEST:

When using Argo for ML, data caching would be a very useful feature.

For instance:

  • Workflow 1 starts
  • Workflow 1 executes container image 5 as its 10th step.
  • Argo would somehow be able to figure out whether container image 5 already executed with the exact same inputs previously. (It could have been as part of another workflow.)
  • If the same image was already executed with the same inputs, the output of the previous execution would be reused and the image would not be re-executed.
  • If the container is storing data in an artifact repository, the old data would somehow be associated with both the old (completed) execution and the new (bypassed) one.

Is this something that Argo would support?

If not, would you have suggestions/ideas on whether it would be possible to support this use case? Would it necessarily require big changes to Argo? Or do you think there is a way to support this with the current implementation?
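
For illustration, a minimal sketch of the requested behavior in Python; the cache store and execute hook are hypothetical, not Argo APIs:

import json

def run_step(image, inputs, cache, execute):
    # Key the cache on the exact (image, inputs) pair.
    key = (image, json.dumps(inputs, sort_keys=True))
    if key in cache:
        # The image already ran with identical inputs, possibly as part of
        # another workflow: reuse its outputs instead of re-executing.
        return cache[key]
    outputs = execute(image, inputs)
    cache[key] = outputs  # record outputs so later workflows can reuse them
    return outputs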

@DSchmidtDev

@vicaire It's an interesting feature, but I'm not sure if that's something Argo will offer.
But if you need the feature in the near future, you could have a look at Pachyderm, which tries to improve processing with caching and special data management.

@vicaire (Author) commented Aug 10, 2018

@DSchmidtDev, ideally we would like something that is integrated with Argo. We would also like the caching logic to be separate from the containers that handle the ML computations.

@jessesuen added the type/feature label Apr 19, 2019
@mcharrel

+1. We use Luigi and are considering migrating to Argo.
Luigi is "able to figure out whether container image 5 already executed with the exact same inputs previously (it could have been as part of another workflow)", and it would be great if Argo provided this as well.

@alexec (Contributor) commented Jan 3, 2020

What is the goal, please? To execute workflows faster by not running steps that have already been computed?

@antoniomo (Contributor)

While executing faster is part of the goal, it also involves better resource usage and the ability to use Argo as a generic ETL/data/ML workflow engine. Some workflows might be really costly in time and money, say workflows that require GPUs and big CPU/RAM allocations and might take several hours to compute.

@alexec (Contributor) commented Feb 4, 2020

Can I propose a new name for this issue: "step memoizing"? I think it captures the problem and potential solution in a way that makes it easy to understand.

E.g. "Reduce cost and workflow execution time by memoizing previously run steps"?

This is a large feature as we'd need a data store to memoize the results.

@terrytangyuan (Member)

+1 to step memoizing

@ecurtin2 commented Apr 19, 2020

This is something I've been manually implementing in the container's logic itself. For ML workflows in particular, I've been resorting to checking if the output dataset exists and exiting early. I wonder if a simple solution to part of this problem is something like:

mytask:
  - name: data-stuff
    when:
      - pathNotExists:
          - s3://mybucket/key1
          - s3://mybucket/key2

For example, it'd be really lovely to do stuff like this to check for the existence of Spark's _SUCCESS files for big ETL job caching. This also takes the persistence question out of Argo and even k8s, and lets us use the external state of the world instead.
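
A minimal sketch of the manual check described above, assuming boto3 and hypothetical bucket/key names:

import sys
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")

def path_exists(bucket: str, key: str) -> bool:
    # True if the object (e.g. Spark's _SUCCESS marker) already exists.
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError:
        return False

# Exit early when a previous run already produced the output dataset.
if path_exists("mybucket", "key1/_SUCCESS"):
    print("output already exists; skipping step")
    sys.exit(0)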

@alexec (Contributor) commented Apr 24, 2020

See #1054

@foobarbecue (Contributor)

@ecurtin2 I'm doing the same, on an NFS share volume: manually checking at each step whether the output exists and skipping the step if it does. It would be great to have a solution built into Argo.

@alexec (Contributor) commented May 26, 2020

See #3066

@jessesuen changed the title from Data Caching to Step level memoization Jun 17, 2020
@jessesuen (Member)

Had some discussion about this. This is how we propose this feature will work:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: memoize-
spec:
  entrypoint: whalesay
  arguments:
    parameters:
    - name: message
      value: hello world

  templates:
  - name: whalesay
    memoize:
      maxAge: 3d
      cache:
        configMapName: whalesay-cache
    inputs:
      parameters:
      - name: message
    container:
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["sleep 10; cowsay {{inputs.parameters.message}} > /tmp/hello_world.txt"]
    outputs:
      parameters:
      - name: hello
        valueFrom:
          path: /tmp/hello_world.txt

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: whalesay-cache
data:
  # cache key would be hash combination of:
  # - template definition
  # - inputs
  # allows cache to be invalidated if template changes or inputs change
  whalesay.abc123def4567890: |
    {
      "exipiresAt":"2020-06-18T17:11:05Z",
      "nodeID":"memoize-abx4124-123129321123", # do we need this?
      "outputs":
        {
          "parameters":
            [
              {
                "name":"hello",
                "value":"hello world"
              }
            ]
        }
    }
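
For illustration, a sketch of how such a key could be derived; the exact hashing scheme here is an assumption, not the final implementation:

import hashlib
import json

def memoize_key(template_name: str, template_spec: dict, inputs: dict) -> str:
    # Hash the template definition together with its inputs, so that a change
    # to either invalidates the cache entry.
    digest = hashlib.sha256(
        json.dumps({"template": template_spec, "inputs": inputs},
                   sort_keys=True).encode()
    ).hexdigest()
    return f"{template_name}.{digest[:16]}"  # e.g. "whalesay.abc123def4567890"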

@jessesuen (Member)

Future improvements would include:

  1. syntax to allow this to happen for all steps of the workflow (i.e. spec.memoize)
  2. ability to control what inputs can control the cache key (e.g. allowing only some of the N inputs to affect the cache key, and ignore others)
  3. ability to use other stores as a cache (other than a ConfigMap), e.g. redis

@ecurtin2 commented Jun 22, 2020

I'd like to comment that I think being able to use an artifact-like store for this is particularly useful for me. We are often doing ETL + ML tasks that are long-lived, run in namespaced deployments of workflows, and every once in a while depend on external tasks. If there were a way to use some sort of cache key as a file artifact, I think it could really help.

In your example I'm thinking of something like

  - name: whalesay
    memoize:
      cache:
        key: "{{memoize.template}}-{{memoize.params}}" 
        ~OR~
        key: ""   # Spark does empty _SUCCESS files so we can skip in that circumstance
        storage:
          configMap:
             name: my-configmap
          ~OR~
          artifact:
            s3:
              endpoint: aws.whateveritis.com
              bucket: my-bucket-name
              key: path/in/bucket

where the artifact's contents are the string cache.key, and the step is only skipped if the file exists and is equal
to the key; otherwise the cache is considered invalidated.

I am not familiar with the internals of Argo so am not sure if this is hard for some reason I don't understand.
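
A minimal sketch of that skip rule, again assuming boto3 and hypothetical names:

import boto3
from botocore.exceptions import ClientError

def cache_valid(bucket: str, key_path: str, expected_key: str) -> bool:
    # Skip only if the artifact exists AND its contents equal the configured
    # cache key; anything else counts as an invalidated cache.
    try:
        obj = boto3.client("s3").get_object(Bucket=bucket, Key=key_path)
    except ClientError:
        return False  # artifact missing: run the step
    return obj["Body"].read().decode() == expected_key

# An empty expected key ("") matches Spark's empty _SUCCESS files.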

@lilida commented Jun 29, 2020

@jessesuen Thanks for implementing this. One question: should we include the actual container hash in the cache key computation? There are cases where the actual container image changes but the tag stays the same.

@rbreeze (Member) commented Jun 29, 2020

@lilida The way we are handling this is by allowing custom keys, so that users can pass in their own input to use as a key in the instance where the tag is constant but the container image changes, or other use cases where the workflow spec might stay the same but outputs change. Does this address what you were thinking?

@lilida commented Jun 29, 2020

@rbreeze A customized key can probably address it, but extra effort is required. An alternative could be some easy API or way to invalidate the cache, so that invalidation can be integrated with the image publishing process.

@rbreeze (Member) commented Jun 30, 2020

@lilida That's a good idea. For the first draft we are going to stick to using custom keys only, but this might be something we explore more down the line.

@terrytangyuan (Member) commented Jun 30, 2020

ICYMI: looks like @rbreeze is working on this in #3356 (the PR title does not link to this issue automatically).

@alexec removed the L label Jun 30, 2020
@Ark-kun (Member) commented Jul 2, 2020

@jessesuen
Thank you for your work on designing this.

In KFP we've recently implemented execution caching on top of Argo using a Kubernetes Mutating admission controller webhook. Having native support in Argo would be much better.

We have cache reuse turned on by default. The user can limit the cache reuse on a per-task basis (usually for some volatile components).
The cache key is calculated based on certain parts of the Argo template (inputs, container.{image, command, args, env, volumeMounts}, volumes). When a pod succeeds, we record Argo's outputs to a DB. When a new pod comes and we find a viable cache DB entry, we skip the execution by hacking the pod using the output data from the DB so that Argo thinks the pod actually ran.

The way to control the cache reuse: We have an annotation called max_cache_staleness. It's per-task/step, not per-template like in your proposal. (The template authors cannot know which value to use, so it's not their responsibility - it's the responsibility of the workflow author.)

The cache selection logic is as follows:

  • Calculate the cache key
  • Query the DB and try to use the latest viable (same cache key, not too old) DB entry with the same max_cache_staleness
  • Else query the DB and try to use the latest viable (same cache key, not too old) DB entry. If found, then copy that entry adopting the task's max_cache_staleness.
  • Else submit the pod for execution and when it succeeds record the outputs in the DB including the max_cache_staleness in the entry.

Here is the reason for this more complicated logic: Suppose you have two different workflows - "fast" (max_cache_staleness=1 day) and "slow" (max_cache_staleness=1 month). We do not want the "fast" workflow to constantly break the cache for the "slow" workflow. With the logic above the fast workflow does not affect cache queries for the slow workflow.
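
A minimal sketch of this selection logic in Python, with a hypothetical in-memory entry list standing in for the DB:

def select_cache_entry(entries, cache_key, max_cache_staleness, now):
    # entries: dicts with "cache_key", "created_at" (datetime),
    # "max_cache_staleness" (timedelta), "outputs".
    viable = [e for e in entries
              if e["cache_key"] == cache_key
              and now - e["created_at"] <= max_cache_staleness]
    # 1. Prefer the latest viable entry recorded with the same staleness, so
    #    "fast" workflows do not break the cache for "slow" ones.
    same = [e for e in viable
            if e["max_cache_staleness"] == max_cache_staleness]
    if same:
        return max(same, key=lambda e: e["created_at"]), None
    # 2. Otherwise take the latest viable entry; the caller records a copy
    #    that adopts this task's max_cache_staleness.
    if viable:
        best = max(viable, key=lambda e: e["created_at"])
        return best, dict(best, max_cache_staleness=max_cache_staleness)
    # 3. Cache miss: the caller runs the pod and records its outputs together
    #    with max_cache_staleness.
    return None, None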

Some other aspects:
It might be useful to allow specifying a cache_seed which is used when calculating the cache_key. This way it's possible to "break" the cache while still using the same maxAge/max_cache_staleness. (For example, suppose you have a periodic workflow and some bad data was generated and cached. You want to keep the same maxAge, but you need to switch to a different caching "track", so you just change the cache_seed.)
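
In terms of a hash-based key, the seed would just be folded into the hashed payload. A minimal sketch, with hypothetical names:

import hashlib
import json

def cache_key_with_seed(template: dict, inputs: dict, cache_seed: str = "") -> str:
    # Changing cache_seed switches to a fresh caching "track" while keeping
    # the same maxAge / max_cache_staleness (e.g. after bad data was cached).
    payload = json.dumps(
        {"template": template, "inputs": inputs, "seed": cache_seed},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()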

In the future you might also want to add the "execution latching" feature: Suppose there are no completed results for your task but the same task is already being executed at this moment. Then you could just "latch" to that execution - wait for it to complete and use its outputs instead of launching a new execution.

Cache expiration is different from max_cache_staleness. Say you run a workflow that has max_cache_staleness=1d, but three days later you run a workflow that has max_cache_staleness=7d. It could use the cached values that were created by the first run.

@Ark-kun (Member) commented Jul 8, 2020

2. ability to control what inputs can control the cache key

Do you think it would be useful to have some sensible default like container + inputs?

@alexec (Contributor) commented Jul 8, 2020

@Ark-kun are you free to Zoom with @jessesuen @rbreeze and myself about your use case please?

@Ark-kun (Member) commented Jul 10, 2020

@Ark-kun are you free to Zoom with @jessesuen @rbreeze and myself about your use case please?

Sounds good. What's a good date/time for you?

@alexec (Contributor) commented Sep 2, 2020

Available for testing in v2.11.0-rc1.

@mtiller commented Sep 16, 2020

What is the actual solution? How do we use this (or is it automatic somehow)?

icecoffee531 pushed a commit to icecoffee531/argo-workflows that referenced this issue Jan 5, 2022