From a9c0f4419789ec50e0b932d8d766ea444c26f38c Mon Sep 17 00:00:00 2001 From: nb-ohad Date: Tue, 19 May 2020 21:23:10 +0300 Subject: [PATCH] Decouple rpc message creation from encoding/decoding allowing rpc_fcall to override defualt encoding/decoding decoding Ensure cloning of rpc messages procude a clone that is more like a message decoded form buffers by: 1. Removing rpc_buffers from message body (and n2n_proxy messages) 2. serializing object ids into strings 3. ensuring empty buffers array on every message Signed-off-by: nb-ohad --- src/rpc/rpc.js | 35 +++--- src/rpc/rpc_base_conn.js | 59 ++++++++- src/rpc/rpc_fcall.js | 34 +++-- src/rpc/rpc_http.js | 2 +- src/rpc/rpc_message.js | 112 +++++++++++++++++ src/rpc/rpc_request.js | 143 ++++++++-------------- src/server/node_services/nodes_monitor.js | 2 +- src/util/js_utils.js | 36 ++++++ 8 files changed, 306 insertions(+), 117 deletions(-) create mode 100644 src/rpc/rpc_message.js diff --git a/src/rpc/rpc.js b/src/rpc/rpc.js index 23e44ac384..22812c27d9 100644 --- a/src/rpc/rpc.js +++ b/src/rpc/rpc.js @@ -1,6 +1,8 @@ /* Copyright (C) 2016 NooBaa */ 'use strict'; +/** @typedef {import('./rpc_base_conn')} RpcBaseConnection **/ + const _ = require('lodash'); const util = require('util'); const assert = require('assert'); @@ -8,11 +10,13 @@ const assert = require('assert'); const EventEmitter = require('events').EventEmitter; const P = require('../util/promise'); +const js_utils = require('../util/js_utils'); const dbg = require('../util/debug_module')(__filename); const config = require('../../config'); const RpcError = require('./rpc_error'); const url_utils = require('../util/url_utils'); const RpcRequest = require('./rpc_request'); +const RpcMessage = require('./rpc_message'); const RpcWsServer = require('./rpc_ws_server'); const RpcN2NAgent = require('./rpc_n2n_agent'); const RpcTcpServer = require('./rpc_tcp_server'); @@ -213,7 +217,7 @@ class RPC extends EventEmitter { } // send request over the connection - return req.connection.send(req._encode_request(), 'req', req); + return req.connection.send(req.make_request_message(), req); }) .then(() => { @@ -320,7 +324,7 @@ class RPC extends EventEmitter { 'reqid', msg.body.reqid, 'connid', conn.connid); req.error = new RpcError('NO_SUCH_RPC_SERVICE', 'No such RPC Service ' + srv); - return conn.send(req._encode_response(), 'res', req); + return conn.send(req.make_response_message(), req); } return P.resolve() @@ -378,8 +382,7 @@ class RPC extends EventEmitter { 'reqid', req.reqid, 'connid', conn.connid); - return conn.send(req._encode_response(), 'res', req); - + return conn.send(req.make_response_message(), req); }) .catch(err => { @@ -397,7 +400,7 @@ class RPC extends EventEmitter { req.error = new RpcError(err.rpc_code || 'INTERNAL', err.message, { retryable: true }); } - return conn.send(req._encode_response(), 'res', req); + return conn.send(req.make_response_message(), req); }); } @@ -584,14 +587,14 @@ class RPC extends EventEmitter { conn.once('connect', () => { if (this.routing_hint) { dbg.log0('RPC ROUTING REQ SEND', this.routing_hint, conn.connid, conn.url.href); - conn.send(RpcRequest.encode_message({ + conn.send(new RpcMessage({ op: 'routing_req', reqid: conn._alloc_reqid(), routing_hint: this.routing_hint, })); } }); - conn.on('message', msg => this._on_message(conn, msg)); + conn.on('decoded_message', msg => this._on_message(conn, msg)); conn.on('close', err => this._connection_closed(conn, err)); // we let the connection handle it's own errors and decide if to close or not @@ -617,7 +620,7 @@ class RPC extends EventEmitter { return null; } P.resolve() - .then(() => conn.send(RpcRequest.encode_message({ + .then(() => conn.send(new RpcMessage({ op: 'ping', reqid: reqid }))) @@ -730,10 +733,10 @@ class RPC extends EventEmitter { /** - * + * @param {RpcBaseConnection} conn + * @param {RpcMessage} msg */ - _on_message(conn, msg_buffer) { - const msg = RpcRequest.decode_message(msg_buffer); + _on_message(conn, msg) { if (!msg || !msg.body) { conn.emit('error', new Error('RPC._on_message: BAD MESSAGE' + ' typeof(msg) ' + typeof(msg) + @@ -764,7 +767,7 @@ class RPC extends EventEmitter { const routing = this._routing_authority ? this._routing_authority(msg.body.routing_hint) : undefined; dbg.log0('RPC ROUTING RES SEND', routing, conn.connid, conn.url.href); - conn.send(RpcRequest.encode_message({ + conn.send(new RpcMessage({ op: 'routing_res', reqid: msg.body.reqid, routing, @@ -781,9 +784,9 @@ class RPC extends EventEmitter { case 'ping': { dbg.log4('RPC PONG', conn.connid); P.resolve() - .then(() => conn.send(RpcRequest.encode_message({ + .then(() => conn.send(new RpcMessage({ op: 'pong', - reqid: msg.body.reqid + reqid: msg.body.reqid, }))) .catch(_.noop); // already means the conn is closed break; @@ -815,8 +818,8 @@ class RPC extends EventEmitter { method_api: api.id, method_name: method.name, target: options.address, - request_params: params || undefined, - [RPC_BUFFERS]: params && params[RPC_BUFFERS], + request_params: js_utils.omit_symbol(params, RPC_BUFFERS), + [RPC_BUFFERS]: params && params[RPC_BUFFERS] }; return P.resolve() diff --git a/src/rpc/rpc_base_conn.js b/src/rpc/rpc_base_conn.js index ad8fafd872..1539db24c3 100644 --- a/src/rpc/rpc_base_conn.js +++ b/src/rpc/rpc_base_conn.js @@ -1,6 +1,8 @@ /* Copyright (C) 2016 NooBaa */ 'use strict'; +/** @typedef {import('./rpc_request')} RpcRequest **/ + const _ = require('lodash'); const EventEmitter = require('events').EventEmitter; @@ -9,6 +11,8 @@ const dbg = require('../util/debug_module')(__filename); const config = require('../../config'); const time_utils = require('../util/time_utils'); const RpcError = require('./rpc_error'); +const RpcMessage = require('./rpc_message'); + const STATE_INIT = 'init'; const STATE_CONNECTING = 'connecting'; @@ -39,6 +43,8 @@ class RpcBaseConnection extends EventEmitter { this._connect_timeout_ms = config.RPC_CONNECT_TIMEOUT; + this._ping_reqid_set = undefined; + // the 'connect' event is emitted by the inherited type (http/ws/tcp/n2n) // and is expected after calling _connect() or when a connection is accepted // and already considered connected. @@ -63,6 +69,16 @@ class RpcBaseConnection extends EventEmitter { this.close(err); }); + this.on('message', encoded_msg => { + try { + var decoded_message = this._decode_message(encoded_msg); + this.emit('decoded_message', decoded_message); + } catch (err) { + dbg.error(`RPC decode message failed, got: ${err.message}`); + this.emit_error(err); + } + }); + // on send failures we handle by closing and rethrowing to the caller this.emit_error = err => this.emit('error', err); } @@ -101,13 +117,19 @@ class RpcBaseConnection extends EventEmitter { * * send message * + * @param {RpcMessage} msg + * @param {RpcRequest} [req] + * @returns Promise */ - send(msg, op, req) { + send(msg, req) { if (this._state !== STATE_CONNECTED) { throw new Error('RPC CONN NOT CONNECTED ' + this._state + ' ' + this.connid); } return P.resolve() - .then(() => this._send(msg, op, req)) + .then(() => { + const encoded_message = this._encode_message(msg); + return this._send(encoded_message, msg.body && msg.body.op, req); + }) .timeout(config.RPC_SEND_TIMEOUT) .catch(P.TimeoutError, () => this.emit_error(new RpcError('RPC_SEND_TIMEOUT', 'RPC SEND TIMEOUT'))) .catch(this.emit_error); @@ -129,12 +151,45 @@ class RpcBaseConnection extends EventEmitter { return this._state === STATE_CLOSED; } + /** + * @param {RpcMessage} msg + * @returns {object} + */ + _encode_message(msg) { + return msg.encode(); + } + + /** + * @param {object} encoded_message + * @returns {RpcMessage} + */ + _decode_message(encoded_message) { + return RpcMessage.decode(encoded_message); + } + _alloc_reqid() { let reqid = this._rpc_req_seq + '@' + this.connid; this._rpc_req_seq += 1; return reqid; } + /* ---------------------------------------------------------------- + * The following methods should be overriden by every concrete type + * extending RpcBaseConnection + * ---------------------------------------------------------------- + */ + _connect() { + throw new Error("Not Implemented"); + } + + _send(msg, op, req) { + throw new Error("Not Implemented"); + } + + _close() { + throw new Error("Not Implemented"); + } + } diff --git a/src/rpc/rpc_fcall.js b/src/rpc/rpc_fcall.js index be30ec77c8..91373b7c30 100644 --- a/src/rpc/rpc_fcall.js +++ b/src/rpc/rpc_fcall.js @@ -1,18 +1,38 @@ /* Copyright (C) 2016 NooBaa */ 'use strict'; -let _ = require('lodash'); -let RpcBaseConnection = require('./rpc_base_conn'); +const RpcBaseConnection = require('./rpc_base_conn'); require('setimmediate'); class RpcFcallConnection extends RpcBaseConnection { + _close() { + /* noop */ + } + + _connect() { + setImmediate(() => this.emit('connect')); + } + + _send(msg) { + setImmediate(() => this.emit('message', msg)); + } + + /** + * @override + */ + _encode_message(msg) { + // A clone is needed because an RPC connection lives inside the same process. + // If the and part of the msg content will change after the message is sent + // both the sender and reciver will e effected by the change. + return msg.clone(); + } - constructor(addr_url) { - super(addr_url); - this._close = _.noop; - this._connect = () => setImmediate(() => this.emit('connect')); - this._send = msg => setImmediate(() => this.emit('message', msg)); + /** + * @override + */ + _decode_message(msg) { + return msg; } } diff --git a/src/rpc/rpc_http.js b/src/rpc/rpc_http.js index 6b115d6e6a..bdd6437da9 100644 --- a/src/rpc/rpc_http.js +++ b/src/rpc/rpc_http.js @@ -10,7 +10,7 @@ const dbg = require('../util/debug_module')(__filename); const buffer_utils = require('../util/buffer_utils'); const http_utils = require('../util/http_utils'); const RpcBaseConnection = require('./rpc_base_conn'); -const { RPC_VERSION_NUMBER } = require('./rpc_request'); +const { RPC_VERSION_NUMBER } = require('./rpc_message'); // dbg.set_level(5); diff --git a/src/rpc/rpc_message.js b/src/rpc/rpc_message.js new file mode 100644 index 0000000000..9e325f4188 --- /dev/null +++ b/src/rpc/rpc_message.js @@ -0,0 +1,112 @@ +/* Copyright (C) 2016 NooBaa */ +'use strict'; + +const _ = require('lodash'); +const buffer_utils = require('../util/buffer_utils'); +const mongo_utils = require('../util/mongo_utils'); + +const RPC_VERSION_MAGIC = 0xba; +const RPC_VERSION_MAJOR = 0; +const RPC_VERSION_MINOR = 0; +const RPC_VERSION_FLAGS = 0; +const RPC_VERSION_NUMBER = Buffer.from([ + RPC_VERSION_MAGIC, + RPC_VERSION_MAJOR, + RPC_VERSION_MINOR, + RPC_VERSION_FLAGS, +]).readUInt32BE(0); + +// Encoding and decoding of rpc messages is a non-symmetric operation +// when using cloning we need to replicate this non symmetry as much as +// possible in order to ensure complaince on both sides of the RPC call. +function clone_customizer(value) { + // Mongo object ids are always serialized into strings and never deserialized back. + if (mongo_utils.is_object_id(value)) { + return value.toJSON(); + } + + // Explicity returning undefined to empesis the need to return + // undefined and not the original value. + return undefined; +} + +/** + * RpcMessage represents a self-contained message that rpc sends/receives over connections. + * The `body.op` property specifies the type of message but the message encoding is agnostic to it. + * Known message ops are: `req`, `res`, `ping`, `pong`, `routing_req`, `routing_res`. + * + * Buffers can be attached to any message in order to send raw bytes. + * Note that the message does not preserve the buffers on the receiver side. + * Multiple buffers might collapse to a single one or vice versa. + * The only guarantee is that `Buffer.concat(buffers)` will contain the same bytes. + * So the body itself should provide the information needed to parse the buffers. + * Refer to how RpcRequest is setting body.buffers to contain the size of every attachment. + */ +class RpcMessage { + static get RPC_VERSION_NUMBER() { + return RPC_VERSION_NUMBER; + } + + /** + * @param {Object} body is the main payload of the message (json encoded) + * @param {Buffer[]} [buffers] optional list of buffers to append the body + */ + constructor(body, buffers) { + this.body = body; + this.buffers = buffers || []; + } + + /** + * @returns {RpcMessage} + */ + clone() { + const { body, buffers } = this; + return new RpcMessage( + _.cloneDeepWith(body, clone_customizer), + buffers.map(Buffer.from) + ); + } + + /** + * @returns {Buffer[]} + */ + encode() { + const { body, buffers = [] } = this; + const meta_buffer = Buffer.allocUnsafe(8); + const body_buffer = Buffer.from(JSON.stringify(body)); + meta_buffer.writeUInt32BE(RPC_VERSION_NUMBER, 0); + meta_buffer.writeUInt32BE(body_buffer.length, 4); + const msg_buffers = [ + meta_buffer, + body_buffer, + ...buffers + ]; + return msg_buffers; + } + + /** + * @param {Buffer[]} msg_buffers + * @returns {RpcMessage} + */ + static decode(msg_buffers) { + const meta_buffer = buffer_utils.extract_join(msg_buffers, 8); + const version = meta_buffer.readUInt32BE(0); + if (version !== RPC_VERSION_NUMBER) { + const magic = meta_buffer.readUInt8(0); + const major = meta_buffer.readUInt8(1); + const minor = meta_buffer.readUInt8(2); + const flags = meta_buffer.readUInt8(3); + if (magic !== RPC_VERSION_MAGIC) throw new Error('RPC VERSION MAGIC MISMATCH'); + if (major !== RPC_VERSION_MAJOR) throw new Error('RPC VERSION MAJOR MISMATCH'); + if (minor !== RPC_VERSION_MINOR) throw new Error('RPC VERSION MINOR MISMATCH'); + if (flags !== RPC_VERSION_FLAGS) throw new Error('RPC VERSION FLAGS MISMATCH'); + throw new Error('RPC VERSION MISMATCH'); + } + const body_length = meta_buffer.readUInt32BE(4); + const body = JSON.parse(buffer_utils.extract_join(msg_buffers, body_length)); + + return new RpcMessage(body, msg_buffers); + } +} + +module.exports = RpcMessage; diff --git a/src/rpc/rpc_request.js b/src/rpc/rpc_request.js index aa7e405e6b..2a8ab4326c 100644 --- a/src/rpc/rpc_request.js +++ b/src/rpc/rpc_request.js @@ -2,20 +2,12 @@ 'use strict'; const _ = require('lodash'); +const RpcMessage = require('./rpc_message'); const RpcError = require('./rpc_error'); const time_utils = require('../util/time_utils'); const buffer_utils = require('../util/buffer_utils'); +const js_utils = require('../util/js_utils'); -const RPC_VERSION_MAGIC = 0xba; -const RPC_VERSION_MAJOR = 0; -const RPC_VERSION_MINOR = 0; -const RPC_VERSION_FLAGS = 0; -const RPC_VERSION_NUMBER = Buffer.from([ - RPC_VERSION_MAGIC, - RPC_VERSION_MAJOR, - RPC_VERSION_MINOR, - RPC_VERSION_FLAGS, -]).readUInt32BE(0); const RPC_BUFFERS = Symbol('RPC_BUFFERS'); @@ -24,6 +16,10 @@ const RPC_BUFFERS = Symbol('RPC_BUFFERS'); */ class RpcRequest { + static get RPC_BUFFERS() { + return RPC_BUFFERS; + } + constructor() { this.ts = time_utils.millistamp(); this.connection = undefined; @@ -51,63 +47,59 @@ class RpcRequest { this.srv = api.id + '.' + method_api.name; } - static encode_message(body, buffers) { - const meta_buffer = Buffer.allocUnsafe(8); - const body_buffer = Buffer.from(JSON.stringify(body)); - meta_buffer.writeUInt32BE(RPC_VERSION_NUMBER, 0); - meta_buffer.writeUInt32BE(body_buffer.length, 4); - const msg_buffers = buffers ? [ - meta_buffer, - body_buffer, - ...buffers - ] : [ - meta_buffer, - body_buffer - ]; - return msg_buffers; - } - - static decode_message(msg_buffers) { - const meta_buffer = buffer_utils.extract_join(msg_buffers, 8); - const version = meta_buffer.readUInt32BE(0); - if (version !== RPC_VERSION_NUMBER) { - const magic = meta_buffer.readUInt8(0); - const major = meta_buffer.readUInt8(1); - const minor = meta_buffer.readUInt8(2); - const flags = meta_buffer.readUInt8(3); - if (magic !== RPC_VERSION_MAGIC) throw new Error('RPC VERSION MAGIC MISMATCH'); - if (major !== RPC_VERSION_MAJOR) throw new Error('RPC VERSION MAJOR MISMATCH'); - if (minor !== RPC_VERSION_MINOR) throw new Error('RPC VERSION MINOR MISMATCH'); - if (flags !== RPC_VERSION_FLAGS) throw new Error('RPC VERSION FLAGS MISMATCH'); - throw new Error('RPC VERSION MISMATCH'); - } - const body_length = meta_buffer.readUInt32BE(4); - const body = JSON.parse(buffer_utils.extract_join(msg_buffers, body_length)); - return { - body, - buffers: msg_buffers - }; - } - - _encode_request() { + /** + * @returns {RpcMessage} + */ + make_request_message() { + // The undefined is here to handle the case where the RPC_BUFFERS are sometimes set to null. + const rpc_buffers = this.params && this.params[RPC_BUFFERS]; const body = { op: 'req', reqid: this.reqid, api: this.api.id, method: this.method_api.name, - params: this.params, + params: this.params && js_utils.omit_symbol(this.params, RPC_BUFFERS), auth_token: this.auth_token || undefined, - buffers: (this.params && this.params[RPC_BUFFERS]) || undefined, + buffers: rpc_buffers && _.map(rpc_buffers, (buf, name) => ({ + name, + len: buf.length + })) }; - let buffers; - if (body.buffers) { - buffers = []; - body.buffers = _.map(body.buffers, (buf, name) => { - buffers.push(buf); - return { name, len: buf.length }; - }); + return new RpcMessage( + body, + rpc_buffers && Object.values(rpc_buffers) + ); + } + + /** + * @returns {RpcMessage} + */ + make_response_message() { + const body = { + op: 'res', + reqid: this.reqid, + took: time_utils.millistamp() - this.ts, + }; + + if (this.error) { + // copy the error to a plain object because otherwise + // the message is not encoded by + body.error = _.pick(this.error, 'message', 'rpc_code', 'rpc_data'); + return new RpcMessage(body); + + } else { + const rpc_buffers = this.reply && this.reply[RPC_BUFFERS]; + body.reply = js_utils.omit_symbol(this.reply, RPC_BUFFERS); + body.buffers = rpc_buffers && _.map(rpc_buffers, (buf, name) => ({ + name, + len: buf.length + })); + + return new RpcMessage( + body, + rpc_buffers && Object.values(rpc_buffers) + ); } - return RpcRequest.encode_message(body, buffers); } _set_request(msg, api, method_api) { @@ -116,8 +108,8 @@ class RpcRequest { this.method_api = method_api; this.params = msg.body.params; this.auth_token = msg.body.auth_token; - this.srv = (api ? api.id : '?') + - '.' + (method_api ? method_api.name : '?'); + this.srv = `${api ? api.id : '?'}.${method_api ? method_api.name : '?'}`; + if (msg.body.buffers) { const buffers = {}; _.forEach(msg.body.buffers, a => { @@ -127,31 +119,6 @@ class RpcRequest { } } - _encode_response() { - const body = { - op: 'res', - reqid: this.reqid, - took: time_utils.millistamp() - this.ts, - }; - let buffers; - if (this.error) { - // copy the error to a plain object because otherwise - // the message is not encoded by - body.error = _.pick(this.error, 'message', 'rpc_code', 'rpc_data'); - } else { - body.reply = this.reply; - body.buffers = this.reply && this.reply[RPC_BUFFERS]; - if (body.buffers) { - buffers = []; - body.buffers = _.map(body.buffers, (buf, name) => { - buffers.push(buf); - return { name, len: buf.length }; - }); - } - } - return RpcRequest.encode_message(body, buffers); - } - _set_response(msg) { const is_pending = this._response_defer.promise.isPending(); if (!is_pending) { @@ -181,10 +148,6 @@ class RpcRequest { this.took_total = time_utils.millistamp() - this.ts; this.took_flight = this.took_total - this.took_srv; } - } -RpcRequest.RPC_BUFFERS = RPC_BUFFERS; -RpcRequest.RPC_VERSION_NUMBER = RPC_VERSION_NUMBER; - module.exports = RpcRequest; diff --git a/src/server/node_services/nodes_monitor.js b/src/server/node_services/nodes_monitor.js index 706ec9f75b..9accd8e5d8 100644 --- a/src/server/node_services/nodes_monitor.js +++ b/src/server/node_services/nodes_monitor.js @@ -3279,7 +3279,7 @@ class NodesMonitor extends EventEmitter { } ) .then(reply => ({ - proxy_reply: reply, + proxy_reply: js_utils.omit_symbol(reply, RPC_BUFFERS), [RPC_BUFFERS]: reply && reply[RPC_BUFFERS], })); } diff --git a/src/util/js_utils.js b/src/util/js_utils.js index 33269cf091..3488d9e8d7 100644 --- a/src/util/js_utils.js +++ b/src/util/js_utils.js @@ -196,6 +196,41 @@ function map_get_or_create(map, key, item_initializer) { } } +/** + * Enable easier usage of Object.hasOwnProperty + * + * @param {Object} obj + * @param {String|Symbol} prop_name_or_sym + * @returns {Boolean} + */ +function hasOwnProperty(obj, prop_name_or_sym) { + return Object.prototype.hasOwnProperty.call(obj, prop_name_or_sym); +} + +/** + * Unlike lodash omit, this omit will not convert null, undefined, value typed, + * arrays or functions into an object (empty or not) and will not clone the passed + * object if the symbol does not exists on the object own properties + * + * @template T + * @param {T} maybe_obj + * @param {symbol} sym + * @returns {Omit | T} + */ + function omit_symbol(maybe_obj, sym) { + if ( + !_.isObjectLike(maybe_obj) || + Array.isArray(maybe_obj) || + !hasOwnProperty(maybe_obj, sym) + ) { + return maybe_obj; + } + + const obj = /** @type {object} */ (maybe_obj); + return _.omit(obj, sym); + } + + exports.self_bind = self_bind; exports.array_push_all = array_push_all; exports.array_push_keep_latest = array_push_keep_latest; @@ -208,3 +243,4 @@ exports.PackedObject = PackedObject; exports.inspect_lazy = inspect_lazy; exports.make_array = make_array; exports.map_get_or_create = map_get_or_create; +exports.omit_symbol = omit_symbol;