-
Notifications
You must be signed in to change notification settings - Fork 0
/
node_server.js
203 lines (152 loc) · 6.47 KB
/
node_server.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
// CTS_NODEJS Server
// Handles CTS Frontend web sockets and
// redis subscribe events. Pushes data
// to user!
// Local Config:
var config = require('./config');
// External Package Requirements:
var querystring = require('query-string');
var redis = require('redis');
var http = require('http');
var path = require('path');
var express = require('express');
var celery = require('node-celery');
// Define server, set socket.io server to listen on said server:
var app = express();
var server = http.createServer(app);
const io = require('socket.io')(server);
var nodejs_port = config.server.port; // node server port
var nodejs_host = config.server.host; // node server host
var celery_default_timeout = config.celery.defaultTimeout; // default timeout for celery worker calls
// var celery_default_timeout = config.celery.testingTimeout; // default timeout for celery worker calls
var redis_url = 'redis://' + config.redis.host + ':' + config.redis.port + '/0'; // url for redis instance
// var redisClient = redis.createClient(redis_url);
var redisManager = redis.createClient(redis_url);
console.log("redis running at " + redis_url);
server.listen(nodejs_port); // Start Express server
console.log("Server started.. \nTest page at http://" + nodejs_host + ":" + nodejs_port + "/test");
// Create Celery celeryClient:
var celeryClient = celery.createClient({
CELERY_BROKER_URL: redis_url,
CELERY_RESULT_BACKEND: redis_url,
CELERY_ROUTES: {
'tasks.removeUserJobsFromQueue': {
queue: 'manager_queue'
},
'tasks.test_celery': {
queue: 'manager_queue'
},
'tasks.cts_task': {
queue: 'cts_queue'
}
}
});
celeryClient.on('error', function(err) {
console.log("An error occurred calling the celery worker: " + err);
});
// Celery worker test endpoint:
app.get('/test', function(req, res){
// opens test page for nodejs->celery connection
res.sendFile(path.join(__dirname + '/public/html/ws_test_page.html'));
});
io.sockets.on('connection', function (socket)
{
console.log("session id: " + socket.id);
var redisClient = redis.createClient(redis_url);
// console.log("nodejs connected to redis..");
redisClient.subscribe(socket.id); // create channel with celeryClient's socket id
// console.log("subscribed to channel " + socket.id);
// Grab message from Redis that was created by django and send to celeryClient
redisClient.on('message', function(channel, message){
console.log(">>> messaged received from celery worker via redis sub..")
console.log("Channel: " + channel);
// console.log("Message: " + message);
socket.send(message); // send to browser
});
socket.on('get_data', function (message) {
// Request event from CTS Frontend
console.log("nodejs server received message..");
var message_obj = JSON.parse(message); // parse json str to obj
parseCTSRequestToCeleryWorkers(socket.id, message_obj, socket); // here we go...
});
socket.on('disconnect', function (err) {
/*
Triggered when a user closes site or
refreshes the page
*/
// unscribe here or in removal task??
redisClient.unsubscribe(socket.id); // unsubscribe from redis channel
console.log("Calling manager worker to cancel user job upon disconnect..");
celeryClient.call('tasks.removeUserJobsFromQueue', [socket.id]);
return;
});
socket.on('error', function (err) {
console.log("A socket error occured in cts_nodejs..");
console.log(err);
});
socket.on('test_socket', function (message) {
console.log("node received message: ");
console.log(message);
socket.send("hello from nodejs! at " + config.server.host + ", port " + config.server.port);
});
// test django-cts celery worker
socket.on('test_celery', function (message) {
console.log("received message: " + message);
var query = querystring.stringify({
sessionid: socket.id, // cts will now publish to session channel
message: "hello celery"
});
// passRequestToCTS(query);
celeryClient.call('tasks.test_celery', [socket.id, 'hello celery'], function(result) {
console.log(result);
celeryClient.end();
});
});
});
function parseCTSRequestToCeleryWorkers(sessionid, data_obj, socket) {
var ctsServices = ['getSpeciationData', 'getTransProducts', 'getChemInfo'];
data_obj['sessionid'] = sessionid; // add sessionid to data object
var calc = data_obj['calc'];
var jobObject = null; // job object from calling celery worker (contains job id)
if ('cancel' in data_obj) {
// Can celery worker be canceled from here? (probably not)
celeryClient.call('tasks.removeUserJobsFromQueue', [sessionid], null, {
expires: new Date(Date.now() + celery_default_timeout)
});
// could send cancel notification to user..
// console.log("Calling manager worker to cancel user job upon disconnect...");
console.log("Sending cancel signal to 'cancel' channel");
console.log("Data Object: ");
console.log(data_obj);
socket.emit('cancel', true);
return;
}
if (ctsServices.indexOf(data_obj['service']) > -1) {
// Calls a service (basically a longer-running/more-involved request than a pchem one)
console.log("calling " + data_obj['service'] + " service..");
jobObject = celeryClient.call('tasks.cts_task', [data_obj], null, {
expires: new Date(Date.now() + celery_default_timeout)
});
}
else {
handleCeleryPchemRequest(sessionid, data_obj, socket);
}
if (jobObject) {
redisManager.rpush([sessionid, jobObject.taskid]); // add job id to user's job list
}
}
function handleCeleryPchemRequest(sessionid, data_obj, socket) {
var jobObject = null;
// Breaking user request up by calc, send to
// the respective calc celery workers:
for (var calc in data_obj['pchem_request']) {
data_obj['calc'] = calc;
console.log("sending request to " + calc + " worker");
jobObject = celeryClient.call('tasks.cts_task', [data_obj], null, {
expires: new Date(Date.now() + celery_default_timeout)
});
if (jobObject) {
redisManager.rpush([sessionid, jobObject.taskid]); // add job id to user's job list
}
}
}