-
Notifications
You must be signed in to change notification settings - Fork 0
/
vision_pdf_ocr.py
62 lines (50 loc) · 1.79 KB
/
vision_pdf_ocr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# NOTE(review): Result is assumed to be exported by the top-level `workflows`
# package alongside Step/Return — confirm against the installed DSL version.
from workflows import Workflow, Step, Result, Return, Switch, Condition, Assign
from workflows.standard import SysSleep, SysLog
from workflows.connectors.vision import Vision
# Workflow definition: submit a PDF stored in GCS to Cloud Vision for OCR,
# then poll the job until its state is "DONE" and return the job status.
# The workflow takes one runtime parameter, `input`, from which
# `gcs_input_path` and `gcs_output_path` are read.
w = Workflow(params=['input'])
w.add_step(Step('log_input', SysLog(json='${input}')))
w.add_step(Step('assign_variables', Assign(variables={
    'gcs_input_path': '${input.gcs_input_path}',
    'gcs_output_path': '${input.gcs_output_path}',
})))

# Request body for Cloud Vision. The '${...}' values are workflow
# expressions referencing the variables assigned in 'assign_variables';
# they are emitted verbatim into the generated YAML, not evaluated here.
input_data = {
    "requests": [
        {
            "inputConfig": {
                "gcsSource": {
                    "uri": '${gcs_input_path}'
                },
                "mimeType": "application/pdf"
            },
            "features": [{
                "type": "DOCUMENT_TEXT_DETECTION"
            }],
            "outputConfig": {
                "gcsDestination": {
                    "uri": '${gcs_output_path}'
                }
            }
        }
    ]
}

# Named result handed to the submit step (presumably the workflow variable
# that receives the step's response — confirm against the DSL).
ocr_res = Result('ocr_res')

# Submit the asynchronous OCR job.
ocr_step = Step('vision_ocr', Vision.asyncBatchAnnotate(input_data), ocr_res)

# Fetch the job status.
# NOTE(review): the job name and result are passed as bare strings here,
# unlike the '${...}' expressions used elsewhere — confirm this matches the
# signature Vision.getJob expects.
ocr_status = Step('get_vision_result', Vision.getJob('ocr_res.body.name', 'ocr_status'))
w.add_step(ocr_step)

# Pause between polls; check_status jumps back to this step until the job
# is done.
wait_step = Step('wait_step', SysSleep(2))
w.add_step(wait_step)
w.add_step(ocr_status)

# Final step, which returns the result.
# NOTE(review): 'ocr_status' is passed as a bare string — confirm whether
# Return wraps it in an expression or emits the literal text.
step_end = Step('end_step', Return('ocr_status'))

# Branch on the job state. `next_` receives the Step objects themselves
# rather than their string names.
check_status = Step('check_status', Switch([
    Condition('${ocr_status.body.metadata.state!="DONE"}', next_=wait_step),
    Condition('${ocr_status.body.metadata.state=="DONE"}', next_=step_end)
]))
w.add_step(check_status)
w.add_step(step_end)

if __name__ == '__main__':
    # Emit the assembled workflow as YAML on stdout.
    print(w.to_yaml())