-
Notifications
You must be signed in to change notification settings - Fork 1
/
workflow.py
77 lines (61 loc) · 2.93 KB
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from conductor.client.workflow.task.fork_task import ForkTask
from services.booking_service.booking import book_ride, confirm_booking
from services.assignment_service.assignment import assign_driver
from services.payment_service.payments import make_payment
from services.notification_service.notification import notify_customer, notify_driver
def register_cab_booking_workflow() -> ConductorWorkflow:
configuration = Configuration()
workflow_executor = WorkflowExecutor(configuration=configuration)
workflow = ConductorWorkflow(
name='cab_service_saga_python', description="Cab Service Saga Python", executor=workflow_executor
)
book = book_ride(
task_ref_name='book_ride_ref',
pick_up_location=workflow.input('pick_up_location'),
drop_off_location=workflow.input('drop_off_location'),
rider_id=workflow.input('rider_id'),
)
assign = assign_driver(task_ref_name='assign_driver_ref', booking_id=book.output('booking_id'))
pay = make_payment(
task_ref_name='make_payment_ref',
rider_id=workflow.input('rider_id'),
booking_id=book.output('booking_id')
)
confirm = confirm_booking(
task_ref_name='confirm_booking_ref',
booking_id=book.output('booking_id'),
driver_id=assign.output('driver_id')
)
notification_input_params = {
'booking_id': book.output('booking_id'),
'driver_id': assign.output('driver_id'),
'rider_id' : workflow.input('rider_id'),
'from' : workflow.input('pick_up_location'),
'to' : workflow.input('drop_off_location')
}
notify_d = notify_driver(task_ref_name='notify_driver_ref', **notification_input_params)
notify_d.optional = True
notify_c = notify_customer(task_ref_name='notify_customer_ref', **notification_input_params)
notify_c.optional = True
notify = ForkTask(
task_ref_name='send_notifications',
forked_tasks=[[notify_d], [notify_c]],
join_on=[]
)
workflow >> book >> assign >> pay >> confirm >> notify
workflow.input_parameters(['pick_up_location', 'drop_off_location', 'rider_id'])
workflow.output_parameters({'booking_id': book.output('booking_id')})
metadata_client = OrkesMetadataClient(configuration)
workflowDef = workflow.to_workflow_def()
workflowDef.failure_workflow = 'cab_service_saga_cancellation_wf'
metadata_client.register_workflow_def(workflowDef, overwrite=True)
return workflow
def main():
workflow = register_cab_booking_workflow()
print("Successfully created the workflow", workflow.name)
if __name__ == '__main__':
main()