-
Notifications
You must be signed in to change notification settings - Fork 0
/
ep.c
158 lines (131 loc) · 4.09 KB
/
ep.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/**
* Endpoints. Please #include this file in runtime.c
*/
/**
 * Allocate a new endpoint and append it to the global endpoint table.
 *
 * The endpoint stores the value returned by board_ref(board), i.e. it takes
 * its own reference on the board; ep_free() releases it again.
 *
 * @param full  Value stored in the endpoint's `full` field.
 * @param self  Value stored in the endpoint's `self` field.
 * @param board Board the endpoint is attached to.
 * @return The new endpoint, or NULL if the allocation failed. On failure no
 *         board reference is taken and the endpoint table is left untouched.
 */
struct ep_t* ep_make(int32_t full, int32_t self, struct board_t* board) {
    struct ep_t* ep = malloc(sizeof *ep);
    if (ep == NULL) {
        return NULL;
    }

    ep->self = self;
    ep->full = full;
    ep->board = board_ref(board);

    /* NOTE(review): the capacity of g_endpoints is not visible from this
     * file, so this pre-increment store is unchecked -- confirm the table
     * cannot overflow. */
    g_endpoints[++g_eindex] = ep;
    return ep;
}
/**
 * Destroy an endpoint.
 *
 * Releases the endpoint's board via board_free(), clears every slot of
 * g_endpoints (indices 0..g_eindex) that still points at the endpoint, and
 * finally frees the endpoint itself.
 *
 * @param ep Endpoint to destroy. NULL is accepted and ignored.
 */
void ep_free(struct ep_t* ep) {
    if (ep == NULL) {
        return;
    }

    board_free(ep->board);

    /* Unregister before free(): comparing against a pointer whose storage
     * has already been released uses an indeterminate value. */
    for (int i = 0; i <= g_eindex; i++) {
        if (g_endpoints[i] == ep) {
            g_endpoints[i] = NULL;
        }
    }

    free(ep);
}
/**
 * Split part of an endpoint off into a new endpoint on the same board.
 *
 * Asserts MSG_SET_SUP(ep->self, split), removes `split` from ep->self with
 * MSG_SET_MINUS, and creates a new endpoint whose `self` is `split` and whose
 * `full` is copied from `ep`.
 *
 * @param ep    Endpoint to split; its `self` field is reduced in place.
 * @param split Value handed over to the new endpoint as its `self`.
 * @return The endpoint created by ep_make().
 */
struct ep_t* ep_split(struct ep_t* ep, int32_t split) {
    assert(MSG_SET_SUP(ep->self, split));
    ep->self = MSG_SET_MINUS(ep->self, split);

    /* ep_make() already calls board_ref() on the board it is given, and
     * ep_free() calls board_free() exactly once. Passing a pre-referenced
     * board here would take two references for a single endpoint, so the
     * board is passed as-is. */
    return ep_make(ep->full, split, ep->board);
}
/**
 * Write a message to the endpoint's board.
 *
 * Builds a message from (label, from, to, payload) with msg_make() and hands
 * it to board_write(). If board_write() returns a board other than the one
 * the endpoint currently holds, the endpoint switches to the returned board:
 * it takes a reference on the new board and releases the previous one.
 *
 * @param ep      Endpoint to send through.
 * @param label   Message label.
 * @param from    Sender value stored in the message.
 * @param to      Receiver value stored in the message.
 * @param payload Opaque payload pointer stored in the message.
 */
void ep_send(struct ep_t* ep, int label, int32_t from, int32_t to, void* payload) {
    struct msg_t* outgoing = msg_make(label, from, to, payload);
    struct board_t* landed = board_write(ep->board, outgoing);

    /* Same board as before: nothing to re-point. */
    if (landed == ep->board) {
        return;
    }

    struct board_t* previous = ep->board;
    ep->board = board_ref(landed);
    board_free(previous);
}
/**
 * Read a message matching (label, from, to) from the endpoint's board.
 *
 * A pattern message and a scratch result message are created with msg_make();
 * board_read() is called with both. The result's label is asserted to equal
 * the pattern's label. If board_read() returns a board other than the one the
 * endpoint currently holds, the endpoint takes a reference on the returned
 * board and releases the previous one. Both temporary messages are freed
 * before returning.
 *
 * @param ep    Endpoint to receive on.
 * @param label Label to look for.
 * @param from  Sender value placed in the pattern.
 * @param to    Receiver value placed in the pattern.
 * @return The payload pointer taken from the message that was found.
 */
void* ep_recv(struct ep_t* ep, int label, int32_t from, int32_t to) {
    struct msg_t* pattern = msg_make(label, from, to, NULL);
    struct msg_t* match = msg_make(-1, INT32_MIN, INT32_MIN, NULL);

    struct board_t* landed = board_read(ep->board, pattern, match);
    assert(match->label == pattern->label);

    if (landed != ep->board) {
        struct board_t* previous = ep->board;
        ep->board = board_ref(landed);
        board_free(previous);
    }

    /* Only the pointer is copied out; the message does not own what it
     * points to, which is only workable because memory is shared. */
    void* result = match->payload;

    msg_free(pattern);
    msg_free(match);

    if (label == MSG_SYNC_CLOSE) {
        log_trace("SYNC_CLOSE Received!");
    }

    return result;
}
/**
 * Receive `label` messages addressed to `syncer` until all pending senders
 * have been accounted for.
 *
 * The pending set starts as MSG_SET_MINUS(ep->full, ep->self). Each round
 * logs the low 8 bits of the pending set, receives one message through
 * ep_recv(), interprets the returned payload pointer as an integer sender
 * value, and removes it from the pending set with MSG_SET_MINUS. The loop
 * ends once the pending set is no longer positive.
 *
 * NOTE(review): if a received payload removes nothing from the pending set
 * the loop does not make progress -- confirm senders always encode their own
 * value in the payload.
 *
 * @param ep     Endpoint to receive on.
 * @param label  Label of the synchronisation messages.
 * @param syncer Value passed as the `to` argument of ep_recv().
 */
void ep_sync(struct ep_t* ep, int label, int syncer) {
    int32_t senders = MSG_SET_MINUS(ep->full, ep->self);

    while (senders > 0) {
        /* 8 binary digits plus the terminator; rebuilt every round. */
        char bits[9];
        int n = 0;
        for (unsigned int mask = 0x80; mask; mask >>= 1) {
            bits[n++] = (mask & (unsigned int)senders) ? '1' : '0';
        }
        bits[n] = '\0';
        log_trace("Syncing with %s", bits);

        /* The payload carries an integer smuggled through a void*. Going via
         * intptr_t keeps the pointer-to-integer conversion size-matched
         * before narrowing to int32_t. */
        void* raw = ep_recv(ep, label, INT32_MIN, syncer);
        int32_t sender = (int32_t)(intptr_t)raw;

        senders = MSG_SET_MINUS(senders, sender);
    }
}
/**
* Link two endpoints. The operation puts the reference of each one into the other.
*
* When reading:
* - If KILL, jump to the other board and read again.
* - If KILL is the only message, return the new board.
* - Otherwise, keep using the old board.
* - If KEEP,
* - If the KEEP is sent by oneself (self < KEEP.senders, or !(self < KEEP.receivers)), skip.
* - Otherwise, jump to the other board and read again.
* - If that read succeeds, return.
* - If it fails, restart the search from the next one.
* - If the other board only contains the KILL, remove KEEP.
*
* When writing:
* - If KILL, jump to the other board and write again.
* - If KILL is the only message, return the new board.
* - Otherwise, keep using the old board.
* - If KEEP, skip.
*
* @param ep1 The keep endpoint.
* @param ep2 The kill endpoint.
* @return The keep endpoint.
*/
struct ep_t* ep_link(struct ep_t* ep1, struct ep_t* ep2) {
    /*
     * FWD_KILL, written to ep2's board, carries a reference to ep1's board.
     *   senders:   ep2.self
     *   receivers: ep2.full
     */
    struct board_t* keep_target = board_ref(ep1->board);
    struct msg_t* kill_msg = msg_make(MSG_FWD_KILL, ep2->self, ep2->full, keep_target);

    /*
     * FWD_KEEP, written to ep1's board, carries a reference to ep2's board.
     *   senders:   full - ep2.self
     *   receivers: ep2.self
     */
    struct board_t* kill_target = board_ref(ep2->board);
    struct msg_t* keep_msg = msg_make(MSG_FWD_KEEP, ep2->full & ~ep2->self, ep2->self, kill_target);

    /* NOTE(review): this intersects the two `self` values -- confirm that an
     * intersection (rather than a union) is what the surviving endpoint
     * should own. */
    ep1->self = ep1->self & ep2->self;

    /* NOTE(review): the boards returned by board_write() are discarded here,
     * whereas ep_send() re-points the endpoint at the returned board --
     * confirm that is intentional. */
    board_write(ep2->board, kill_msg);
    board_write(ep1->board, keep_msg);

    /* ep2 is consumed by the link; only ep1 survives. */
    ep_free(ep2);
    return ep1;
}
/**
 * Log a one-line debug description of an endpoint.
 *
 * The line has the form
 *   Endpoint [<full bits>] [<self bits>] [<board id>] @ <address>
 * where each bit field is the low 8 bits of the corresponding value, most
 * significant bit first.
 *
 * @param ep Endpoint to describe; ep->board must be valid.
 */
void ep_show(struct ep_t* ep) {
    char full_bits[9];
    char self_bits[9];
    int n = 0;

    for (unsigned int mask = 0x80; mask; mask >>= 1) {
        full_bits[n] = (mask & (unsigned int)ep->full) ? '1' : '0';
        self_bits[n] = (mask & (unsigned int)ep->self) ? '1' : '0';
        n++;
    }
    full_bits[n] = '\0';
    self_bits[n] = '\0';

    /* Format through the logger with a literal format string: the board id
     * has no visible length limit, so staging it in a fixed stack buffer
     * could overflow, and passing the staged text as the format would
     * misinterpret any '%' it contains. */
    log_debug("Endpoint [%s] [%s] [%s] @ %p",
              full_bits, self_bits, ep->board->id, (void*)ep);
}