-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper.py
185 lines (157 loc) · 6.83 KB
/
helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import importlib
import logging
import os
import random
from collections import defaultdict
from copy import deepcopy
from shutil import copyfile
from typing import Union
import numpy as np
import torch
import yaml
from torch.utils.tensorboard import SummaryWriter
from attack import Attack
from synthesizers.synthesizer import Synthesizer
from tasks.fl.fl_task import FederatedLearningTask
from tasks.task import Task
from utils.parameters import Params
from utils.utils import create_logger, create_table
# Module-wide logger, looked up by the fixed name 'logger'.
logger = logging.getLogger('logger')


class Helper:
    """Build and hold the objects needed for one training run.

    Constructing a ``Helper`` parses the run parameters, optionally seeds
    the random number generators, prepares the log folder / TensorBoard
    writer, and instantiates the task, the synthesizer and the attack.
    The remaining methods cover checkpointing, TensorBoard plotting and
    periodic reporting of the running training losses.
    """

    # Annotations are written as strings (forward references) so that
    # declaring the class does not evaluate the project types.
    params: 'Params' = None
    task: 'Union[Task, FederatedLearningTask]' = None
    synthesizer: 'Synthesizer' = None
    attack: 'Attack' = None
    tb_writer: 'SummaryWriter' = None

    def __init__(self, params):
        """Create every component of the run.

        Args:
            params: Mapping of keyword arguments forwarded to ``Params``.
        """
        self.params = Params(**params)

        # Per-phase timing buckets; they start empty.
        self.times = {'backward': [], 'forward': [], 'step': [],
                      'scales': [], 'total': [], 'poison': []}

        if self.params.random_seed is not None:
            self.fix_random(self.params.random_seed)

        # Order matters: the synthesizer is built from the task and the
        # attack is built from the synthesizer.
        self.make_folders()
        self.make_task()
        self.make_synthesizer()
        self.attack = Attack(self.params, self.synthesizer)

        # Disabled in the original source:
        # if 'spectral_evasion' in self.params.loss_tasks:
        #     self.attack.fixed_model = deepcopy(self.task.model)

        # Lowest validation loss seen so far; see ``save_model``.
        self.best_loss = float('inf')

    def make_task(self):
        """Import and instantiate the task class named by ``params.task``.

        The class ``<task>Task`` is looked up in ``tasks.<task>_task`` or,
        when ``params.fl`` is truthy, in ``tasks.fl.<task>_task`` (module
        name lower-cased). The instance is stored in ``self.task``.

        Raises:
            ModuleNotFoundError: If the module cannot be imported or does
                not define the expected class. The underlying error is
                attached as the cause.
        """
        name_lower = self.params.task.lower()
        name_cap = self.params.task
        if self.params.fl:
            module_name = f'tasks.fl.{name_lower}_task'
            path = f'tasks/fl/{name_lower}_task.py'
        else:
            module_name = f'tasks.{name_lower}_task'
            path = f'tasks/{name_lower}_task.py'

        try:
            task_module = importlib.import_module(module_name)
            task_class = getattr(task_module, f'{name_cap}Task')
        except (ModuleNotFoundError, AttributeError) as err:
            # Chain the original error: a missing dependency inside the
            # task module also lands here, and the cause tells them apart.
            raise ModuleNotFoundError(
                f'Your task: {self.params.task} should '
                f'be defined as a class '
                f'{name_cap}Task in {path}') from err

        self.task = task_class(self.params)

    def make_synthesizer(self):
        """Import and instantiate the synthesizer from ``params.synthesizer``.

        The class ``<name>Synthesizer`` is looked up in
        ``synthesizers.<name>_synthesizer`` (module name lower-cased) and
        is constructed with ``self.task``. The instance is stored in
        ``self.synthesizer``.

        Raises:
            ModuleNotFoundError: If the module cannot be imported or does
                not define the expected class. The underlying error is
                attached as the cause.
        """
        name_lower = self.params.synthesizer.lower()
        name_cap = self.params.synthesizer
        module_name = f'synthesizers.{name_lower}_synthesizer'

        try:
            synthesizer_module = importlib.import_module(module_name)
            synthesizer_class = getattr(synthesizer_module,
                                        f'{name_cap}Synthesizer')
        except (ModuleNotFoundError, AttributeError) as err:
            raise ModuleNotFoundError(
                f'The synthesizer: {self.params.synthesizer}'
                f' should be defined as a class '
                f'{name_cap}Synthesizer in '
                f'synthesizers/{name_lower}_synthesizer.py') from err

        self.synthesizer = synthesizer_class(self.task)

    def make_folders(self):
        """Prepare file logging and the TensorBoard writer.

        When ``params.log`` is truthy, ``params.folder_path`` is created,
        a ``log.txt`` file handler is attached to the logger returned by
        ``create_logger()``, and the parameters are dumped to
        ``params.yaml.txt`` in that folder. When ``params.tb`` is truthy,
        a ``SummaryWriter`` is opened under ``runs/<params.name>`` and a
        table of the parameters is written to it.
        """
        log = create_logger()
        if self.params.log:
            try:
                # makedirs (not mkdir) so a missing parent directory does
                # not abort the run; an existing folder is still reported.
                os.makedirs(self.params.folder_path)
            except FileExistsError:
                log.info('Folder already exists')

            fh = logging.FileHandler(
                filename=f'{self.params.folder_path}/log.txt')
            formatter = logging.Formatter('%(asctime)s - %(name)s '
                                          '- %(levelname)s - %(message)s')
            fh.setFormatter(formatter)
            log.addHandler(fh)

            log.warning(f'Logging to: {self.params.folder_path}')

            with open(f'{self.params.folder_path}/params.yaml.txt', 'w') as f:
                yaml.dump(self.params, f)

        if self.params.tb:
            self.tb_writer = SummaryWriter(log_dir=f'runs/{self.params.name}')
            table = create_table(self.params.to_dict())
            self.tb_writer.add_text('Model Params', table)

    def save_model(self, model=None, epoch=0, val_loss=0):
        """Write checkpoints for ``model`` if ``params.save_model`` is set.

        Always writes ``model_last.pt.tar`` in ``params.folder_path``;
        additionally writes an ``.epoch_<epoch>`` copy when ``epoch`` is
        listed in ``params.save_on_epochs`` and a ``.best`` copy when
        ``val_loss`` is lower than the best loss seen so far.

        Args:
            model: Object exposing ``state_dict()``. Although it defaults
                to ``None``, ``model.state_dict()`` is called whenever
                saving is enabled, so a model must be passed in that case.
            epoch: Epoch number stored in the checkpoint.
            val_loss: Validation loss compared against ``self.best_loss``.
        """
        if not self.params.save_model:
            return

        logger.info(f"Saving model to {self.params.folder_path}.")
        model_name = '{0}/model_last.pt.tar'.format(self.params.folder_path)
        saved_dict = {'state_dict': model.state_dict(),
                      'epoch': epoch,
                      'lr': self.params.lr,
                      'params_dict': self.params.to_dict()}
        self.save_checkpoint(saved_dict, False, model_name)

        if epoch in self.params.save_on_epochs:
            logger.info(f'Saving model on epoch {epoch}')
            self.save_checkpoint(saved_dict, False,
                                 filename=f'{model_name}.epoch_{epoch}')

        if val_loss < self.best_loss:
            self.save_checkpoint(saved_dict, False, f'{model_name}.best')
            self.best_loss = val_loss

    def save_checkpoint(self, state, is_best, filename='checkpoint.pth.tar'):
        """Serialize ``state`` to ``filename`` with ``torch.save``.

        Args:
            state: Object to serialize.
            is_best: When truthy, the written file is also copied to
                ``model_best.pth.tar`` in the current working directory.
            filename: Destination path of the checkpoint.

        Returns:
            ``False`` when ``params.save_model`` is falsy (nothing is
            written); ``None`` otherwise.
        """
        if not self.params.save_model:
            return False
        torch.save(state, filename)
        if is_best:
            copyfile(filename, 'model_best.pth.tar')

    def flush_writer(self):
        """Flush the TensorBoard writer if one has been created."""
        if self.tb_writer:
            self.tb_writer.flush()

    def plot(self, x, y, name):
        """Record one scalar in TensorBoard and flush the writer.

        Args:
            x: Global step of the point.
            y: Scalar value of the point.
            name: Tag under which the scalar is recorded.

        Returns:
            ``False`` when no TensorBoard writer exists; ``None`` otherwise.
        """
        if self.tb_writer is not None:
            self.tb_writer.add_scalar(tag=name, scalar_value=y, global_step=x)
            self.flush_writer()
        else:
            return False

    def report_training_losses_scales(self, batch_id, epoch):
        """Log and plot the mean running losses/scales, then reset them.

        Does nothing unless ``params.report_train_loss`` is truthy and
        ``batch_id`` is a multiple of ``params.log_interval``.

        Args:
            batch_id: Index of the current batch within the epoch.
            epoch: Current epoch number.
        """
        if not self.params.report_train_loss or \
                batch_id % self.params.log_interval != 0:
            return

        total_batches = len(self.task.train_loader)
        losses = [f'{x}: {np.mean(y):.2f}'
                  for x, y in self.params.running_losses.items()]
        scales = [f'{x}: {np.mean(y):.2f}'
                  for x, y in self.params.running_scales.items()]
        logger.info(
            f'Epoch: {epoch:3d}. '
            f'Batch: {batch_id:5d}/{total_batches}. '
            f' Losses: {losses}.'
            f' Scales: {scales}')

        # Global step shared by every scalar plotted for this batch.
        step = epoch * total_batches + batch_id
        for name, values in self.params.running_losses.items():
            self.plot(step, np.mean(values), f'Train/Loss_{name}')
        for name, values in self.params.running_scales.items():
            self.plot(step, np.mean(values), f'Train/Scale_{name}')

        # Start a fresh accumulation window.
        self.params.running_losses = defaultdict(list)
        self.params.running_scales = defaultdict(list)

    @staticmethod
    def fix_random(seed=1):
        """Seed every random number generator used for a run.

        Seeds Python's ``random``, NumPy and PyTorch (CPU and all CUDA
        devices) and switches cuDNN to deterministic algorithm selection.

        Args:
            seed: Seed value applied to all generators.

        Returns:
            Always ``True``.
        """
        from torch.backends import cudnn

        logger.warning('Setting random_seed seed for reproducible results.')
        random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        # Reproducibility needs deterministic kernels and no autotuning:
        # benchmark mode may pick different algorithms from run to run.
        # This trades some GPU throughput for repeatable results.
        cudnn.deterministic = True
        cudnn.enabled = True
        cudnn.benchmark = False
        np.random.seed(seed)
        return True