Skip to content

Commit

Permalink
Merge pull request #6027 from nb-ohad/ohad-move-encode-decode-to-rpc-…
Browse files Browse the repository at this point in the history
…connetion

RPC: Decouple message creation from message encoding/decoding
  • Loading branch information
nb-ohad committed Sep 22, 2020
2 parents cf7c650 + a9c0f44 commit ed1c45e
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 117 deletions.
35 changes: 19 additions & 16 deletions src/rpc/rpc.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

/** @typedef {import('./rpc_base_conn')} RpcBaseConnection **/

const _ = require('lodash');
const util = require('util');
const assert = require('assert');
// const ip_module = require('ip');
const EventEmitter = require('events').EventEmitter;

const P = require('../util/promise');
const js_utils = require('../util/js_utils');
const dbg = require('../util/debug_module')(__filename);
const config = require('../../config');
const RpcError = require('./rpc_error');
const url_utils = require('../util/url_utils');
const RpcRequest = require('./rpc_request');
const RpcMessage = require('./rpc_message');
const RpcWsServer = require('./rpc_ws_server');
const RpcN2NAgent = require('./rpc_n2n_agent');
const RpcTcpServer = require('./rpc_tcp_server');
Expand Down Expand Up @@ -213,7 +217,7 @@ class RPC extends EventEmitter {
}

// send request over the connection
return req.connection.send(req._encode_request(), 'req', req);
return req.connection.send(req.make_request_message(), req);

})
.then(() => {
Expand Down Expand Up @@ -320,7 +324,7 @@ class RPC extends EventEmitter {
'reqid', msg.body.reqid,
'connid', conn.connid);
req.error = new RpcError('NO_SUCH_RPC_SERVICE', 'No such RPC Service ' + srv);
return conn.send(req._encode_response(), 'res', req);
return conn.send(req.make_response_message(), req);
}

return P.resolve()
Expand Down Expand Up @@ -378,8 +382,7 @@ class RPC extends EventEmitter {
'reqid', req.reqid,
'connid', conn.connid);

return conn.send(req._encode_response(), 'res', req);

return conn.send(req.make_response_message(), req);
})
.catch(err => {

Expand All @@ -397,7 +400,7 @@ class RPC extends EventEmitter {
req.error = new RpcError(err.rpc_code || 'INTERNAL', err.message, { retryable: true });
}

return conn.send(req._encode_response(), 'res', req);
return conn.send(req.make_response_message(), req);
});
}

Expand Down Expand Up @@ -584,14 +587,14 @@ class RPC extends EventEmitter {
conn.once('connect', () => {
if (this.routing_hint) {
dbg.log0('RPC ROUTING REQ SEND', this.routing_hint, conn.connid, conn.url.href);
conn.send(RpcRequest.encode_message({
conn.send(new RpcMessage({
op: 'routing_req',
reqid: conn._alloc_reqid(),
routing_hint: this.routing_hint,
}));
}
});
conn.on('message', msg => this._on_message(conn, msg));
conn.on('decoded_message', msg => this._on_message(conn, msg));
conn.on('close', err => this._connection_closed(conn, err));
// we let the connection handle it's own errors and decide if to close or not

Expand All @@ -617,7 +620,7 @@ class RPC extends EventEmitter {
return null;
}
P.resolve()
.then(() => conn.send(RpcRequest.encode_message({
.then(() => conn.send(new RpcMessage({
op: 'ping',
reqid: reqid
})))
Expand Down Expand Up @@ -730,10 +733,10 @@ class RPC extends EventEmitter {


/**
*
* @param {RpcBaseConnection} conn
* @param {RpcMessage} msg
*/
_on_message(conn, msg_buffer) {
const msg = RpcRequest.decode_message(msg_buffer);
_on_message(conn, msg) {
if (!msg || !msg.body) {
conn.emit('error', new Error('RPC._on_message: BAD MESSAGE' +
' typeof(msg) ' + typeof(msg) +
Expand Down Expand Up @@ -764,7 +767,7 @@ class RPC extends EventEmitter {
const routing = this._routing_authority ? this._routing_authority(msg.body.routing_hint) : undefined;

dbg.log0('RPC ROUTING RES SEND', routing, conn.connid, conn.url.href);
conn.send(RpcRequest.encode_message({
conn.send(new RpcMessage({
op: 'routing_res',
reqid: msg.body.reqid,
routing,
Expand All @@ -781,9 +784,9 @@ class RPC extends EventEmitter {
case 'ping': {
dbg.log4('RPC PONG', conn.connid);
P.resolve()
.then(() => conn.send(RpcRequest.encode_message({
.then(() => conn.send(new RpcMessage({
op: 'pong',
reqid: msg.body.reqid
reqid: msg.body.reqid,
})))
.catch(_.noop); // already means the conn is closed
break;
Expand Down Expand Up @@ -815,8 +818,8 @@ class RPC extends EventEmitter {
method_api: api.id,
method_name: method.name,
target: options.address,
request_params: params || undefined,
[RPC_BUFFERS]: params && params[RPC_BUFFERS],
request_params: js_utils.omit_symbol(params, RPC_BUFFERS),
[RPC_BUFFERS]: params && params[RPC_BUFFERS]
};

return P.resolve()
Expand Down
59 changes: 57 additions & 2 deletions src/rpc/rpc_base_conn.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

/** @typedef {import('./rpc_request')} RpcRequest **/

const _ = require('lodash');
const EventEmitter = require('events').EventEmitter;

Expand All @@ -9,6 +11,8 @@ const dbg = require('../util/debug_module')(__filename);
const config = require('../../config');
const time_utils = require('../util/time_utils');
const RpcError = require('./rpc_error');
const RpcMessage = require('./rpc_message');


const STATE_INIT = 'init';
const STATE_CONNECTING = 'connecting';
Expand Down Expand Up @@ -39,6 +43,8 @@ class RpcBaseConnection extends EventEmitter {

this._connect_timeout_ms = config.RPC_CONNECT_TIMEOUT;

this._ping_reqid_set = undefined;

// the 'connect' event is emitted by the inherited type (http/ws/tcp/n2n)
// and is expected after calling _connect() or when a connection is accepted
// and already considered connected.
Expand All @@ -63,6 +69,16 @@ class RpcBaseConnection extends EventEmitter {
this.close(err);
});

this.on('message', encoded_msg => {
try {
var decoded_message = this._decode_message(encoded_msg);
this.emit('decoded_message', decoded_message);
} catch (err) {
dbg.error(`RPC decode message failed, got: ${err.message}`);
this.emit_error(err);
}
});

// on send failures we handle by closing and rethrowing to the caller
this.emit_error = err => this.emit('error', err);
}
Expand Down Expand Up @@ -101,13 +117,19 @@ class RpcBaseConnection extends EventEmitter {
*
* send message
*
* @param {RpcMessage} msg
* @param {RpcRequest} [req]
* @returns Promise
*/
send(msg, op, req) {
send(msg, req) {
if (this._state !== STATE_CONNECTED) {
throw new Error('RPC CONN NOT CONNECTED ' + this._state + ' ' + this.connid);
}
return P.resolve()
.then(() => this._send(msg, op, req))
.then(() => {
const encoded_message = this._encode_message(msg);
return this._send(encoded_message, msg.body && msg.body.op, req);
})
.timeout(config.RPC_SEND_TIMEOUT)
.catch(P.TimeoutError, () => this.emit_error(new RpcError('RPC_SEND_TIMEOUT', 'RPC SEND TIMEOUT')))
.catch(this.emit_error);
Expand All @@ -129,12 +151,45 @@ class RpcBaseConnection extends EventEmitter {
return this._state === STATE_CLOSED;
}

/**
* @param {RpcMessage} msg
* @returns {object}
*/
_encode_message(msg) {
return msg.encode();
}

/**
* @param {object} encoded_message
* @returns {RpcMessage}
*/
_decode_message(encoded_message) {
return RpcMessage.decode(encoded_message);
}

_alloc_reqid() {
let reqid = this._rpc_req_seq + '@' + this.connid;
this._rpc_req_seq += 1;
return reqid;
}

/* ----------------------------------------------------------------
* The following methods should be overriden by every concrete type
* extending RpcBaseConnection
* ----------------------------------------------------------------
*/
_connect() {
throw new Error("Not Implemented");
}

_send(msg, op, req) {
throw new Error("Not Implemented");
}

_close() {
throw new Error("Not Implemented");
}

}


Expand Down
34 changes: 27 additions & 7 deletions src/rpc/rpc_fcall.js
Original file line number Diff line number Diff line change
@@ -1,18 +1,38 @@
/* Copyright (C) 2016 NooBaa */
'use strict';

let _ = require('lodash');
let RpcBaseConnection = require('./rpc_base_conn');
const RpcBaseConnection = require('./rpc_base_conn');

require('setimmediate');

class RpcFcallConnection extends RpcBaseConnection {
_close() {
/* noop */
}

_connect() {
setImmediate(() => this.emit('connect'));
}

_send(msg) {
setImmediate(() => this.emit('message', msg));
}

/**
* @override
*/
_encode_message(msg) {
// A clone is needed because an RPC connection lives inside the same process.
// If the and part of the msg content will change after the message is sent
// both the sender and reciver will e effected by the change.
return msg.clone();
}

constructor(addr_url) {
super(addr_url);
this._close = _.noop;
this._connect = () => setImmediate(() => this.emit('connect'));
this._send = msg => setImmediate(() => this.emit('message', msg));
/**
* @override
*/
_decode_message(msg) {
return msg;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/rpc/rpc_http.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const dbg = require('../util/debug_module')(__filename);
const buffer_utils = require('../util/buffer_utils');
const http_utils = require('../util/http_utils');
const RpcBaseConnection = require('./rpc_base_conn');
const { RPC_VERSION_NUMBER } = require('./rpc_request');
const { RPC_VERSION_NUMBER } = require('./rpc_message');

// dbg.set_level(5);

Expand Down
Loading

0 comments on commit ed1c45e

Please sign in to comment.