-
Notifications
You must be signed in to change notification settings - Fork 42
/
test_conditional_outputs.py
79 lines (63 loc) · 2.11 KB
/
test_conditional_outputs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import random
import time
from dflow import (OutputArtifact, OutputParameter, Outputs, Step, Steps,
Workflow, if_expression)
from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
upload_packages)
if "__file__" in locals():
upload_packages.append(__file__)
class Random(OP):
@classmethod
def get_input_sign(cls):
return OPIOSign()
@classmethod
def get_output_sign(cls):
return OPIOSign({
"is_head": bool,
"msg1": str,
"msg2": str,
"foo": Artifact(str),
"bar": Artifact(str)
})
@OP.exec_sign_check
def execute(
self,
op_in: OPIO,
) -> OPIO:
open("foo.txt", "w").write("head")
open("bar.txt", "w").write("tail")
if random.random() < 0.5:
is_head = True
else:
is_head = False
return OPIO({
"is_head": is_head,
"msg1": "head",
"msg2": "tail",
"foo": "foo.txt",
"bar": "bar.txt"
})
def test_conditional_outputs():
steps = Steps("conditional-steps", outputs=Outputs(
parameters={"msg": OutputParameter()},
artifacts={"res": OutputArtifact()}))
random_step = Step(
name="random",
template=PythonOPTemplate(Random, image="python:3.8")
)
steps.add(random_step)
steps.outputs.parameters["msg"].value_from_expression = if_expression(
_if=random_step.outputs.parameters["is_head"],
_then=random_step.outputs.parameters["msg1"],
_else=random_step.outputs.parameters["msg2"])
steps.outputs.artifacts["res"].from_expression = if_expression(
_if=random_step.outputs.parameters["is_head"],
_then=random_step.outputs.artifacts["foo"],
_else=random_step.outputs.artifacts["bar"])
wf = Workflow(name="conditional", steps=steps)
wf.submit()
while wf.query_status() in ["Pending", "Running"]:
time.sleep(1)
assert(wf.query_status() == "Succeeded")
if __name__ == "__main__":
test_conditional_outputs()