-
Notifications
You must be signed in to change notification settings - Fork 0
/
TaskManager.h
executable file
·95 lines (80 loc) · 1.97 KB
/
TaskManager.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include <set>
#include <vector>
#include <string>
#include <map>
#include "Buffer.h"
#include <sys/time.h>
#ifndef TASK_MANAGER
#define TASK_MANAGER
enum{
TASK_INIT=0,
TASK_DONE
};
enum{
TM_CLIENT_MESSAGE_RECEVIED
};
typedef int ClientFd;
typedef int TaskId;
struct Task{
int taskId;
uint64_t startOffset;
uint64_t endOffset;
uint8_t taskType;
std::string taskName;
std::string inputFile;
std::string inputFile2;
std::string outputFile;
timer_t timert;
int state;
int grpId;
Task(int taskCount)
{
taskId= taskCount;
}
void startTimer();
void stopTimer();
void sendTask(ClientFd cFd,uint16_t timeOut);
};
class TaskManager
{
int groupId;
std::string taskType;
std::string inputFile;
std::string delimiter;
int taskDone;
int mapTaskCount;
int reduceTaskCount;
int taskCount;
struct timeval begin,end;
int mapTaskTotal;
int reduceTaskTotal;
int retranTotal;
uint16_t timeOut;
double elapsed;
std::set<ClientFd> freeClients;
std::vector<TaskId> taskPending; //stack
std::map<TaskId, Task*> taskMap;
uint64_t adjustOffset(uint64_t pos, std::ifstream& inFile, uint64_t& length);
void splitter(int clientCount);
Task* createReduceTask(int taskId1, int taskId2);
int getFreeClient();
void cleanTaskMap();
public:
TaskManager(int gId, std::string &tType,
std::string &iFile,
std::string &delim,uint16_t tmOut): groupId(gId),
taskType(tType),inputFile(iFile),
delimiter(delim),taskDone(-1),
mapTaskCount(0),reduceTaskCount(0),
taskCount(0),mapTaskTotal(0),reduceTaskTotal(0),
retranTotal(0),timeOut(tmOut)
{
}
void start();
void handleNetworkEvent(int event,int clientFd, Buffer &buf);
void handleTaskTimeout(int taskId);
void servePendingTask();
void printStats();
void stop();
};
#endif