From 791958e79cee633efcabd48776d3cc485f52412c Mon Sep 17 00:00:00 2001 From: Guilherme Gazzo Date: Tue, 1 Feb 2022 18:03:22 -0300 Subject: [PATCH 01/11] Reduce and remove complexity --- ee/app/license/server/index.ts | 5 +- .../license/server/license.internalService.ts | 42 +- ee/app/license/server/license.ts | 4 + ee/server/NetworkBroker.ts | 225 +++++++++ ee/server/broker.ts | 442 ++++-------------- server/sdk/lib/Api.ts | 10 +- server/sdk/lib/Events.ts | 2 + server/sdk/lib/LocalBroker.ts | 8 + server/sdk/lib/proxify.ts | 5 + server/sdk/types/IBroker.ts | 5 + 10 files changed, 399 insertions(+), 349 deletions(-) create mode 100644 ee/server/NetworkBroker.ts diff --git a/ee/app/license/server/index.ts b/ee/app/license/server/index.ts index b8795ba7f981..c74919b50bc6 100644 --- a/ee/app/license/server/index.ts +++ b/ee/app/license/server/index.ts @@ -1,8 +1,11 @@ -import './license.internalService'; import './settings'; import './methods'; import './startup'; +import { LicenseService } from './license.internalService'; +import { api } from '../../../../server/sdk/api'; export { onLicense, overwriteClassOnLicense, isEnterprise, getMaxGuestUsers } from './license'; export { getStatistics } from './getStatistics'; + +api.registerService(new LicenseService()); diff --git a/ee/app/license/server/license.internalService.ts b/ee/app/license/server/license.internalService.ts index 41572b6e82b3..c7ce66c493d0 100644 --- a/ee/app/license/server/license.internalService.ts +++ b/ee/app/license/server/license.internalService.ts @@ -6,7 +6,7 @@ import { resetEnterprisePermissions } from '../../authorization/server/resetEnte import { Authorization } from '../../../../server/sdk'; import { guestPermissions } from '../../authorization/lib/guestPermissions'; -class LicenseService extends ServiceClass implements ILicense { +export class LicenseService extends ServiceClass implements ILicense { protected name = 'license'; protected internal = true; @@ -26,6 +26,44 @@ class LicenseService extends ServiceClass implements ILicense { onModule((licenseModule) => { api.broadcast('license.module', licenseModule); }); + + this.onEvent('$services.changed', async () => { + // if (hasModule('scalability')) { + // return; + // } + + const services: { + name: string; + nodes: string[]; + }[] = await api.call('$node.services'); + + /* The main idea is if there is no scalability module enabled, + * then we should not allow more than one service per environment. + * So we list the services and the nodes, and if there is more than + * one, we inform the service that it should be disabled. + */ + + // Filter only the services are duplicated + const duplicated = services.filter((service) => { + return service.name !== '$node' && service.nodes.length > 1; + }); + + if (!duplicated.length) { + return; + } + + const brokers: Record = Object.fromEntries( + duplicated.map((service) => { + const [, ...nodes] = service.nodes; + return [service.name, nodes]; + }), + ); + + // Just inform the service that it should be disabled + + const duplicatedServicesNames = duplicated.map((service) => service.name); + api.broadcastToServices(duplicatedServicesNames, 'shutdown', brokers); + }); } async started(): Promise { @@ -49,5 +87,3 @@ class LicenseService extends ServiceClass implements ILicense { return getModules(); } } - -api.registerService(new LicenseService()); diff --git a/ee/app/license/server/license.ts b/ee/app/license/server/license.ts index a4da074e3e88..40ae20858a36 100644 --- a/ee/app/license/server/license.ts +++ b/ee/app/license/server/license.ts @@ -311,6 +311,10 @@ export function onValidFeature(feature: BundleFeature, cb: () => void): () => vo }; } +export const hasModule = (module: string): boolean => { + return License.hasModule(module); +}; + export function onInvalidFeature(feature: BundleFeature, cb: () => void): () => void { EnterpriseLicenses.on(`invalid:${feature}`, cb); diff --git a/ee/server/NetworkBroker.ts b/ee/server/NetworkBroker.ts new file mode 100644 index 000000000000..bac219fcc2f8 --- /dev/null +++ b/ee/server/NetworkBroker.ts @@ -0,0 +1,225 @@ +import { ServiceBroker, Context, ServiceSchema } from 'moleculer'; + +import { asyncLocalStorage } from '../../server/sdk'; +import { api } from '../../server/sdk/api'; +import { IBroker, IBrokerNode, IServiceMetrics } from '../../server/sdk/types/IBroker'; +import { ServiceClass } from '../../server/sdk/types/ServiceClass'; +import { EventSignatures } from '../../server/sdk/lib/Events'; + +const events: { [k: string]: string } = { + onNodeConnected: '$node.connected', + onNodeUpdated: '$node.updated', + onNodeDisconnected: '$node.disconnected', +}; + +const lifecycle: { [k: string]: string } = { + created: 'created', + started: 'started', + stopped: 'stopped', +}; + +const { + INTERNAL_SERVICES_ONLY = 'false', + // SERVICES_ALLOWED = '', + WAIT_FOR_SERVICES_TIMEOUT = '10000', // 10 seconds + WAIT_FOR_SERVICES_WHITELIST_TIMEOUT = '600000', // 10 minutes +} = process.env; + +const waitForServicesTimeout = parseInt(WAIT_FOR_SERVICES_TIMEOUT, 10) || 10000; +const waitForServicesWhitelistTimeout = parseInt(WAIT_FOR_SERVICES_WHITELIST_TIMEOUT, 10) || 600000; + +export class NetworkBroker implements IBroker { + private broker: ServiceBroker; + + private started: Promise; + + private whitelist = { + events: ['license.module', 'watch.settings'], + actions: ['license.hasLicense'], + }; + + // whether only internal services are allowed to be registered + private internalOnly = ['true', 'yes'].includes(INTERNAL_SERVICES_ONLY.toLowerCase()); + + metrics: IServiceMetrics; + + constructor(broker: ServiceBroker) { + this.broker = broker; + + api.setBroker(this); + + this.metrics = broker.metrics; + + this.started = this.broker.start(); + } + + isWhitelisted(list: string[], item: string): boolean { + return list.includes(item); + } + + isActionWhitelisted(method: string): boolean { + return this.isWhitelisted(this.whitelist.actions, method); + } + + isEventWhitelisted(event: string): boolean { + return this.isWhitelisted(this.whitelist.events, event); + } + + async call(method: string, data: any): Promise { + await this.started; + + const context = asyncLocalStorage.getStore(); + + if (context?.ctx?.call) { + return context.ctx.call(method, data); + } + + const services: { name: string }[] = await this.broker.call('$node.services', { + onlyAvailable: true, + }); + if (!services.find((service) => service.name === method.split('.')[0])) { + return new Error('method-not-available'); + } + return this.broker.call(method, data); + } + + async waitAndCall(method: string, data: any): Promise { + await this.started; + + try { + await this.broker.waitForServices( + method.split('.')[0], + this.isActionWhitelisted(method) ? waitForServicesWhitelistTimeout : waitForServicesTimeout, + ); + } catch (err) { + console.error(err); + } + + return this.call(method, data); + } + + destroyService(instance: ServiceClass): void { + this.broker.destroyService(instance.getName()); + } + + createService(instance: ServiceClass): void { + if (!this.isServiceAllowed(instance)) { + return; + } + + const name = instance.getName(); + + if (!this.isServiceInternal(instance)) { + instance.onEvent('shutdown', async (services) => { + const service = services[name]; + if (!service) { + return; + } + this.destroyService(instance); + }); + } + + const service: ServiceSchema = { + name, + actions: {}, + ...(name !== 'license' + ? { + dependencies: ['license'], + } + : {}), + events: instance.getEvents().reduce>((map, eventName) => { + map[eventName] = /^\$/.test(eventName) + ? (data: Parameters): void => { + instance.emit(eventName, data); + } + : (data: Parameters): void => { + instance.emit(eventName, ...data); + }; + return map; + }, {}), + }; + + if (!service.events || !service.actions) { + return; + } + + const methods = + instance.constructor?.name === 'Object' + ? Object.getOwnPropertyNames(instance) + : Object.getOwnPropertyNames(Object.getPrototypeOf(instance)); + for (const method of methods) { + if (method === 'constructor') { + continue; + } + + const i = instance as any; + + if (method.match(/^on[A-Z]/)) { + service.events[events[method]] = i[method].bind(i); + continue; + } + + if (lifecycle[method]) { + service[method] = (): void => + asyncLocalStorage.run( + { + id: '', + nodeID: this.broker.nodeID, + requestID: null, + broker: this, + }, + i[method].bind(i), + ); + continue; + } + + service.actions[method] = async (ctx: Context<[]>): Promise => { + return asyncLocalStorage.run( + { + id: ctx.id, + nodeID: ctx.nodeID, + requestID: ctx.requestID, + broker: this, + ctx, + }, + () => i[method](...ctx.params), + ); + }; + } + + this.broker.createService(service); + } + + async broadcast(event: T, ...args: Parameters): Promise { + return this.broker.broadcast(event, args); + } + + async broadcastLocal(event: T, ...args: Parameters): Promise { + this.broker.broadcastLocal(event, args); + } + + async broadcastToServices( + services: string[], + event: T, + ...args: Parameters + ): Promise { + this.broker.broadcast(event, args, services); + } + + async nodeList(): Promise { + return this.broker.call('$node.list'); + } + + private isServiceInternal(instance: ServiceClass): boolean { + return !(this.internalOnly && !instance.isInternal()); + } + + private isServiceAllowed(instance: ServiceClass): boolean { + // allow only internal services if internalOnly is true + if (this.internalOnly && !instance.isInternal()) { + return false; + } + + return true; + } +} diff --git a/ee/server/broker.ts b/ee/server/broker.ts index 017e370af19e..fbd87ad61030 100644 --- a/ee/server/broker.ts +++ b/ee/server/broker.ts @@ -1,268 +1,3 @@ -import { ServiceBroker, Context, ServiceSchema, Serializers } from 'moleculer'; -import EJSON from 'ejson'; - -import { asyncLocalStorage, License } from '../../server/sdk'; -import { api } from '../../server/sdk/api'; -import { IBroker, IBrokerNode, IServiceMetrics } from '../../server/sdk/types/IBroker'; -import { ServiceClass } from '../../server/sdk/types/ServiceClass'; -import { EventSignatures } from '../../server/sdk/lib/Events'; -import { LocalBroker } from '../../server/sdk/lib/LocalBroker'; - -const events: { [k: string]: string } = { - onNodeConnected: '$node.connected', - onNodeUpdated: '$node.updated', - onNodeDisconnected: '$node.disconnected', -}; - -const lifecycle: { [k: string]: string } = { - created: 'created', - started: 'started', - stopped: 'stopped', -}; - -const { - INTERNAL_SERVICES_ONLY = 'false', - SERVICES_ALLOWED = '', - WAIT_FOR_SERVICES_TIMEOUT = '10000', // 10 seconds - WAIT_FOR_SERVICES_WHITELIST_TIMEOUT = '600000', // 10 minutes -} = process.env; - -const waitForServicesTimeout = parseInt(WAIT_FOR_SERVICES_TIMEOUT, 10) || 10000; -const waitForServicesWhitelistTimeout = parseInt(WAIT_FOR_SERVICES_WHITELIST_TIMEOUT, 10) || 600000; - -class NetworkBroker implements IBroker { - private broker: ServiceBroker; - - private localBroker = new LocalBroker(); - - private allowed: Promise; - - private started: Promise; - - private whitelist = { - events: ['license.module', 'watch.settings'], - actions: ['license.hasLicense'], - }; - - // whether only internal services are allowed to be registered - private internalOnly = ['true', 'yes'].includes(INTERNAL_SERVICES_ONLY.toLowerCase()); - - // list of allowed services to run - has precedence over `internalOnly` - private allowedList = new Set( - SERVICES_ALLOWED?.split(',') - .map((i) => i.trim()) - .filter((i) => i), - ); - - metrics: IServiceMetrics; - - constructor(broker: ServiceBroker) { - this.broker = broker; - - api.setBroker(this); - - this.metrics = broker.metrics; - - this.started = this.broker.start(); - - this.allowed = License.hasLicense('scalability'); - } - - isWhitelisted(list: string[], item: string): boolean { - return list.includes(item); - } - - isActionWhitelisted(method: string): boolean { - return this.isWhitelisted(this.whitelist.actions, method); - } - - isEventWhitelisted(event: string): boolean { - return this.isWhitelisted(this.whitelist.events, event); - } - - async call(method: string, data: any): Promise { - await this.started; - - if (!(this.isActionWhitelisted(method) || (await this.allowed))) { - return this.localBroker.call(method, data); - } - - const context = asyncLocalStorage.getStore(); - if (context?.ctx?.call) { - return context.ctx.call(method, data); - } - - const services: { name: string }[] = await this.broker.call('$node.services', { - onlyAvailable: true, - }); - if (!services.find((service) => service.name === method.split('.')[0])) { - return new Error('method-not-available'); - } - return this.broker.call(method, data); - } - - async waitAndCall(method: string, data: any): Promise { - await this.started; - - if (!(this.isActionWhitelisted(method) || (await this.allowed))) { - return this.localBroker.call(method, data); - } - - try { - await this.broker.waitForServices( - method.split('.')[0], - this.isActionWhitelisted(method) ? waitForServicesWhitelistTimeout : waitForServicesTimeout, - ); - } catch (err) { - console.error(err); - } - - const context = asyncLocalStorage.getStore(); - if (context?.ctx?.call) { - return context.ctx.call(method, data); - } - - return this.broker.call(method, data); - } - - destroyService(instance: ServiceClass): void { - this.localBroker.destroyService(instance); - - this.broker.destroyService(instance.getName()); - } - - createService(instance: ServiceClass): void { - if (!this.isServiceAllowed(instance)) { - return; - } - - this.localBroker.createService(instance); - - const name = instance.getName(); - - // Listen for module license - instance.onEvent('license.module', async ({ module, valid }) => { - if (module === 'scalability') { - // Should we believe on the event only? Could it be a call from the CE version? - this.allowed = valid ? License.hasLicense('scalability') : Promise.resolve(false); - // console.log('on license.module', { allowed: this.allowed }); - } - }); - - const service: ServiceSchema = { - name, - actions: {}, - events: instance.getEvents().reduce>((map, eventName) => { - if (this.isEventWhitelisted(eventName)) { - map[eventName] = (data: Parameters): void => instance.emit(eventName, ...data); - return map; - } - - map[eventName] = (data: Parameters): any => { - this.allowed.then((allowed) => allowed && instance.emit(eventName, ...data)); - }; - - return map; - }, {}), - }; - - if (!service.events || !service.actions) { - return; - } - - const methods = - instance.constructor?.name === 'Object' - ? Object.getOwnPropertyNames(instance) - : Object.getOwnPropertyNames(Object.getPrototypeOf(instance)); - for (const method of methods) { - if (method === 'constructor') { - continue; - } - - const i = instance as any; - - if (method.match(/^on[A-Z]/)) { - service.events[events[method]] = i[method].bind(i); - continue; - } - - if (lifecycle[method]) { - service[method] = (): void => - asyncLocalStorage.run( - { - id: '', - nodeID: this.broker.nodeID, - requestID: null, - broker: this, - }, - i[method].bind(i), - ); - continue; - } - - service.actions[method] = async (ctx: Context<[]>): Promise => { - if (!this.isActionWhitelisted(`${name}.${method}`) && !(await this.allowed)) { - return; - } - - return asyncLocalStorage.run( - { - id: ctx.id, - nodeID: ctx.nodeID, - requestID: ctx.requestID, - broker: this, - ctx, - }, - () => i[method](...ctx.params), - ); - }; - } - - this.broker.createService(service); - } - - async broadcast(event: T, ...args: Parameters): Promise { - if (!(this.isEventWhitelisted(event) || (await this.allowed))) { - return this.localBroker.broadcast(event, ...args); - } - return this.broker.broadcast(event, args); - } - - async broadcastLocal(event: T, ...args: Parameters): Promise { - this.broker.broadcastLocal(event, args); - } - - async nodeList(): Promise { - return this.broker.call('$node.list'); - } - - private isServiceAllowed(instance: ServiceClass): boolean { - // check if the service is in the list of allowed services if the list is not empty - if (this.allowedList.size > 0 && !this.allowedList.has(instance.getName())) { - return false; - } - - // allow only internal services if internalOnly is true - if (this.internalOnly && !instance.isInternal()) { - return false; - } - - return true; - } -} - -const Base = Serializers.Base as unknown as new () => {}; - -class EJSONSerializer extends Base { - serialize(obj: {}): Buffer { - return Buffer.from(EJSON.stringify(obj)); - } - - deserialize(buf: Buffer): any { - return EJSON.parse(buf.toString()); - } -} - const { TRANSPORTER = '', CACHE = 'Memory', @@ -290,93 +25,112 @@ const { // only starts network broker if transporter properly configured if (TRANSPORTER.match(/^(?:nats|TCP)/)) { - const network = new ServiceBroker({ - // TODO: Reevaluate, without this setting it was preventing the process to stop - skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', - transporter: TRANSPORTER, - metrics: { - enabled: MS_METRICS === 'true', - reporter: [ - { - type: 'Prometheus', - options: { - port: MS_METRICS_PORT, + (async (): Promise => { + const { ServiceBroker, Serializers } = await import('moleculer'); + const EJSON = (await import('ejson')).default; + + const { NetworkBroker } = await import('./NetworkBroker'); + const { api } = await import('../../server/sdk/api'); + const Base = Serializers.Base as unknown as new () => {}; + + class EJSONSerializer extends Base { + serialize(obj: {}): Buffer { + return Buffer.from(EJSON.stringify(obj)); + } + + deserialize(buf: Buffer): any { + return EJSON.parse(buf.toString()); + } + } + + const network = new ServiceBroker({ + // TODO: Reevaluate, without this setting it was preventing the process to stop + skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', + transporter: TRANSPORTER, + metrics: { + enabled: MS_METRICS === 'true', + reporter: [ + { + type: 'Prometheus', + options: { + port: MS_METRICS_PORT, + }, }, + ], + }, + cacher: CACHE, + serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER, + logLevel: MOLECULER_LOG_LEVEL as any, + // logLevel: { + // // "TRACING": "trace", + // // "TRANS*": "warn", + // BROKER: 'debug', + // TRANSIT: 'debug', + // '**': 'info', + // }, + logger: { + type: 'Console', + options: { + formatter: 'short', }, - ], - }, - cacher: CACHE, - serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER, - logLevel: MOLECULER_LOG_LEVEL as any, - // logLevel: { - // // "TRACING": "trace", - // // "TRANS*": "warn", - // BROKER: 'debug', - // TRANSIT: 'debug', - // '**': 'info', - // }, - logger: { - type: 'Console', - options: { - formatter: 'short', }, - }, - registry: { - strategy: BALANCE_STRATEGY, - preferLocal: BALANCE_PREFER_LOCAL !== 'false', - }, - - requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000, - retryPolicy: { - enabled: RETRY_ENABLED === 'true', - retries: parseInt(RETRY_RETRIES), - delay: parseInt(RETRY_DELAY), - maxDelay: parseInt(RETRY_MAX_DELAY), - factor: parseInt(RETRY_FACTOR), - check: (err: any): boolean => err && !!err.retryable, - }, - - maxCallLevel: 100, - heartbeatInterval: parseInt(HEARTBEAT_INTERVAL), - heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT), + registry: { + strategy: BALANCE_STRATEGY, + preferLocal: BALANCE_PREFER_LOCAL !== 'false', + }, - // circuitBreaker: { - // enabled: false, - // threshold: 0.5, - // windowTime: 60, - // minRequestCount: 20, - // halfOpenTime: 10 * 1000, - // check: (err: any): boolean => err && err.code >= 500, - // }, + requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000, + retryPolicy: { + enabled: RETRY_ENABLED === 'true', + retries: parseInt(RETRY_RETRIES), + delay: parseInt(RETRY_DELAY), + maxDelay: parseInt(RETRY_MAX_DELAY), + factor: parseInt(RETRY_FACTOR), + check: (err: any): boolean => err && !!err.retryable, + }, - bulkhead: { - enabled: BULKHEAD_ENABLED === 'true', - concurrency: parseInt(BULKHEAD_CONCURRENCY), - maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE), - }, + maxCallLevel: 100, + heartbeatInterval: parseInt(HEARTBEAT_INTERVAL), + heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT), + + // circuitBreaker: { + // enabled: false, + // threshold: 0.5, + // windowTime: 60, + // minRequestCount: 20, + // halfOpenTime: 10 * 1000, + // check: (err: any): boolean => err && err.code >= 500, + // }, + + bulkhead: { + enabled: BULKHEAD_ENABLED === 'true', + concurrency: parseInt(BULKHEAD_CONCURRENCY), + maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE), + }, - tracing: { - enabled: TRACING_ENABLED === 'true', - exporter: { - type: 'Jaeger', - options: { - endpoint: null, - host: 'jaeger', - port: 6832, - sampler: { - // Sampler type. More info: https://www.jaegertracing.io/docs/1.14/sampling/#client-sampling-configuration - type: 'Const', - // Sampler specific options. - options: {}, + tracing: { + enabled: TRACING_ENABLED === 'true', + exporter: { + type: 'Jaeger', + options: { + endpoint: null, + host: 'jaeger', + port: 6832, + sampler: { + // Sampler type. More info: https://www.jaegertracing.io/docs/1.14/sampling/#client-sampling-configuration + type: 'Const', + // Sampler specific options. + options: {}, + }, + // Additional options for `Jaeger.Tracer` + tracerOptions: {}, + // Default tags. They will be added into all span tags. + defaultTags: null, }, - // Additional options for `Jaeger.Tracer` - tracerOptions: {}, - // Default tags. They will be added into all span tags. - defaultTags: null, }, }, - }, - }); + }); - new NetworkBroker(network); + api.setBroker(new NetworkBroker(network)); + })(); } diff --git a/server/sdk/lib/Api.ts b/server/sdk/lib/Api.ts index 7631341c1d82..9669b0fa5a11 100644 --- a/server/sdk/lib/Api.ts +++ b/server/sdk/lib/Api.ts @@ -36,7 +36,7 @@ export class Api { } } - async call(method: string, data: any): Promise { + async call(method: string, data?: unknown): Promise { return this.broker.call(method, data); } @@ -48,6 +48,14 @@ export class Api { return this.broker.broadcast(event, ...args); } + async broadcastToServices( + services: string[], + event: T, + ...args: Parameters + ): Promise { + return this.broker.broadcastToServices(services, event, ...args); + } + async broadcastLocal(event: T, ...args: Parameters): Promise { return this.broker.broadcastLocal(event, ...args); } diff --git a/server/sdk/lib/Events.ts b/server/sdk/lib/Events.ts index f8efb671fff5..1631324bbe21 100644 --- a/server/sdk/lib/Events.ts +++ b/server/sdk/lib/Events.ts @@ -20,6 +20,8 @@ import { ISocketConnection } from '../../../definition/ISocketConnection'; type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed'; export type EventSignatures = { + 'shutdown': (params: Record) => void; + '$services.changed': (info: unknown) => void; 'accounts.login': (info: { userId: string; connection: ISocketConnection }) => void; 'accounts.logout': (info: { userId: string; connection: ISocketConnection }) => void; 'socket.connected': (connection: ISocketConnection) => void; diff --git a/server/sdk/lib/LocalBroker.ts b/server/sdk/lib/LocalBroker.ts index 74a87da9bb71..33dfd979135d 100644 --- a/server/sdk/lib/LocalBroker.ts +++ b/server/sdk/lib/LocalBroker.ts @@ -82,6 +82,14 @@ export class LocalBroker implements IBroker { this.events.emit(event, ...args); } + async broadcastToServices( + _services: string[], + event: T, + ...args: Parameters + ): Promise { + this.events.emit(event, ...args); + } + async nodeList(): Promise { return []; } diff --git a/server/sdk/lib/proxify.ts b/server/sdk/lib/proxify.ts index 4957250633a4..ec157ebd93c1 100644 --- a/server/sdk/lib/proxify.ts +++ b/server/sdk/lib/proxify.ts @@ -25,6 +25,11 @@ function handler(namespace: string, waitService: boolean): Pro }; } +/* + * @deprecated + * + * instead, declare the service dependencies in the constructor + */ export function proxifyWithWait(namespace: string): Prom { return new Proxy({}, handler(namespace, true)) as unknown as Prom; } diff --git a/server/sdk/types/IBroker.ts b/server/sdk/types/IBroker.ts index ba60b8bd83e8..c1621a2f2ea9 100644 --- a/server/sdk/types/IBroker.ts +++ b/server/sdk/types/IBroker.ts @@ -51,6 +51,11 @@ export interface IBroker { createService(service: ServiceClass): void; call(method: string, data: any): Promise; waitAndCall(method: string, data: any): Promise; + broadcastToServices( + services: string[], + event: T, + ...args: Parameters + ): Promise; broadcast(event: T, ...args: Parameters): Promise; broadcastLocal(event: T, ...args: Parameters): Promise; nodeList(): Promise; From 9c52976d50d8b403270dbaf29d9e8dd1d257c8f3 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Wed, 9 Feb 2022 17:46:36 -0300 Subject: [PATCH 02/11] Fix logic to shutdown services --- ee/server/NetworkBroker.ts | 29 ++++------------------------- ee/server/broker.ts | 3 +-- 2 files changed, 5 insertions(+), 27 deletions(-) diff --git a/ee/server/NetworkBroker.ts b/ee/server/NetworkBroker.ts index bac219fcc2f8..90715964ab3a 100644 --- a/ee/server/NetworkBroker.ts +++ b/ee/server/NetworkBroker.ts @@ -19,8 +19,6 @@ const lifecycle: { [k: string]: string } = { }; const { - INTERNAL_SERVICES_ONLY = 'false', - // SERVICES_ALLOWED = '', WAIT_FOR_SERVICES_TIMEOUT = '10000', // 10 seconds WAIT_FOR_SERVICES_WHITELIST_TIMEOUT = '600000', // 10 minutes } = process.env; @@ -38,9 +36,6 @@ export class NetworkBroker implements IBroker { actions: ['license.hasLicense'], }; - // whether only internal services are allowed to be registered - private internalOnly = ['true', 'yes'].includes(INTERNAL_SERVICES_ONLY.toLowerCase()); - metrics: IServiceMetrics; constructor(broker: ServiceBroker) { @@ -103,18 +98,15 @@ export class NetworkBroker implements IBroker { } createService(instance: ServiceClass): void { - if (!this.isServiceAllowed(instance)) { - return; - } - const name = instance.getName(); - if (!this.isServiceInternal(instance)) { + if (!instance.isInternal()) { instance.onEvent('shutdown', async (services) => { - const service = services[name]; - if (!service) { + if (!services[name]?.includes(this.broker.nodeID)) { + this.broker.logger.debug({ msg: 'Not shutting down, different node.', nodeID: this.broker.nodeID }); return; } + this.broker.logger.info({ msg: 'Received shutdown event, destroying service.', nodeID: this.broker.nodeID }); this.destroyService(instance); }); } @@ -209,17 +201,4 @@ export class NetworkBroker implements IBroker { async nodeList(): Promise { return this.broker.call('$node.list'); } - - private isServiceInternal(instance: ServiceClass): boolean { - return !(this.internalOnly && !instance.isInternal()); - } - - private isServiceAllowed(instance: ServiceClass): boolean { - // allow only internal services if internalOnly is true - if (this.internalOnly && !instance.isInternal()) { - return false; - } - - return true; - } } diff --git a/ee/server/broker.ts b/ee/server/broker.ts index fbd87ad61030..5cc67f725e08 100644 --- a/ee/server/broker.ts +++ b/ee/server/broker.ts @@ -20,7 +20,7 @@ const { MS_METRICS = 'false', MS_METRICS_PORT = '9458', TRACING_ENABLED = 'false', - SKIP_PROCESS_EVENT_REGISTRATION = 'true', + SKIP_PROCESS_EVENT_REGISTRATION = 'false', } = process.env; // only starts network broker if transporter properly configured @@ -44,7 +44,6 @@ if (TRANSPORTER.match(/^(?:nats|TCP)/)) { } const network = new ServiceBroker({ - // TODO: Reevaluate, without this setting it was preventing the process to stop skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', transporter: TRANSPORTER, metrics: { From 38efdc978b9eaa1ae737f1a1ac473d69d9c79597 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Wed, 9 Feb 2022 17:48:11 -0300 Subject: [PATCH 03/11] Add ServiceClassInternal for internal services --- .../server/roomAccessValidator.internalService.ts | 7 ++----- app/search/server/search.internalService.ts | 4 ++-- .../server/roomAccessValidator.internalService.ts | 7 ++----- ee/app/license/server/license.internalService.ts | 12 ++++++++---- ee/app/settings/server/settings.internalService.ts | 7 ++----- ee/server/local-services/ldap/service.ts | 7 ++----- server/sdk/types/ServiceClass.ts | 8 ++++++++ server/services/analytics/service.ts | 4 ++-- server/services/banner/service.ts | 4 ++-- server/services/image/service.ts | 4 ++-- server/services/ldap/service.ts | 4 ++-- server/services/meteor/service.ts | 6 ++---- server/services/nps/service.ts | 4 ++-- server/services/room/service.ts | 4 ++-- server/services/sauMonitor/service.ts | 4 ++-- server/services/team/service.ts | 4 ++-- server/services/uikit-core-app/service.ts | 4 ++-- 17 files changed, 46 insertions(+), 48 deletions(-) diff --git a/app/livechat/server/roomAccessValidator.internalService.ts b/app/livechat/server/roomAccessValidator.internalService.ts index f7e94219b18c..24536a938480 100644 --- a/app/livechat/server/roomAccessValidator.internalService.ts +++ b/app/livechat/server/roomAccessValidator.internalService.ts @@ -1,11 +1,10 @@ -import { ServiceClass } from '../../../server/sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../../server/sdk/types/ServiceClass'; import { IAuthorizationLivechat } from '../../../server/sdk/types/IAuthorizationLivechat'; import { validators } from './roomAccessValidator.compatibility'; -import { api } from '../../../server/sdk/api'; import { IRoom } from '../../../definition/IRoom'; import { IUser } from '../../../definition/IUser'; -class AuthorizationLivechat extends ServiceClass implements IAuthorizationLivechat { +export class AuthorizationLivechat extends ServiceClassInternal implements IAuthorizationLivechat { protected name = 'authorization-livechat'; protected internal = true; @@ -20,5 +19,3 @@ class AuthorizationLivechat extends ServiceClass implements IAuthorizationLivech return false; } } - -api.registerService(new AuthorizationLivechat()); diff --git a/app/search/server/search.internalService.ts b/app/search/server/search.internalService.ts index 1e5efaa43569..03ed08742c04 100644 --- a/app/search/server/search.internalService.ts +++ b/app/search/server/search.internalService.ts @@ -1,11 +1,11 @@ import { Users } from '../../models/server'; import { settings } from '../../settings/server'; import { searchProviderService } from './service/providerService'; -import { ServiceClass } from '../../../server/sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../../server/sdk/types/ServiceClass'; import { api } from '../../../server/sdk/api'; import { searchEventService } from './events/events'; -class Search extends ServiceClass { +class Search extends ServiceClassInternal { protected name = 'search'; protected internal = true; diff --git a/app/tokenpass/server/roomAccessValidator.internalService.ts b/app/tokenpass/server/roomAccessValidator.internalService.ts index 0fcd51425b81..e6710593c82e 100644 --- a/app/tokenpass/server/roomAccessValidator.internalService.ts +++ b/app/tokenpass/server/roomAccessValidator.internalService.ts @@ -1,11 +1,10 @@ -import { ServiceClass } from '../../../server/sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../../server/sdk/types/ServiceClass'; import { validators } from './roomAccessValidator.compatibility'; -import { api } from '../../../server/sdk/api'; import { IAuthorizationTokenpass } from '../../../server/sdk/types/IAuthorizationTokenpass'; import { IRoom } from '../../../definition/IRoom'; import { IUser } from '../../../definition/IUser'; -class AuthorizationTokenpass extends ServiceClass implements IAuthorizationTokenpass { +export class AuthorizationTokenpass extends ServiceClassInternal implements IAuthorizationTokenpass { protected name = 'authorization-tokenpass'; protected internal = true; @@ -20,5 +19,3 @@ class AuthorizationTokenpass extends ServiceClass implements IAuthorizationToken return false; } } - -api.registerService(new AuthorizationTokenpass()); diff --git a/ee/app/license/server/license.internalService.ts b/ee/app/license/server/license.internalService.ts index c7ce66c493d0..aa5a11bbb525 100644 --- a/ee/app/license/server/license.internalService.ts +++ b/ee/app/license/server/license.internalService.ts @@ -1,4 +1,4 @@ -import { ServiceClass } from '../../../../server/sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; import { api } from '../../../../server/sdk/api'; import { ILicense } from '../../../../server/sdk/types/ILicense'; import { hasLicense, isEnterprise, getModules, onValidateLicenses, onModule } from './license'; @@ -6,11 +6,9 @@ import { resetEnterprisePermissions } from '../../authorization/server/resetEnte import { Authorization } from '../../../../server/sdk'; import { guestPermissions } from '../../authorization/lib/guestPermissions'; -export class LicenseService extends ServiceClass implements ILicense { +export class LicenseService extends ServiceClassInternal implements ILicense { protected name = 'license'; - protected internal = true; - constructor() { super(); @@ -37,6 +35,8 @@ export class LicenseService extends ServiceClass implements ILicense { nodes: string[]; }[] = await api.call('$node.services'); + // console.log('services ->', services); + /* The main idea is if there is no scalability module enabled, * then we should not allow more than one service per environment. * So we list the services and the nodes, and if there is more than @@ -48,6 +48,8 @@ export class LicenseService extends ServiceClass implements ILicense { return service.name !== '$node' && service.nodes.length > 1; }); + console.log('duplicated ->', duplicated); + if (!duplicated.length) { return; } @@ -59,6 +61,8 @@ export class LicenseService extends ServiceClass implements ILicense { }), ); + console.log('brokers ->', brokers); + // Just inform the service that it should be disabled const duplicatedServicesNames = duplicated.map((service) => service.name); diff --git a/ee/app/settings/server/settings.internalService.ts b/ee/app/settings/server/settings.internalService.ts index f0e37efdbcb1..cc8c7631622c 100644 --- a/ee/app/settings/server/settings.internalService.ts +++ b/ee/app/settings/server/settings.internalService.ts @@ -1,10 +1,9 @@ -import { ServiceClass } from '../../../../server/sdk/types/ServiceClass'; -import { api } from '../../../../server/sdk/api'; +import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; import { IEnterpriseSettings } from '../../../../server/sdk/types/IEnterpriseSettings'; import { changeSettingValue } from './settings'; import { ISetting } from '../../../../definition/ISetting'; -class EnterpriseSettings extends ServiceClass implements IEnterpriseSettings { +export class EnterpriseSettings extends ServiceClassInternal implements IEnterpriseSettings { protected name = 'ee-settings'; protected internal = true; @@ -13,5 +12,3 @@ class EnterpriseSettings extends ServiceClass implements IEnterpriseSettings { return changeSettingValue(record); } } - -api.registerService(new EnterpriseSettings()); diff --git a/ee/server/local-services/ldap/service.ts b/ee/server/local-services/ldap/service.ts index 977f8d5a0055..d81c7c93fe3b 100644 --- a/ee/server/local-services/ldap/service.ts +++ b/ee/server/local-services/ldap/service.ts @@ -1,11 +1,10 @@ import '../../broker'; -import { api } from '../../../../server/sdk/api'; import { LDAPEEManager } from '../../lib/ldap/Manager'; import { ILDAPEEService } from '../../sdk/types/ILDAPEEService'; -import { ServiceClass } from '../../../../server/sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; -export class LDAPEEService extends ServiceClass implements ILDAPEEService { +export class LDAPEEService extends ServiceClassInternal implements ILDAPEEService { protected name = 'ldap-enterprise'; async sync(): Promise { @@ -20,5 +19,3 @@ export class LDAPEEService extends ServiceClass implements ILDAPEEService { return LDAPEEManager.syncLogout(); } } - -api.registerService(new LDAPEEService()); diff --git a/server/sdk/types/ServiceClass.ts b/server/sdk/types/ServiceClass.ts index 11663862710c..88de362841d0 100644 --- a/server/sdk/types/ServiceClass.ts +++ b/server/sdk/types/ServiceClass.ts @@ -72,3 +72,11 @@ export abstract class ServiceClass implements IServiceClass { this.events.emit(event, ...args); } } + +/** + * An internal service is a service that is registered only on monolith node. + * Services that run on their own node should use @ServiceClass instead. + */ +export abstract class ServiceClassInternal extends ServiceClass { + protected internal = true; +} diff --git a/server/services/analytics/service.ts b/server/services/analytics/service.ts index 4baa37bb7935..3fb07ce690e5 100644 --- a/server/services/analytics/service.ts +++ b/server/services/analytics/service.ts @@ -1,11 +1,11 @@ import type { Db } from 'mongodb'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { IAnalyticsService } from '../../sdk/types/IAnalyticsService'; import { AnalyticsRaw } from '../../../app/models/server/raw/Analytics'; import { IAnalyticsSeatRequest } from '../../../definition/IAnalytic'; -export class AnalyticsService extends ServiceClass implements IAnalyticsService { +export class AnalyticsService extends ServiceClassInternal implements IAnalyticsService { protected name = 'analytics'; private Analytics: AnalyticsRaw; diff --git a/server/services/banner/service.ts b/server/services/banner/service.ts index f2e62e7f1778..1db40f603789 100644 --- a/server/services/banner/service.ts +++ b/server/services/banner/service.ts @@ -1,7 +1,7 @@ import { Db } from 'mongodb'; import { v4 as uuidv4 } from 'uuid'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { BannersRaw } from '../../../app/models/server/raw/Banners'; import { BannersDismissRaw } from '../../../app/models/server/raw/BannersDismiss'; import { UsersRaw } from '../../../app/models/server/raw/Users'; @@ -11,7 +11,7 @@ import { api } from '../../sdk/api'; import { IUser } from '../../../definition/IUser'; import { Optional } from '../../../definition/utils'; -export class BannerService extends ServiceClass implements IBannerService { +export class BannerService extends ServiceClassInternal implements IBannerService { protected name = 'banner'; private Banners: BannersRaw; diff --git a/server/services/image/service.ts b/server/services/image/service.ts index d2b515cde66e..a55eda2241aa 100644 --- a/server/services/image/service.ts +++ b/server/services/image/service.ts @@ -4,14 +4,14 @@ import fileType from 'file-type'; import sharp from 'sharp'; import isSvg from 'is-svg'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { IMediaService, ResizeResult } from '../../sdk/types/IMediaService'; /* eslint-disable @typescript-eslint/no-var-requires */ const ExifTransformer = require('exif-be-gone'); /* eslint-enable @typescript-eslint/no-var-requires */ -export class MediaService extends ServiceClass implements IMediaService { +export class MediaService extends ServiceClassInternal implements IMediaService { protected name = 'media'; private imageExts = new Set([ diff --git a/server/services/ldap/service.ts b/server/services/ldap/service.ts index f08c3a4188e3..e868686ad4a8 100644 --- a/server/services/ldap/service.ts +++ b/server/services/ldap/service.ts @@ -1,9 +1,9 @@ import { LDAPManager } from '../../lib/ldap/Manager'; import { ILDAPService } from '../../sdk/types/ILDAPService'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { LDAPLoginResult } from '../../../definition/ldap/ILDAPLoginResult'; -export class LDAPService extends ServiceClass implements ILDAPService { +export class LDAPService extends ServiceClassInternal implements ILDAPService { protected name = 'ldap'; async loginRequest(username: string, password: string): Promise { diff --git a/server/services/meteor/service.ts b/server/services/meteor/service.ts index 52ec7ad5eb8e..0fdff3853f17 100644 --- a/server/services/meteor/service.ts +++ b/server/services/meteor/service.ts @@ -4,7 +4,7 @@ import { UserPresenceMonitor, UserPresence } from 'meteor/konecty:user-presence' import { MongoInternals } from 'meteor/mongo'; import { metrics } from '../../../app/metrics'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { IMeteor, AutoUpdateRecord } from '../../sdk/types/IMeteor'; import { api } from '../../sdk/api'; import { Users } from '../../../app/models/server/raw/index'; @@ -133,11 +133,9 @@ settings.set = use(settings.set, (context, next) => { updateValue(record._id, record); }); -export class MeteorService extends ServiceClass implements IMeteor { +export class MeteorService extends ServiceClassInternal implements IMeteor { protected name = 'meteor'; - protected internal = true; - constructor() { super(); diff --git a/server/services/nps/service.ts b/server/services/nps/service.ts index 08a238dd235a..be2c194e26c2 100644 --- a/server/services/nps/service.ts +++ b/server/services/nps/service.ts @@ -7,12 +7,12 @@ import { NpsVoteRaw } from '../../../app/models/server/raw/NpsVote'; import { SettingsRaw } from '../../../app/models/server/raw/Settings'; import { NPSStatus, INpsVoteStatus, INpsVote, INps } from '../../../definition/INps'; import { INPSService, NPSVotePayload, NPSCreatePayload } from '../../sdk/types/INPSService'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { Banner, NPS } from '../../sdk'; import { sendNpsResults } from './sendNpsResults'; import { getBannerForAdmins, notifyAdmins } from './notification'; -export class NPSService extends ServiceClass implements INPSService { +export class NPSService extends ServiceClassInternal implements INPSService { protected name = 'nps'; private Nps: NpsRaw; diff --git a/server/services/room/service.ts b/server/services/room/service.ts index 91a4b6789963..a8f4652fdf0e 100644 --- a/server/services/room/service.ts +++ b/server/services/room/service.ts @@ -1,6 +1,6 @@ import { Db } from 'mongodb'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { ICreateRoomParams, IRoomService } from '../../sdk/types/IRoomService'; import { Authorization } from '../../sdk'; import { IRoom } from '../../../definition/IRoom'; @@ -8,7 +8,7 @@ import { UsersRaw } from '../../../app/models/server/raw/Users'; import { createRoom } from '../../../app/lib/server/functions/createRoom'; // TODO remove this import import { IUser } from '../../../definition/IUser'; -export class RoomService extends ServiceClass implements IRoomService { +export class RoomService extends ServiceClassInternal implements IRoomService { protected name = 'room'; private Users: UsersRaw; diff --git a/server/services/sauMonitor/service.ts b/server/services/sauMonitor/service.ts index 200fa058e24e..007c5317a1c8 100644 --- a/server/services/sauMonitor/service.ts +++ b/server/services/sauMonitor/service.ts @@ -1,10 +1,10 @@ // import type { Db } from 'mongodb'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { ISAUMonitorService } from '../../sdk/types/ISAUMonitorService'; import { sauEvents } from './events'; -export class SAUMonitorService extends ServiceClass implements ISAUMonitorService { +export class SAUMonitorService extends ServiceClassInternal implements ISAUMonitorService { protected name = 'sau-monitor'; constructor() { diff --git a/server/services/team/service.ts b/server/services/team/service.ts index c13300ab0305..a8ff78cbcff9 100644 --- a/server/services/team/service.ts +++ b/server/services/team/service.ts @@ -26,13 +26,13 @@ import { ITeamService, ITeamUpdateData, } from '../../sdk/types/ITeamService'; -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { canAccessRoom } from '../authorization/canAccessRoom'; import { saveRoomName } from '../../../app/channel-settings/server'; import { saveRoomType } from '../../../app/channel-settings/server/functions/saveRoomType'; import { ISubscription } from '../../../definition/ISubscription'; -export class TeamService extends ServiceClass implements ITeamService { +export class TeamService extends ServiceClassInternal implements ITeamService { protected name = 'team'; private TeamModel: TeamRaw; diff --git a/server/services/uikit-core-app/service.ts b/server/services/uikit-core-app/service.ts index 180089e1e717..c1da2df7f1d5 100644 --- a/server/services/uikit-core-app/service.ts +++ b/server/services/uikit-core-app/service.ts @@ -1,4 +1,4 @@ -import { ServiceClass } from '../../sdk/types/ServiceClass'; +import { ServiceClassInternal } from '../../sdk/types/ServiceClass'; import { IUiKitCoreApp, IUiKitCoreAppService } from '../../sdk/types/IUiKitCoreApp'; const registeredApps = new Map(); @@ -17,7 +17,7 @@ export const registerCoreApp = (module: IUiKitCoreApp): void => { registeredApps.set(module.appId, module); }; -export class UiKitCoreApp extends ServiceClass implements IUiKitCoreAppService { +export class UiKitCoreApp extends ServiceClassInternal implements IUiKitCoreAppService { protected name = 'uikit-core-app'; async isRegistered(appId: string): Promise { From 2daf83769d6b0316fd243175a91f7e87c1e7786d Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Wed, 9 Feb 2022 17:49:26 -0300 Subject: [PATCH 04/11] Do not register internal services when using MS --- ee/app/license/server/index.ts | 4 ---- ee/server/startup/index.ts | 1 + ee/server/startup/services.ts | 9 +++++++++ server/services/startup.ts | 34 +++++++++++++++++++++++----------- 4 files changed, 33 insertions(+), 15 deletions(-) create mode 100644 ee/server/startup/services.ts diff --git a/ee/app/license/server/index.ts b/ee/app/license/server/index.ts index c74919b50bc6..f7d83ed388b8 100644 --- a/ee/app/license/server/index.ts +++ b/ee/app/license/server/index.ts @@ -1,11 +1,7 @@ import './settings'; import './methods'; import './startup'; -import { LicenseService } from './license.internalService'; -import { api } from '../../../../server/sdk/api'; export { onLicense, overwriteClassOnLicense, isEnterprise, getMaxGuestUsers } from './license'; export { getStatistics } from './getStatistics'; - -api.registerService(new LicenseService()); diff --git a/ee/server/startup/index.ts b/ee/server/startup/index.ts index 91cfb4839c7c..8b861f051e7e 100644 --- a/ee/server/startup/index.ts +++ b/ee/server/startup/index.ts @@ -1,2 +1,3 @@ import './engagementDashboard'; import './seatsCap'; +import './services'; diff --git a/ee/server/startup/services.ts b/ee/server/startup/services.ts new file mode 100644 index 000000000000..a3294a943a4d --- /dev/null +++ b/ee/server/startup/services.ts @@ -0,0 +1,9 @@ +import { api } from '../../../server/sdk/api'; +import { EnterpriseSettings } from '../../app/settings/server/settings.internalService'; +import { LDAPEEService } from '../local-services/ldap/service'; +import { LicenseService } from '../../app/license/server/license.internalService'; + +// TODO consider registering these services only after a valid license is added +api.registerService(new EnterpriseSettings()); +api.registerService(new LDAPEEService()); +api.registerService(new LicenseService()); diff --git a/server/services/startup.ts b/server/services/startup.ts index 4312d1452c4b..66d2c3aa400b 100644 --- a/server/services/startup.ts +++ b/server/services/startup.ts @@ -1,28 +1,40 @@ import { MongoInternals } from 'meteor/mongo'; +import { AnalyticsService } from './analytics/service'; import { api } from '../sdk/api'; -import { Authorization } from './authorization/service'; +import { AuthorizationLivechat } from '../../app/livechat/server/roomAccessValidator.internalService'; +import { AuthorizationTokenpass } from '../../app/tokenpass/server/roomAccessValidator.internalService'; import { BannerService } from './banner/service'; +import { LDAPService } from './ldap/service'; +import { MediaService } from './image/service'; import { MeteorService } from './meteor/service'; import { NPSService } from './nps/service'; import { RoomService } from './room/service'; +import { SAUMonitorService } from './sauMonitor/service'; import { TeamService } from './team/service'; import { UiKitCoreApp } from './uikit-core-app/service'; -import { MediaService } from './image/service'; -import { AnalyticsService } from './analytics/service'; -import { LDAPService } from './ldap/service'; -import { SAUMonitorService } from './sauMonitor/service'; const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo; -api.registerService(new Authorization(db)); +api.registerService(new AnalyticsService(db)); +api.registerService(new AuthorizationLivechat()); +api.registerService(new AuthorizationTokenpass()); api.registerService(new BannerService(db)); +api.registerService(new LDAPService()); +api.registerService(new MediaService()); api.registerService(new MeteorService()); -api.registerService(new UiKitCoreApp()); api.registerService(new NPSService(db)); api.registerService(new RoomService(db)); -api.registerService(new TeamService(db)); -api.registerService(new MediaService()); -api.registerService(new AnalyticsService(db)); -api.registerService(new LDAPService()); api.registerService(new SAUMonitorService()); +api.registerService(new TeamService(db)); +api.registerService(new UiKitCoreApp()); + +// if TRANSPORTER env var it means the process is running in micro services mode +// in that case we don't need to register services that will run separately +if (!process.env.TRANSPORTER?.match(/^(?:nats|TCP)/)) { + (async (): Promise => { + const { Authorization } = await import('./authorization/service'); + + api.registerService(new Authorization(db)); + })(); +} From d4db10655d611b75dbfd358fbddff1884ae5c246 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Wed, 9 Feb 2022 18:07:10 -0300 Subject: [PATCH 05/11] Validate license --- .../license/server/license.internalService.ts | 94 +++++++++---------- ee/app/license/server/license.ts | 4 - 2 files changed, 47 insertions(+), 51 deletions(-) diff --git a/ee/app/license/server/license.internalService.ts b/ee/app/license/server/license.internalService.ts index aa5a11bbb525..69c0e0f483ff 100644 --- a/ee/app/license/server/license.internalService.ts +++ b/ee/app/license/server/license.internalService.ts @@ -1,10 +1,12 @@ -import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; +import { debounce } from 'underscore'; + +import { Authorization } from '../../../../server/sdk'; import { api } from '../../../../server/sdk/api'; import { ILicense } from '../../../../server/sdk/types/ILicense'; -import { hasLicense, isEnterprise, getModules, onValidateLicenses, onModule } from './license'; -import { resetEnterprisePermissions } from '../../authorization/server/resetEnterprisePermissions'; -import { Authorization } from '../../../../server/sdk'; +import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; import { guestPermissions } from '../../authorization/lib/guestPermissions'; +import { resetEnterprisePermissions } from '../../authorization/server/resetEnterprisePermissions'; +import { getModules, hasLicense, isEnterprise, onModule, onValidateLicenses } from './license'; export class LicenseService extends ServiceClassInternal implements ILicense { protected name = 'license'; @@ -25,49 +27,47 @@ export class LicenseService extends ServiceClassInternal implements ILicense { api.broadcast('license.module', licenseModule); }); - this.onEvent('$services.changed', async () => { - // if (hasModule('scalability')) { - // return; - // } - - const services: { - name: string; - nodes: string[]; - }[] = await api.call('$node.services'); - - // console.log('services ->', services); - - /* The main idea is if there is no scalability module enabled, - * then we should not allow more than one service per environment. - * So we list the services and the nodes, and if there is more than - * one, we inform the service that it should be disabled. - */ - - // Filter only the services are duplicated - const duplicated = services.filter((service) => { - return service.name !== '$node' && service.nodes.length > 1; - }); - - console.log('duplicated ->', duplicated); - - if (!duplicated.length) { - return; - } - - const brokers: Record = Object.fromEntries( - duplicated.map((service) => { - const [, ...nodes] = service.nodes; - return [service.name, nodes]; - }), - ); - - console.log('brokers ->', brokers); - - // Just inform the service that it should be disabled - - const duplicatedServicesNames = duplicated.map((service) => service.name); - api.broadcastToServices(duplicatedServicesNames, 'shutdown', brokers); - }); + /** + * The main idea is if there is no scalability module enabled, + * then we should not allow more than one service per environment. + * So we list the services and nodes, and if there is more than + * one, we inform the service that it should be disabled. + */ + this.onEvent( + '$services.changed', + debounce(async () => { + if (hasLicense('scalability')) { + return; + } + + const services: { + name: string; + nodes: string[]; + }[] = await api.call('$node.services'); + + // Filter only the duplicated services + const duplicated = services.filter((service) => { + return service.name !== '$node' && service.nodes.length > 1; + }); + + if (!duplicated.length) { + return; + } + + const brokers: Record = Object.fromEntries( + duplicated.map((service) => { + // remove the first node from the list + const [, ...nodes] = service.nodes; + return [service.name, nodes]; + }), + ); + + const duplicatedServicesNames = duplicated.map((service) => service.name); + + // send shutdown signal to the duplicated services + api.broadcastToServices(duplicatedServicesNames, 'shutdown', brokers); + }, 1000), + ); } async started(): Promise { diff --git a/ee/app/license/server/license.ts b/ee/app/license/server/license.ts index 40ae20858a36..a4da074e3e88 100644 --- a/ee/app/license/server/license.ts +++ b/ee/app/license/server/license.ts @@ -311,10 +311,6 @@ export function onValidFeature(feature: BundleFeature, cb: () => void): () => vo }; } -export const hasModule = (module: string): boolean => { - return License.hasModule(module); -}; - export function onInvalidFeature(feature: BundleFeature, cb: () => void): () => void { EnterpriseLicenses.on(`invalid:${feature}`, cb); From c5ca08c4dccd21e24dfb5033f12fd483c9434262 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Thu, 10 Feb 2022 12:03:22 -0300 Subject: [PATCH 06/11] Fix review --- ee/server/NetworkBroker.ts | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/ee/server/NetworkBroker.ts b/ee/server/NetworkBroker.ts index 90715964ab3a..a7b0f4b71a46 100644 --- a/ee/server/NetworkBroker.ts +++ b/ee/server/NetworkBroker.ts @@ -90,7 +90,12 @@ export class NetworkBroker implements IBroker { console.error(err); } - return this.call(method, data); + const context = asyncLocalStorage.getStore(); + if (context?.ctx?.call) { + return context.ctx.call(method, data); + } + + return this.broker.call(method, data); } destroyService(instance: ServiceClass): void { @@ -111,21 +116,20 @@ export class NetworkBroker implements IBroker { }); } + const dependencies = name !== 'license' ? { dependencies: ['license'] } : {}; + const service: ServiceSchema = { name, actions: {}, - ...(name !== 'license' - ? { - dependencies: ['license'], - } - : {}), + ...dependencies, events: instance.getEvents().reduce>((map, eventName) => { map[eventName] = /^\$/.test(eventName) - ? (data: Parameters): void => { - instance.emit(eventName, data); + ? (ctx: Context): void => { + // internal events params are not an array + instance.emit(eventName, ctx.params as Parameters); } - : (data: Parameters): void => { - instance.emit(eventName, ...data); + : (ctx: Context): void => { + instance.emit(eventName, ...(ctx.params as Parameters)); }; return map; }, {}), From 90bd57835e1a30983460e1442c4a0e1678f58fb3 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Thu, 10 Feb 2022 14:11:22 -0300 Subject: [PATCH 07/11] Remove whitelisted actions and events --- ee/server/NetworkBroker.ts | 52 ++++++++++++-------------------------- server/sdk/lib/proxify.ts | 6 +---- 2 files changed, 17 insertions(+), 41 deletions(-) diff --git a/ee/server/NetworkBroker.ts b/ee/server/NetworkBroker.ts index a7b0f4b71a46..f50591c31af8 100644 --- a/ee/server/NetworkBroker.ts +++ b/ee/server/NetworkBroker.ts @@ -20,22 +20,15 @@ const lifecycle: { [k: string]: string } = { const { WAIT_FOR_SERVICES_TIMEOUT = '10000', // 10 seconds - WAIT_FOR_SERVICES_WHITELIST_TIMEOUT = '600000', // 10 minutes } = process.env; const waitForServicesTimeout = parseInt(WAIT_FOR_SERVICES_TIMEOUT, 10) || 10000; -const waitForServicesWhitelistTimeout = parseInt(WAIT_FOR_SERVICES_WHITELIST_TIMEOUT, 10) || 600000; export class NetworkBroker implements IBroker { private broker: ServiceBroker; private started: Promise; - private whitelist = { - events: ['license.module', 'watch.settings'], - actions: ['license.hasLicense'], - }; - metrics: IServiceMetrics; constructor(broker: ServiceBroker) { @@ -48,18 +41,6 @@ export class NetworkBroker implements IBroker { this.started = this.broker.start(); } - isWhitelisted(list: string[], item: string): boolean { - return list.includes(item); - } - - isActionWhitelisted(method: string): boolean { - return this.isWhitelisted(this.whitelist.actions, method); - } - - isEventWhitelisted(event: string): boolean { - return this.isWhitelisted(this.whitelist.events, event); - } - async call(method: string, data: any): Promise { await this.started; @@ -82,10 +63,7 @@ export class NetworkBroker implements IBroker { await this.started; try { - await this.broker.waitForServices( - method.split('.')[0], - this.isActionWhitelisted(method) ? waitForServicesWhitelistTimeout : waitForServicesTimeout, - ); + await this.broker.waitForServices(method.split('.')[0], waitForServicesTimeout); } catch (err) { console.error(err); } @@ -103,6 +81,18 @@ export class NetworkBroker implements IBroker { } createService(instance: ServiceClass): void { + const methods = ( + instance.constructor?.name === 'Object' + ? Object.getOwnPropertyNames(instance) + : Object.getOwnPropertyNames(Object.getPrototypeOf(instance)) + ).filter((name) => name !== 'constructor'); + + if (!instance.getEvents() || !methods.length) { + return; + } + + const serviceInstance = instance as any; + const name = instance.getName(); if (!instance.isInternal()) { @@ -139,19 +129,9 @@ export class NetworkBroker implements IBroker { return; } - const methods = - instance.constructor?.name === 'Object' - ? Object.getOwnPropertyNames(instance) - : Object.getOwnPropertyNames(Object.getPrototypeOf(instance)); for (const method of methods) { - if (method === 'constructor') { - continue; - } - - const i = instance as any; - if (method.match(/^on[A-Z]/)) { - service.events[events[method]] = i[method].bind(i); + service.events[events[method]] = serviceInstance[method].bind(serviceInstance); continue; } @@ -164,7 +144,7 @@ export class NetworkBroker implements IBroker { requestID: null, broker: this, }, - i[method].bind(i), + serviceInstance[method].bind(serviceInstance), ); continue; } @@ -178,7 +158,7 @@ export class NetworkBroker implements IBroker { broker: this, ctx, }, - () => i[method](...ctx.params), + () => serviceInstance[method](...ctx.params), ); }; } diff --git a/server/sdk/lib/proxify.ts b/server/sdk/lib/proxify.ts index ec157ebd93c1..31112364ebaf 100644 --- a/server/sdk/lib/proxify.ts +++ b/server/sdk/lib/proxify.ts @@ -25,11 +25,7 @@ function handler(namespace: string, waitService: boolean): Pro }; } -/* - * @deprecated - * - * instead, declare the service dependencies in the constructor - */ +// TODO remove the need to wait for a service, if that is really needed it should have a dependency on startup export function proxifyWithWait(namespace: string): Prom { return new Proxy({}, handler(namespace, true)) as unknown as Prom; } From 763178e618fcb046efad8cc81500fcdd0fa7ed17 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Fri, 11 Feb 2022 11:03:10 -0300 Subject: [PATCH 08/11] Always add authorization service --- server/services/startup.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/server/services/startup.ts b/server/services/startup.ts index 66d2c3aa400b..ce5d34b0ab09 100644 --- a/server/services/startup.ts +++ b/server/services/startup.ts @@ -13,6 +13,7 @@ import { RoomService } from './room/service'; import { SAUMonitorService } from './sauMonitor/service'; import { TeamService } from './team/service'; import { UiKitCoreApp } from './uikit-core-app/service'; +import { Authorization } from './authorization/service'; const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo; @@ -28,13 +29,15 @@ api.registerService(new RoomService(db)); api.registerService(new SAUMonitorService()); api.registerService(new TeamService(db)); api.registerService(new UiKitCoreApp()); +api.registerService(new Authorization(db)); +// TODO need to implement something to make this work // if TRANSPORTER env var it means the process is running in micro services mode // in that case we don't need to register services that will run separately -if (!process.env.TRANSPORTER?.match(/^(?:nats|TCP)/)) { - (async (): Promise => { - const { Authorization } = await import('./authorization/service'); +// if (!process.env.TRANSPORTER?.match(/^(?:nats|TCP)/)) { +// (async (): Promise => { +// const { Authorization } = await import('./authorization/service'); - api.registerService(new Authorization(db)); - })(); -} +// api.registerService(new Authorization(db)); +// })(); +// } From 5abc5d2eb33b8f2ebb47426376978c2796236bf3 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Fri, 11 Feb 2022 11:43:51 -0300 Subject: [PATCH 09/11] Move EE broker startup to startup folder --- ee/server/broker.ts | 164 -------------------- ee/server/index.ts | 1 - ee/server/local-services/ldap/service.ts | 2 - ee/server/services/authorization/service.ts | 2 +- ee/server/services/ddp-streamer/service.ts | 2 +- ee/server/services/ecdh-proxy/service.ts | 2 +- ee/server/services/presence/service.ts | 2 +- ee/server/services/stream-hub/service.ts | 2 +- ee/server/startup/broker.ts | 160 +++++++++++++++++++ ee/server/startup/index.ts | 7 + server/main.ts | 1 - 11 files changed, 172 insertions(+), 173 deletions(-) delete mode 100644 ee/server/broker.ts create mode 100644 ee/server/startup/broker.ts diff --git a/ee/server/broker.ts b/ee/server/broker.ts deleted file mode 100644 index 66392cc6b5e1..000000000000 --- a/ee/server/broker.ts +++ /dev/null @@ -1,164 +0,0 @@ -const { - TRANSPORTER = '', - CACHE = 'Memory', - // SERIALIZER = 'MsgPack', - SERIALIZER = 'EJSON', - MOLECULER_LOG_LEVEL = 'error', - BALANCE_STRATEGY = 'RoundRobin', - BALANCE_PREFER_LOCAL = 'false', - RETRY_FACTOR = '2', - RETRY_MAX_DELAY = '1000', - RETRY_DELAY = '100', - RETRY_RETRIES = '5', - RETRY_ENABLED = 'false', - REQUEST_TIMEOUT = '10', - HEARTBEAT_INTERVAL = '10', - HEARTBEAT_TIMEOUT = '30', - BULKHEAD_ENABLED = 'false', - BULKHEAD_CONCURRENCY = '10', - BULKHEAD_MAX_QUEUE_SIZE = '10000', - MS_METRICS = 'false', - MS_METRICS_PORT = '9458', - TRACING_ENABLED = 'false', - SKIP_PROCESS_EVENT_REGISTRATION = 'false', -} = process.env; - -// class CustomRegenerator extends Errors.Regenerator { -// restoreCustomError(plainError: any): Error | undefined { -// const { message, reason, details, errorType, isClientSafe } = plainError; - -// if (errorType === 'Meteor.Error') { -// const error = new MeteorError(message, reason, details); -// if (typeof isClientSafe !== 'undefined') { -// error.isClientSafe = isClientSafe; -// } -// return error; -// } - -// return undefined; -// } - -// extractPlainError(err: Error | MeteorError): Errors.PlainMoleculerError { -// return { -// ...super.extractPlainError(err), -// ...(isMeteorError(err) && { -// isClientSafe: err.isClientSafe, -// errorType: err.errorType, -// reason: err.reason, -// details: err.details, -// }), -// }; -// } -// } - -// only starts network broker if transporter properly configured -if (TRANSPORTER.match(/^(?:nats|TCP)/)) { - (async (): Promise => { - const { ServiceBroker, Serializers } = await import('moleculer'); - const EJSON = (await import('ejson')).default; - - const { NetworkBroker } = await import('./NetworkBroker'); - const { api } = await import('../../server/sdk/api'); - const Base = Serializers.Base as unknown as new () => {}; - - class EJSONSerializer extends Base { - serialize(obj: {}): Buffer { - return Buffer.from(EJSON.stringify(obj)); - } - - deserialize(buf: Buffer): any { - return EJSON.parse(buf.toString()); - } - } - - const network = new ServiceBroker({ - skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', - transporter: TRANSPORTER, - metrics: { - enabled: MS_METRICS === 'true', - reporter: [ - { - type: 'Prometheus', - options: { - port: MS_METRICS_PORT, - }, - }, - ], - }, - cacher: CACHE, - serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER, - logLevel: MOLECULER_LOG_LEVEL as any, - // logLevel: { - // // "TRACING": "trace", - // // "TRANS*": "warn", - // BROKER: 'debug', - // TRANSIT: 'debug', - // '**': 'info', - // }, - logger: { - type: 'Console', - options: { - formatter: 'short', - }, - }, - registry: { - strategy: BALANCE_STRATEGY, - preferLocal: BALANCE_PREFER_LOCAL !== 'false', - }, - - requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000, - retryPolicy: { - enabled: RETRY_ENABLED === 'true', - retries: parseInt(RETRY_RETRIES), - delay: parseInt(RETRY_DELAY), - maxDelay: parseInt(RETRY_MAX_DELAY), - factor: parseInt(RETRY_FACTOR), - check: (err: any): boolean => err && !!err.retryable, - }, - - maxCallLevel: 100, - heartbeatInterval: parseInt(HEARTBEAT_INTERVAL), - heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT), - - // circuitBreaker: { - // enabled: false, - // threshold: 0.5, - // windowTime: 60, - // minRequestCount: 20, - // halfOpenTime: 10 * 1000, - // check: (err: any): boolean => err && err.code >= 500, - // }, - - bulkhead: { - enabled: BULKHEAD_ENABLED === 'true', - concurrency: parseInt(BULKHEAD_CONCURRENCY), - maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE), - }, - - tracing: { - enabled: TRACING_ENABLED === 'true', - exporter: { - type: 'Jaeger', - options: { - endpoint: null, - host: 'jaeger', - port: 6832, - sampler: { - // Sampler type. More info: https://www.jaegertracing.io/docs/1.14/sampling/#client-sampling-configuration - type: 'Const', - // Sampler specific options. - options: {}, - }, - // Additional options for `Jaeger.Tracer` - tracerOptions: {}, - // Default tags. They will be added into all span tags. - defaultTags: null, - }, - }, - }, - // errorRegenerator: new CustomRegenerator(), - }); - - api.setBroker(new NetworkBroker(network)); - })(); -} diff --git a/ee/server/index.ts b/ee/server/index.ts index 963e58fe6d97..6933a3eb9f27 100644 --- a/ee/server/index.ts +++ b/ee/server/index.ts @@ -1,4 +1,3 @@ -import './broker'; import './startup'; import '../app/models'; diff --git a/ee/server/local-services/ldap/service.ts b/ee/server/local-services/ldap/service.ts index d81c7c93fe3b..8d9a15d0a96a 100644 --- a/ee/server/local-services/ldap/service.ts +++ b/ee/server/local-services/ldap/service.ts @@ -1,5 +1,3 @@ -import '../../broker'; - import { LDAPEEManager } from '../../lib/ldap/Manager'; import { ILDAPEEService } from '../../sdk/types/ILDAPEEService'; import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; diff --git a/ee/server/services/authorization/service.ts b/ee/server/services/authorization/service.ts index 4a1975fabfbe..3cad2245f557 100644 --- a/ee/server/services/authorization/service.ts +++ b/ee/server/services/authorization/service.ts @@ -1,4 +1,4 @@ -import '../../broker'; +import '../../startup/broker'; import { api } from '../../../../server/sdk/api'; import { Authorization } from '../../../../server/services/authorization/service'; diff --git a/ee/server/services/ddp-streamer/service.ts b/ee/server/services/ddp-streamer/service.ts index 2dcaaadea5ee..298b6e49ff30 100755 --- a/ee/server/services/ddp-streamer/service.ts +++ b/ee/server/services/ddp-streamer/service.ts @@ -1,4 +1,4 @@ -import '../../broker'; +import '../../startup/broker'; import { api } from '../../../../server/sdk/api'; import { DDPStreamer } from './DDPStreamer'; diff --git a/ee/server/services/ecdh-proxy/service.ts b/ee/server/services/ecdh-proxy/service.ts index 194c2d7f2c04..23533d0d6fef 100755 --- a/ee/server/services/ecdh-proxy/service.ts +++ b/ee/server/services/ecdh-proxy/service.ts @@ -1,4 +1,4 @@ -import '../../broker'; +import '../../startup/broker'; import { api } from '../../../../server/sdk/api'; import { ECDHProxy } from './ECDHProxy'; diff --git a/ee/server/services/presence/service.ts b/ee/server/services/presence/service.ts index f6ae2f810ef7..93908d7be80c 100755 --- a/ee/server/services/presence/service.ts +++ b/ee/server/services/presence/service.ts @@ -1,4 +1,4 @@ -import '../../broker'; +import '../../startup/broker'; import { api } from '../../../../server/sdk/api'; import { Presence } from './Presence'; diff --git a/ee/server/services/stream-hub/service.ts b/ee/server/services/stream-hub/service.ts index ee9f9cdbf053..edb5cafe1244 100755 --- a/ee/server/services/stream-hub/service.ts +++ b/ee/server/services/stream-hub/service.ts @@ -1,4 +1,4 @@ -import '../../broker'; +import '../../startup/broker'; import { api } from '../../../../server/sdk/api'; import { StreamHub } from './StreamHub'; diff --git a/ee/server/startup/broker.ts b/ee/server/startup/broker.ts new file mode 100644 index 000000000000..163148252843 --- /dev/null +++ b/ee/server/startup/broker.ts @@ -0,0 +1,160 @@ +import EJSON from 'ejson'; +import { Errors, Serializers, ServiceBroker } from 'moleculer'; + +import { api } from '../../../server/sdk/api'; +import { isMeteorError, MeteorError } from '../../../server/sdk/errors'; +import { NetworkBroker } from '../NetworkBroker'; + +const { + TRANSPORTER = '', + CACHE = 'Memory', + // SERIALIZER = 'MsgPack', + SERIALIZER = 'EJSON', + MOLECULER_LOG_LEVEL = 'error', + BALANCE_STRATEGY = 'RoundRobin', + BALANCE_PREFER_LOCAL = 'false', + RETRY_FACTOR = '2', + RETRY_MAX_DELAY = '1000', + RETRY_DELAY = '100', + RETRY_RETRIES = '5', + RETRY_ENABLED = 'false', + REQUEST_TIMEOUT = '10', + HEARTBEAT_INTERVAL = '10', + HEARTBEAT_TIMEOUT = '30', + BULKHEAD_ENABLED = 'false', + BULKHEAD_CONCURRENCY = '10', + BULKHEAD_MAX_QUEUE_SIZE = '10000', + MS_METRICS = 'false', + MS_METRICS_PORT = '9458', + TRACING_ENABLED = 'false', + SKIP_PROCESS_EVENT_REGISTRATION = 'false', +} = process.env; + +const Base = Serializers.Base as unknown as new () => {}; +class CustomRegenerator extends Errors.Regenerator { + restoreCustomError(plainError: any): Error | undefined { + const { message, reason, details, errorType, isClientSafe } = plainError; + + if (errorType === 'Meteor.Error') { + const error = new MeteorError(message, reason, details); + if (typeof isClientSafe !== 'undefined') { + error.isClientSafe = isClientSafe; + } + return error; + } + + return undefined; + } + + extractPlainError(err: Error | MeteorError): Errors.PlainMoleculerError { + return { + ...super.extractPlainError(err), + ...(isMeteorError(err) && { + isClientSafe: err.isClientSafe, + errorType: err.errorType, + reason: err.reason, + details: err.details, + }), + }; + } +} + +class EJSONSerializer extends Base { + serialize(obj: {}): Buffer { + return Buffer.from(EJSON.stringify(obj)); + } + + deserialize(buf: Buffer): any { + return EJSON.parse(buf.toString()); + } +} + +const network = new ServiceBroker({ + skipProcessEventRegistration: SKIP_PROCESS_EVENT_REGISTRATION === 'true', + transporter: TRANSPORTER, + metrics: { + enabled: MS_METRICS === 'true', + reporter: [ + { + type: 'Prometheus', + options: { + port: MS_METRICS_PORT, + }, + }, + ], + }, + cacher: CACHE, + serializer: SERIALIZER === 'EJSON' ? new EJSONSerializer() : SERIALIZER, + logLevel: MOLECULER_LOG_LEVEL as any, + // logLevel: { + // // "TRACING": "trace", + // // "TRANS*": "warn", + // BROKER: 'debug', + // TRANSIT: 'debug', + // '**': 'info', + // }, + logger: { + type: 'Console', + options: { + formatter: 'short', + }, + }, + registry: { + strategy: BALANCE_STRATEGY, + preferLocal: BALANCE_PREFER_LOCAL !== 'false', + }, + + requestTimeout: parseInt(REQUEST_TIMEOUT) * 1000, + retryPolicy: { + enabled: RETRY_ENABLED === 'true', + retries: parseInt(RETRY_RETRIES), + delay: parseInt(RETRY_DELAY), + maxDelay: parseInt(RETRY_MAX_DELAY), + factor: parseInt(RETRY_FACTOR), + check: (err: any): boolean => err && !!err.retryable, + }, + + maxCallLevel: 100, + heartbeatInterval: parseInt(HEARTBEAT_INTERVAL), + heartbeatTimeout: parseInt(HEARTBEAT_TIMEOUT), + + // circuitBreaker: { + // enabled: false, + // threshold: 0.5, + // windowTime: 60, + // minRequestCount: 20, + // halfOpenTime: 10 * 1000, + // check: (err: any): boolean => err && err.code >= 500, + // }, + + bulkhead: { + enabled: BULKHEAD_ENABLED === 'true', + concurrency: parseInt(BULKHEAD_CONCURRENCY), + maxQueueSize: parseInt(BULKHEAD_MAX_QUEUE_SIZE), + }, + + tracing: { + enabled: TRACING_ENABLED === 'true', + exporter: { + type: 'Jaeger', + options: { + endpoint: null, + host: 'jaeger', + port: 6832, + sampler: { + // Sampler type. More info: https://www.jaegertracing.io/docs/1.14/sampling/#client-sampling-configuration + type: 'Const', + // Sampler specific options. + options: {}, + }, + // Additional options for `Jaeger.Tracer` + tracerOptions: {}, + // Default tags. They will be added into all span tags. + defaultTags: null, + }, + }, + }, + errorRegenerator: new CustomRegenerator(), +}); + +api.setBroker(new NetworkBroker(network)); diff --git a/ee/server/startup/index.ts b/ee/server/startup/index.ts index 8b861f051e7e..0fac27be7e40 100644 --- a/ee/server/startup/index.ts +++ b/ee/server/startup/index.ts @@ -1,3 +1,10 @@ import './engagementDashboard'; import './seatsCap'; import './services'; + +const { TRANSPORTER = '' } = process.env; + +// only starts network broker if transporter properly configured +if (TRANSPORTER.match(/^(?:nats|TCP)/)) { + require('./broker'); +} diff --git a/server/main.ts b/server/main.ts index 12354325eb57..fb17010ec69d 100644 --- a/server/main.ts +++ b/server/main.ts @@ -1,4 +1,3 @@ -import '../ee/server/broker'; import '../app/settings/server/startup'; import '../lib/oauthRedirectUri'; import './lib/logger/startup'; From a59066e20c7a59e514f8f600685c3a660db15e8c Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Fri, 11 Feb 2022 14:51:23 -0300 Subject: [PATCH 10/11] Remove license service dependency on authorizations --- .../license/server/license.internalService.ts | 11 +++++-- ee/server/NetworkBroker.ts | 3 +- ee/server/services/account/service.ts | 2 +- server/sdk/lib/Events.ts | 1 + server/sdk/types/IAuthorization.ts | 1 - server/sdk/types/ILicense.ts | 2 ++ server/services/authorization/service.ts | 29 ++++++++++++++----- server/services/startup.ts | 15 ++++------ 8 files changed, 41 insertions(+), 23 deletions(-) diff --git a/ee/app/license/server/license.internalService.ts b/ee/app/license/server/license.internalService.ts index 69c0e0f483ff..d5df9d46dba9 100644 --- a/ee/app/license/server/license.internalService.ts +++ b/ee/app/license/server/license.internalService.ts @@ -1,6 +1,5 @@ import { debounce } from 'underscore'; -import { Authorization } from '../../../../server/sdk'; import { api } from '../../../../server/sdk/api'; import { ILicense } from '../../../../server/sdk/types/ILicense'; import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass'; @@ -14,12 +13,14 @@ export class LicenseService extends ServiceClassInternal implements ILicense { constructor() { super(); + console.log('[ee] LicenseService', new Error().stack); + onValidateLicenses((): void => { if (!isEnterprise()) { return; } - Authorization.addRoleRestrictions('guest', guestPermissions); + api.broadcast('authorization.guestPermissions', guestPermissions); resetEnterprisePermissions(); }); @@ -75,7 +76,7 @@ export class LicenseService extends ServiceClassInternal implements ILicense { return; } - Authorization.addRoleRestrictions('guest', guestPermissions); + api.broadcast('authorization.guestPermissions', guestPermissions); resetEnterprisePermissions(); } @@ -90,4 +91,8 @@ export class LicenseService extends ServiceClassInternal implements ILicense { getModules(): string[] { return getModules(); } + + getGuestPermissions(): string[] { + return guestPermissions; + } } diff --git a/ee/server/NetworkBroker.ts b/ee/server/NetworkBroker.ts index f50591c31af8..b64a03b9fa72 100644 --- a/ee/server/NetworkBroker.ts +++ b/ee/server/NetworkBroker.ts @@ -136,7 +136,7 @@ export class NetworkBroker implements IBroker { } if (lifecycle[method]) { - service[method] = (): void => + service[method] = (): void => { asyncLocalStorage.run( { id: '', @@ -146,6 +146,7 @@ export class NetworkBroker implements IBroker { }, serviceInstance[method].bind(serviceInstance), ); + }; continue; } diff --git a/ee/server/services/account/service.ts b/ee/server/services/account/service.ts index 2b0a78114965..aec4f26f0ff3 100644 --- a/ee/server/services/account/service.ts +++ b/ee/server/services/account/service.ts @@ -1,4 +1,4 @@ -import '../../broker'; +import '../../startup/broker'; import { api } from '../../../../server/sdk/api'; import { Account } from './Account'; diff --git a/server/sdk/lib/Events.ts b/server/sdk/lib/Events.ts index 1631324bbe21..d1d7266d0605 100644 --- a/server/sdk/lib/Events.ts +++ b/server/sdk/lib/Events.ts @@ -24,6 +24,7 @@ export type EventSignatures = { '$services.changed': (info: unknown) => void; 'accounts.login': (info: { userId: string; connection: ISocketConnection }) => void; 'accounts.logout': (info: { userId: string; connection: ISocketConnection }) => void; + 'authorization.guestPermissions': (permissions: string[]) => void; 'socket.connected': (connection: ISocketConnection) => void; 'socket.disconnected': (connection: ISocketConnection) => void; 'banner.new'(bannerId: string): void; diff --git a/server/sdk/types/IAuthorization.ts b/server/sdk/types/IAuthorization.ts index 86d12e835c3b..e72d84078ac0 100644 --- a/server/sdk/types/IAuthorization.ts +++ b/server/sdk/types/IAuthorization.ts @@ -11,7 +11,6 @@ export interface IAuthorization { hasAllPermission(userId: string, permissions: string[], scope?: string): Promise; hasPermission(userId: string, permissionId: string, scope?: string): Promise; hasAtLeastOnePermission(userId: string, permissions: string[], scope?: string): Promise; - addRoleRestrictions(role: string, permissions: string[]): Promise; canAccessRoom: RoomAccessValidator; canAccessRoomId(rid: IRoom['_id'], uid: IUser['_id']): Promise; } diff --git a/server/sdk/types/ILicense.ts b/server/sdk/types/ILicense.ts index b6bcc6ed1b21..4ac2f5fe580c 100644 --- a/server/sdk/types/ILicense.ts +++ b/server/sdk/types/ILicense.ts @@ -6,4 +6,6 @@ export interface ILicense extends IServiceClass { isEnterprise(): boolean; getModules(): string[]; + + getGuestPermissions(): string[]; } diff --git a/server/services/authorization/service.ts b/server/services/authorization/service.ts index cd50baa65829..da2f0ba80d5f 100644 --- a/server/services/authorization/service.ts +++ b/server/services/authorization/service.ts @@ -16,6 +16,7 @@ import { UsersRaw } from '../../../app/models/server/raw/Users'; import { IRole } from '../../../definition/IRole'; import type { IRoom } from '../../../definition/IRoom'; import { ISubscription } from '../../../definition/ISubscription'; +import { License } from '../../sdk'; import './canAccessRoomLivechat'; import './canAccessRoomTokenpass'; @@ -74,6 +75,22 @@ export class Authorization extends ServiceClass implements IAuthorization { this.onEvent('watch.roles', clearCache); this.onEvent('permission.changed', clearCache); + this.onEvent('authorization.guestPermissions', (permissions: string[]) => { + AuthorizationUtils.addRolePermissionWhiteList('guest', permissions); + }); + } + + async started(): Promise { + if (!(await License.isEnterprise())) { + return; + } + + const permissions = await License.getGuestPermissions(); + if (!permissions) { + return; + } + + AuthorizationUtils.addRolePermissionWhiteList('guest', permissions); } async hasAllPermission(userId: string, permissions: string[], scope?: string): Promise { @@ -119,10 +136,6 @@ export class Authorization extends ServiceClass implements IAuthorization { return this.canAccessRoom(room, { _id: uid }); } - async addRoleRestrictions(role: string, permissions: string[]): Promise { - AuthorizationUtils.addRolePermissionWhiteList(role, permissions); - } - async getUsersFromPublicRoles(): Promise[]> { const roleIds = await this.getPublicRoles(); @@ -183,8 +196,8 @@ export class Authorization extends ServiceClass implements IAuthorization { private async atLeastOne(uid: string, permissions: string[] = [], scope?: string): Promise { const sortedRoles = await this.getRolesCached(uid, scope); - for (const permission of permissions) { - if (await this.rolesHasPermissionCached(permission, sortedRoles)) { // eslint-disable-line + for await (const permission of permissions) { + if (await this.rolesHasPermissionCached(permission, sortedRoles)) { return true; } } @@ -194,8 +207,8 @@ export class Authorization extends ServiceClass implements IAuthorization { private async all(uid: string, permissions: string[] = [], scope?: string): Promise { const sortedRoles = await this.getRolesCached(uid, scope); - for (const permission of permissions) { - if (!await this.rolesHasPermissionCached(permission, sortedRoles)) { // eslint-disable-line + for await (const permission of permissions) { + if (!(await this.rolesHasPermissionCached(permission, sortedRoles))) { return false; } } diff --git a/server/services/startup.ts b/server/services/startup.ts index ce5d34b0ab09..66d2c3aa400b 100644 --- a/server/services/startup.ts +++ b/server/services/startup.ts @@ -13,7 +13,6 @@ import { RoomService } from './room/service'; import { SAUMonitorService } from './sauMonitor/service'; import { TeamService } from './team/service'; import { UiKitCoreApp } from './uikit-core-app/service'; -import { Authorization } from './authorization/service'; const { db } = MongoInternals.defaultRemoteCollectionDriver().mongo; @@ -29,15 +28,13 @@ api.registerService(new RoomService(db)); api.registerService(new SAUMonitorService()); api.registerService(new TeamService(db)); api.registerService(new UiKitCoreApp()); -api.registerService(new Authorization(db)); -// TODO need to implement something to make this work // if TRANSPORTER env var it means the process is running in micro services mode // in that case we don't need to register services that will run separately -// if (!process.env.TRANSPORTER?.match(/^(?:nats|TCP)/)) { -// (async (): Promise => { -// const { Authorization } = await import('./authorization/service'); +if (!process.env.TRANSPORTER?.match(/^(?:nats|TCP)/)) { + (async (): Promise => { + const { Authorization } = await import('./authorization/service'); -// api.registerService(new Authorization(db)); -// })(); -// } + api.registerService(new Authorization(db)); + })(); +} From d691e0a6f80318d48da22a281f61e4e261f72857 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Fri, 11 Feb 2022 16:59:48 -0300 Subject: [PATCH 11/11] Remove console.log --- ee/app/license/server/license.internalService.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/ee/app/license/server/license.internalService.ts b/ee/app/license/server/license.internalService.ts index d5df9d46dba9..5d9d03381b36 100644 --- a/ee/app/license/server/license.internalService.ts +++ b/ee/app/license/server/license.internalService.ts @@ -13,8 +13,6 @@ export class LicenseService extends ServiceClassInternal implements ILicense { constructor() { super(); - console.log('[ee] LicenseService', new Error().stack); - onValidateLicenses((): void => { if (!isEnterprise()) { return;