-
Notifications
You must be signed in to change notification settings - Fork 2
/
main.py
149 lines (124 loc) · 6.03 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
import yaml
import sys, getopt
from typing import List
import traceback
import asyncio
import json
import logging
from Clusters import BaseDeployment
from Clusters import OpenWhiskDeployment
from Clusters import GoogleDeployment
from FederatedLearning import FederatedLearning
# Module-level list; nothing in the visible part of this file reads or
# writes it after this assignment.
functions_meta = []
from commons.Logger import ScriptLogger
# Configure the root logger at DEBUG level (no effect if the root logger
# already has handlers).
logging.basicConfig(level=logging.DEBUG)
# Project-specific logger for this module.
# NOTE(review): 'SWI.log' is presumably the log file name -- confirm in
# commons.Logger.
logger = ScriptLogger(__name__, 'SWI.log')
logger.setLevel(logging.DEBUG)
# Redirect messages issued through the `warnings` module into logging.
logging.captureWarnings(True)
async def clean_up_setup(configfile: str, fl_learning_obj: FederatedLearning,
                         openwhisk_obj: OpenWhiskDeployment, gcf_obj: GoogleDeployment):
    """Remove the managing functions and the OpenWhisk/GCF client functions.

    The client counts are read from the YAML file at ``configfile`` under
    ``scenarios -> federated_learning -> clients_info``. All required keys
    are resolved before the first removal call, so an incomplete config
    fails before anything has been removed.

    Args:
        configfile: Path to the YAML configuration file. It is also passed
            through unchanged to the ``fl_learning_obj`` methods.
        fl_learning_obj: Object providing the awaitable
            ``remove_managing_functions`` and ``remove_clients`` methods.
        openwhisk_obj: Deployment handle used for the 'openwhisk' provider.
        gcf_obj: Deployment handle used for the 'google' provider.

    Raises:
        OSError: If ``configfile`` cannot be opened.
        KeyError: If one of the expected config keys is missing.

    ``yaml.YAMLError`` raised anywhere in the sequence is caught and
    printed rather than propagated.
    """
    try:
        # Parse the config and release the file handle right away; the
        # removal calls below do not need the stream to stay open.
        with open(configfile, 'r') as stream:
            data = yaml.safe_load(stream)

        clients_info = data['scenarios']['federated_learning']['clients_info']
        edge_count = clients_info['edge_clients_count']
        # Boundary between the OpenWhisk client range and the GCF client
        # range. How the start/end bounds are interpreted is up to
        # remove_clients (defined outside this file).
        ow_end = edge_count + clients_info['ow_clients_count']
        total_clients = clients_info['total_num_clients']

        # step1: remove previous managing functions if they exist
        await fl_learning_obj.remove_managing_functions(
            configfile, openwhisk_obj, total_clients,
            'openwhisk', "invasic_cluster")

        # step2: remove OpenWhisk clients
        await fl_learning_obj.remove_clients(
            edge_count, ow_end,
            configfile, openwhisk_obj, 'openwhisk', "invasic_cluster")

        # step3: remove GCF clients
        await fl_learning_obj.remove_clients(
            ow_end, total_clients,
            configfile, gcf_obj, 'google', "gcf_cluster")
    except yaml.YAMLError as exc:
        print(exc)
async def create_setup(configfile: str, fl_learning_obj: FederatedLearning,
                       openwhisk_obj: OpenWhiskDeployment, gcf_obj: GoogleDeployment):
    """Create parameter files and deploy managing and client functions.

    The client counts are read from the YAML file at ``configfile`` under
    ``scenarios -> federated_learning -> clients_info``. All required keys
    are resolved before the first deployment step, so an incomplete config
    fails before anything has been created or deployed.

    Args:
        configfile: Path to the YAML configuration file. It is also passed
            through unchanged to the ``fl_learning_obj`` methods.
        fl_learning_obj: Object providing the awaitable
            ``create_all_params_files``, ``deploy_managing_functions`` and
            ``deploy_clients`` methods.
        openwhisk_obj: Deployment handle used for the 'openwhisk' provider.
        gcf_obj: Deployment handle used for the 'google' provider.

    Raises:
        OSError: If ``configfile`` cannot be opened.
        KeyError: If one of the expected config keys is missing.

    ``yaml.YAMLError`` raised anywhere in the sequence is caught and
    printed rather than propagated.
    """
    try:
        # Parse the config and release the file handle right away; the
        # deployment calls below do not need the stream to stay open.
        with open(configfile, 'r') as stream:
            data = yaml.safe_load(stream)

        clients_info = data['scenarios']['federated_learning']['clients_info']
        edge_count = clients_info['edge_clients_count']
        # Boundary between the OpenWhisk client range and the GCF client
        # range. How the start/end bounds are interpreted is up to
        # deploy_clients (defined outside this file).
        ow_end = edge_count + clients_info['ow_clients_count']
        total_clients = clients_info['total_num_clients']

        # step1: create parameters files
        await fl_learning_obj.create_all_params_files(configfile)

        # step2: deploy managing functions
        await fl_learning_obj.deploy_managing_functions(
            configfile, openwhisk_obj, total_clients,
            'openwhisk', "invasic_cluster")

        # step3: deploy OpenWhisk clients
        await fl_learning_obj.deploy_clients(
            edge_count, ow_end,
            configfile, openwhisk_obj, 'openwhisk', "invasic_cluster")

        # step4: deploy GCF clients
        await fl_learning_obj.deploy_clients(
            ow_end, total_clients,
            configfile, gcf_obj, 'google', "gcf_cluster")
    except yaml.YAMLError as exc:
        print(exc)
async def main(argv):
    """Parse command-line options and run the requested action.

    Supported options:
        -h / --help          print usage and exit
        -c / --configfile    path to the YAML configuration file
        -d / --deploy        run ``create_setup``
        -r / --remove        run ``clean_up_setup``
        -s / --start_fl      call ``start_fl_learning`` on the
                             FederatedLearning object

    If several action flags are given, only one runs, with precedence
    deploy > remove > start_fl.

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        SystemExit: With status 2 on invalid options, or status 0 after
            printing help.
    """
    openwhisk_obj = OpenWhiskDeployment()
    google_obj = GoogleDeployment()
    fl_learning_obj = FederatedLearning()
    configfile = ''
    deployment = False
    remove = False
    start_fl = False

    try:
        arguments, _ = getopt.getopt(
            argv, "hc:drs",
            ["help", "configfile=", "deploy", "remove", "start_fl"])
    except getopt.GetoptError:
        print('main.py -c <configfile path> '
              '-d <for deploying> -r <for removing>')
        sys.exit(2)

    for current_argument, current_value in arguments:
        if current_argument in ("-h", "--help"):
            print('python3 main.py \n -c <configfile path>'
                  '\n -d <for deploying> \n -r <for removing> \n -s <for starting federated learning>')
            # Asking for help must not fall through into an action or
            # into the "finished" message below.
            sys.exit(0)
        elif current_argument in ("-c", "--configfile"):
            configfile = current_value
        elif current_argument in ("-d", "--deploy"):
            deployment = True
        elif current_argument in ("-r", "--remove"):
            remove = True
        elif current_argument in ("-s", "--start_fl"):
            start_fl = True

    tasks: List[asyncio.Task] = []
    if deployment:
        tasks.append(
            asyncio.create_task(
                create_setup(configfile, fl_learning_obj, openwhisk_obj, google_obj)
            )
        )
    elif remove:
        tasks.append(
            asyncio.create_task(
                clean_up_setup(configfile, fl_learning_obj, openwhisk_obj, google_obj)
            )
        )
    elif start_fl:
        outcome = fl_learning_obj.start_fl_learning(configfile, openwhisk_obj)
        # NOTE(review): start_fl_learning is defined outside this file. If
        # it is a coroutine function, the call above only creates the
        # coroutine; await it so the work actually runs. A plain
        # synchronous method is unaffected.
        if asyncio.iscoroutine(outcome):
            await outcome

    # wait for all workers
    if tasks:
        try:
            # gather() re-raises the first task exception, whereas
            # asyncio.wait() only stores it on the task object, which left
            # this handler unreachable for task failures.
            await asyncio.gather(*tasks)
        except Exception as e:
            print("Exception in main worker loop")
            print(e)
            traceback.print_exc()
    print("All deployment/removal finished")
if __name__ == "__main__":
    # Script entry point: pass everything after the program name to the
    # async main() and drive it to completion on a fresh event loop.
    cli_args = sys.argv[1:]
    asyncio.run(main(cli_args))