Skip to content

Commit

Permalink
fix(cli): Update the map-reduce example, fix bug. (#4948)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec authored Jan 26, 2021
1 parent e7e51d0 commit c729306
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 83 deletions.
6 changes: 0 additions & 6 deletions docs/fields.md
Original file line number Diff line number Diff line change
Expand Up @@ -3699,8 +3699,6 @@ ValueFrom describes a location in which to obtain the value to a parameter

- [`k8s-wait-wf.yaml`](https://github.com/argoproj/argo/blob/master/examples/k8s-wait-wf.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down Expand Up @@ -3973,8 +3971,6 @@ Sequence expands a workflow step into numeric range

- [`loops-sequence.yaml`](https://github.com/argoproj/argo/blob/master/examples/loops-sequence.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`work-avoidance.yaml`](https://github.com/argoproj/argo/blob/master/examples/work-avoidance.yaml)
</details>

Expand Down Expand Up @@ -5985,8 +5981,6 @@ EnvVarSource represents a source for the value of an EnvVar.

- [`k8s-wait-wf.yaml`](https://github.com/argoproj/argo/blob/master/examples/k8s-wait-wf.yaml)

- [`map-reduce.yaml`](https://github.com/argoproj/argo/blob/master/examples/map-reduce.yaml)

- [`memoize-simple.yaml`](https://github.com/argoproj/argo/blob/master/examples/memoize-simple.yaml)

- [`nested-workflow.yaml`](https://github.com/argoproj/argo/blob/master/examples/nested-workflow.yaml)
Expand Down
119 changes: 46 additions & 73 deletions examples/map-reduce.yaml
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
# This workflow demonstrates a basic map-reduce.
# This requires that you have an artifact repository configured.
#
# Notes:
# - You'll need to have a user-namespaced artifact repository set up to save intermediate results for this workflow.
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: map-reduce-
annotations:
workflows.argoproj.io/description: |
This workflow demonstrates map-reduce using "key-only" artifacts.
The first task "split" produces a number of parts, each in the form of a JSON document, saving it to a bucket.
Each "map" task then reads those documents, performs a map operation, and writes them out to a new bucket.
Finally, "reduce" merges all the mapped documents into a final document.
workflows.argoproj.io/version: '>= 3.0.0'
spec:
entrypoint: main
arguments:
parameters:
- name: numParts
value: "4"
- name: numGroups
value: "2"
templates:
- name: main
dag:
Expand All @@ -31,28 +31,21 @@ spec:
parameters:
- name: partId
value: '{{item}}'
- name: numGroups
value: '{{workflow.parameters.numGroups}}'
artifacts:
- name: parts
from: '{{tasks.split.outputs.artifacts.parts}}'
- name: part
s3:
key: "{{workflow.name}}/parts/{{item}}.json"
dependencies:
- split
withParam: '{{tasks.split.outputs.result}}'
- name: reduce
template: reduce
arguments:
parameters:
- name: group
value: '{{item}}'
dependencies:
- map
withSequence:
count: "{{workflow.parameters.numGroups}}"
# The `split` task creates a number of "parts". Each part has a unique ID (e.g. part-0, part-1).
# This task writes the part IDs to stdout (so that the `map` task can be expanded to have one task per part).
# It also writes one "part file" for each piece of processing that needs doing, into a single directory
# which is then saved as an output artifact.
# The `split` task creates a number of "parts". Each part has a unique ID (index).
# This task writes one "part file" for each piece of processing that needs doing, into a single directory
# which is then saved as an output artifact.
# Finally, it dumps a list of part IDs to stdout.
- name: split
inputs:
parameters:
Expand All @@ -65,26 +58,29 @@ spec:
import json
import os
import sys
os.mkdir("/tmp/parts")
partIds = list(map(lambda x: "part-" + str(x), range({{inputs.parameters.numParts}})))
os.mkdir("/mnt/out")
partIds = list(map(lambda x: str(x), range({{inputs.parameters.numParts}})))
for i, partId in enumerate(partIds, start=1):
with open("/tmp/parts/" + partId + ".json", "w") as out:
json.dump({"foo": i}, out)
with open("/mnt/out/" + partId + ".json", "w") as f:
json.dump({"foo": i}, f)
json.dump(partIds, sys.stdout)
outputs:
artifacts:
- name: parts
path: /tmp/parts
# One `map` per part ID is started. Finds its own "part file" under `/tmp/parts/${partId}`.
path: /mnt/out
archive:
none: { }
s3:
key: "{{workflow.name}}/parts"
# One `map` per part ID is started. Finds its own "part file" under `/mnt/in/part.json`.
# Each `map` task has an output artifact saved with a unique name for the part into a common "results directory".
- name: map
inputs:
parameters:
- name: partId
- name: numGroups
artifacts:
- name: parts
path: /tmp/parts
- name: part
path: /mnt/in/part.json
script:
image: python:alpine3.6
command:
Expand All @@ -93,48 +89,26 @@ spec:
import json
import os
import sys
partId = "{{inputs.parameters.partId}}"
numGroups = {{inputs.parameters.numGroups}}
os.mkdir("/tmp/results")
with open("/tmp/parts/" + partId + ".json") as f:
os.mkdir("/mnt/out")
with open("/mnt/in/part.json") as f:
part = json.load(f)
with open("/tmp/results/" + partId + ".json", "w") as out:
json.dump({"bar": part["foo"] * 2, "group": part["foo"] % numGroups}, out)
with open("/mnt/out/part.json", "w") as f:
json.dump({"bar": part["foo"] * 2}, f)
outputs:
artifacts:
- name: result
path: /tmp/results/{{inputs.parameters.partId}}.json
- name: part
path: /mnt/out/part.json
archive:
none: { }
s3:
bucket: my-bucket
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
key: "{{workflow.name}}/results/{{inputs.parameters.partId}}.json"
# The `reduce` task takes the "results directory" and returns a single result.
- name: reduce
inputs:
parameters:
- name: group
artifacts:
- name: result
path: /tmp/results
- name: results
path: /mnt/in
s3:
bucket: my-bucket
endpoint: minio:9000
insecure: true
accessKeySecret:
name: my-minio-cred
key: accesskey
secretKeySecret:
name: my-minio-cred
key: secretkey
key: "{{workflow.name}}/results"
script:
image: python:alpine3.6
Expand All @@ -145,18 +119,17 @@ spec:
import os
import sys
total = 0
group = "{{inputs.parameters.group}}"
os.mkdir("/tmp/totals/")
for f in list(map(lambda x: open("/tmp/results/" + x), os.listdir("/tmp/results"))):
os.mkdir("/mnt/out")
for f in list(map(lambda x: open("/mnt/in/" + x), os.listdir("/mnt/in"))):
result = json.load(f)
if result["group"] == group:
total = total + result["bar"]
with open("/tmp/totals/" + group, "w") as f:
f.write(str(total))
f.close()
total = total + result["bar"]
with open("/mnt/out/total.json" , "w") as f:
json.dump({"total": total}, f)
outputs:
parameters:
- name: total-{{inputs.parameters.group}}
globalName: total-{{inputs.parameters.group}}
valueFrom:
path: /tmp/totals/{{inputs.parameters.group}}
artifacts:
- name: total
path: /mnt/out/total.json
archive:
none: { }
s3:
key: "{{workflow.name}}/total.json"
4 changes: 2 additions & 2 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,8 +778,8 @@ func validateArgumentsValues(prefix string, arguments wfv1.Arguments) error {
}
}
for _, art := range arguments.Artifacts {
if art.From == "" && !art.HasLocation() {
return errors.Errorf(errors.CodeBadRequest, "%s%s.from or artifact location is required", prefix, art.Name)
if art.From == "" && !art.HasLocationOrKey() {
return errors.Errorf(errors.CodeBadRequest, "%s%s.from, artifact location, or key is required", prefix, art.Name)
}
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions workflow/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,11 +1169,11 @@ spec:
func TestInvalidArgumentNoFromOrLocation(t *testing.T) {
_, err := validate(invalidStepsArgumentNoFromOrLocation)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "from or artifact location is required")
assert.Contains(t, err.Error(), "from, artifact location, or key is required")
}
_, err = validate(invalidDAGArgumentNoFromOrLocation)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "from or artifact location is required")
assert.Contains(t, err.Error(), "from, artifact location, or key is required")
}
}

Expand Down

0 comments on commit c729306

Please sign in to comment.