-
Notifications
You must be signed in to change notification settings - Fork 1
/
master.py
95 lines (77 loc) · 2.52 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import threading
import logging
import datetime
import queue
import sys
from pprint import pprint
from typing import Dict
from master_utils.getRequest import getRequestData, processTaskQueue
from master_utils.recvWorker import recvFromWorker, processWorkerMessage
from master_utils.scheduler import RandomScheduler, RoundRobinScheduler, LeastLoaded
from config_utils import getWorkers
# read configuration
workers = getWorkers("config.json")
pprint(workers)
# Initialize schedulers
scheduler = RandomScheduler(workers)
if len(sys.argv) > 1:
scAlgo = sys.argv[1]
scheduler = None
if scAlgo == "rr":
scheduler = RoundRobinScheduler(workers)
elif scAlgo == "ll":
scheduler = LeastLoaded(workers)
logFile = f"master_{scheduler.name}.log"
class CustomFormatter(logging.Formatter):
"""To get milliseconds in log"""
converter = datetime.datetime.fromtimestamp
def formatTime(self, record, datefmt=None):
ct = self.converter(record.created)
if datefmt:
s = ct.strftime(datefmt)
else:
t = ct.strftime("%Y-%m-%d %H:%M:%S")
s = "%s.%03d" % (t, record.msecs)
return s
logFormatter = CustomFormatter(
fmt="%(asctime)s: %(message)s", datefmt="%Y-%m-%dT%H:%M:%S.%f%z"
)
logFileHandler = logging.FileHandler(logFile, mode="w")
logFileHandler.setFormatter(logFormatter)
logStreamHandler = logging.StreamHandler()
logStreamHandler.setFormatter(logFormatter)
logHandlers = [logFileHandler, logStreamHandler]
logging.basicConfig(level=logging.INFO, handlers=logHandlers)
# message queues
workerMessages = queue.Queue()
taskQueue = queue.Queue()
# dictionary to associate map task with corresponding reduce task
jobStore: Dict[str, Dict[str, int]] = dict()
# host and port details
host = "localhost"
clientPort = 5000
recvWorkerPort = 5001
sendWorkerPort = 4000
# start master threads
logging.info("Starting client requests thread.")
clientThread = threading.Thread(
target=getRequestData, args=(host, clientPort, taskQueue, jobStore)
)
processQueueThread = threading.Thread(
target=processTaskQueue, args=(taskQueue, scheduler)
)
# worker threads
workerThread = threading.Thread(
target=recvFromWorker, args=(host, recvWorkerPort, workerMessages)
)
workerMsgThread = threading.Thread(
target=processWorkerMessage, args=(workerMessages, taskQueue, workers, jobStore)
)
clientThread.start()
processQueueThread.start()
workerThread.start()
workerMsgThread.start()
clientThread.join()
processQueueThread.join()
workerThread.join()
workerMsgThread.join()