Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVE][ENTERPRISE] Improve how micro services are loaded #24388

Merged
merged 14 commits into from
Feb 12, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions app/livechat/server/roomAccessValidator.internalService.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { ServiceClass } from '../../../server/sdk/types/ServiceClass';
import { IUser } from '../../../definition/IUser';
import { IAuthorizationLivechat } from '../../../server/sdk/types/IAuthorizationLivechat';
import { ServiceClassInternal } from '../../../server/sdk/types/ServiceClass';
import { validators } from './roomAccessValidator.compatibility';
import { api } from '../../../server/sdk/api';
import type { IOmnichannelRoom } from '../../../definition/IRoom';
import { IUser } from '../../../definition/IUser';

class AuthorizationLivechat extends ServiceClass implements IAuthorizationLivechat {
export class AuthorizationLivechat extends ServiceClassInternal implements IAuthorizationLivechat {
protected name = 'authorization-livechat';

protected internal = true;
Expand All @@ -20,5 +19,3 @@ class AuthorizationLivechat extends ServiceClass implements IAuthorizationLivech
return false;
}
}

api.registerService(new AuthorizationLivechat());
4 changes: 2 additions & 2 deletions app/search/server/search.internalService.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { Users } from '../../models/server';
import { settings } from '../../settings/server';
import { searchProviderService } from './service/providerService';
import { ServiceClass } from '../../../server/sdk/types/ServiceClass';
import { ServiceClassInternal } from '../../../server/sdk/types/ServiceClass';
import { api } from '../../../server/sdk/api';
import { searchEventService } from './events/events';

class Search extends ServiceClass {
class Search extends ServiceClassInternal {
protected name = 'search';

protected internal = true;
Expand Down
7 changes: 2 additions & 5 deletions app/tokenpass/server/roomAccessValidator.internalService.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { ServiceClass } from '../../../server/sdk/types/ServiceClass';
import { ServiceClassInternal } from '../../../server/sdk/types/ServiceClass';
import { validators } from './roomAccessValidator.compatibility';
import { api } from '../../../server/sdk/api';
import { IAuthorizationTokenpass } from '../../../server/sdk/types/IAuthorizationTokenpass';
import { IRoom } from '../../../definition/IRoom';
import { IUser } from '../../../definition/IUser';

class AuthorizationTokenpass extends ServiceClass implements IAuthorizationTokenpass {
export class AuthorizationTokenpass extends ServiceClassInternal implements IAuthorizationTokenpass {
protected name = 'authorization-tokenpass';

protected internal = true;
Expand All @@ -20,5 +19,3 @@ class AuthorizationTokenpass extends ServiceClass implements IAuthorizationToken
return false;
}
}

api.registerService(new AuthorizationTokenpass());
1 change: 0 additions & 1 deletion ee/app/license/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import './license.internalService';
import './settings';
import './methods';
import './startup';
Expand Down
58 changes: 49 additions & 9 deletions ee/app/license/server/license.internalService.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { ServiceClass } from '../../../../server/sdk/types/ServiceClass';
import { debounce } from 'underscore';

import { Authorization } from '../../../../server/sdk';
import { api } from '../../../../server/sdk/api';
import { ILicense } from '../../../../server/sdk/types/ILicense';
import { hasLicense, isEnterprise, getModules, onValidateLicenses, onModule } from './license';
import { resetEnterprisePermissions } from '../../authorization/server/resetEnterprisePermissions';
import { Authorization } from '../../../../server/sdk';
import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass';
import { guestPermissions } from '../../authorization/lib/guestPermissions';
import { resetEnterprisePermissions } from '../../authorization/server/resetEnterprisePermissions';
import { getModules, hasLicense, isEnterprise, onModule, onValidateLicenses } from './license';

class LicenseService extends ServiceClass implements ILicense {
export class LicenseService extends ServiceClassInternal implements ILicense {
protected name = 'license';

protected internal = true;

constructor() {
super();

Expand All @@ -26,6 +26,48 @@ class LicenseService extends ServiceClass implements ILicense {
onModule((licenseModule) => {
api.broadcast('license.module', licenseModule);
});

/**
* The main idea is if there is no scalability module enabled,
* then we should not allow more than one service per environment.
* So we list the services and nodes, and if there is more than
* one, we inform the service that it should be disabled.
*/
this.onEvent(
'$services.changed',
debounce(async () => {
if (hasLicense('scalability')) {
return;
}

const services: {
name: string;
nodes: string[];
}[] = await api.call('$node.services');

// Filter only the duplicated services
const duplicated = services.filter((service) => {
return service.name !== '$node' && service.nodes.length > 1;
});

if (!duplicated.length) {
return;
}

const brokers: Record<string, string[]> = Object.fromEntries(
duplicated.map((service) => {
// remove the first node from the list
const [, ...nodes] = service.nodes;
return [service.name, nodes];
}),
);

const duplicatedServicesNames = duplicated.map((service) => service.name);

// send shutdown signal to the duplicated services
api.broadcastToServices(duplicatedServicesNames, 'shutdown', brokers);
}, 1000),
);
}

async started(): Promise<void> {
Expand All @@ -49,5 +91,3 @@ class LicenseService extends ServiceClass implements ILicense {
return getModules();
}
}

api.registerService(new LicenseService());
7 changes: 2 additions & 5 deletions ee/app/settings/server/settings.internalService.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { ServiceClass } from '../../../../server/sdk/types/ServiceClass';
import { api } from '../../../../server/sdk/api';
import { ServiceClassInternal } from '../../../../server/sdk/types/ServiceClass';
import { IEnterpriseSettings } from '../../../../server/sdk/types/IEnterpriseSettings';
import { changeSettingValue } from './settings';
import { ISetting } from '../../../../definition/ISetting';

class EnterpriseSettings extends ServiceClass implements IEnterpriseSettings {
export class EnterpriseSettings extends ServiceClassInternal implements IEnterpriseSettings {
protected name = 'ee-settings';

protected internal = true;
Expand All @@ -13,5 +12,3 @@ class EnterpriseSettings extends ServiceClass implements IEnterpriseSettings {
return changeSettingValue(record);
}
}

api.registerService(new EnterpriseSettings());
188 changes: 188 additions & 0 deletions ee/server/NetworkBroker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import { ServiceBroker, Context, ServiceSchema } from 'moleculer';

import { asyncLocalStorage } from '../../server/sdk';
import { api } from '../../server/sdk/api';
import { IBroker, IBrokerNode, IServiceMetrics } from '../../server/sdk/types/IBroker';
import { ServiceClass } from '../../server/sdk/types/ServiceClass';
import { EventSignatures } from '../../server/sdk/lib/Events';

const events: { [k: string]: string } = {
onNodeConnected: '$node.connected',
onNodeUpdated: '$node.updated',
onNodeDisconnected: '$node.disconnected',
};

const lifecycle: { [k: string]: string } = {
created: 'created',
started: 'started',
stopped: 'stopped',
};

const {
WAIT_FOR_SERVICES_TIMEOUT = '10000', // 10 seconds
} = process.env;

const waitForServicesTimeout = parseInt(WAIT_FOR_SERVICES_TIMEOUT, 10) || 10000;

export class NetworkBroker implements IBroker {
private broker: ServiceBroker;

private started: Promise<void>;

metrics: IServiceMetrics;

constructor(broker: ServiceBroker) {
this.broker = broker;

api.setBroker(this);

this.metrics = broker.metrics;

this.started = this.broker.start();
}

async call(method: string, data: any): Promise<any> {
await this.started;

const context = asyncLocalStorage.getStore();

if (context?.ctx?.call) {
return context.ctx.call(method, data);
}

const services: { name: string }[] = await this.broker.call('$node.services', {
onlyAvailable: true,
});
if (!services.find((service) => service.name === method.split('.')[0])) {
return new Error('method-not-available');
}
return this.broker.call(method, data);
}

async waitAndCall(method: string, data: any): Promise<any> {
await this.started;

try {
await this.broker.waitForServices(method.split('.')[0], waitForServicesTimeout);
} catch (err) {
console.error(err);
}

const context = asyncLocalStorage.getStore();
if (context?.ctx?.call) {
return context.ctx.call(method, data);
}

return this.broker.call(method, data);
}

destroyService(instance: ServiceClass): void {
this.broker.destroyService(instance.getName());
}

createService(instance: ServiceClass): void {
const methods = (
instance.constructor?.name === 'Object'
? Object.getOwnPropertyNames(instance)
: Object.getOwnPropertyNames(Object.getPrototypeOf(instance))
).filter((name) => name !== 'constructor');

if (!instance.getEvents() || !methods.length) {
return;
}

const serviceInstance = instance as any;

const name = instance.getName();

if (!instance.isInternal()) {
instance.onEvent('shutdown', async (services) => {
if (!services[name]?.includes(this.broker.nodeID)) {
this.broker.logger.debug({ msg: 'Not shutting down, different node.', nodeID: this.broker.nodeID });
return;
}
this.broker.logger.info({ msg: 'Received shutdown event, destroying service.', nodeID: this.broker.nodeID });
this.destroyService(instance);
});
}

const dependencies = name !== 'license' ? { dependencies: ['license'] } : {};

const service: ServiceSchema = {
name,
actions: {},
...dependencies,
events: instance.getEvents().reduce<Record<string, Function>>((map, eventName) => {
map[eventName] = /^\$/.test(eventName)
? (ctx: Context): void => {
// internal events params are not an array
instance.emit(eventName, ctx.params as Parameters<EventSignatures[typeof eventName]>);
}
: (ctx: Context): void => {
instance.emit(eventName, ...(ctx.params as Parameters<EventSignatures[typeof eventName]>));
};
return map;
}, {}),
};

if (!service.events || !service.actions) {
return;
}

for (const method of methods) {
if (method.match(/^on[A-Z]/)) {
service.events[events[method]] = serviceInstance[method].bind(serviceInstance);
continue;
}

if (lifecycle[method]) {
service[method] = (): void =>
asyncLocalStorage.run(
{
id: '',
nodeID: this.broker.nodeID,
requestID: null,
broker: this,
},
serviceInstance[method].bind(serviceInstance),
);
continue;
}

service.actions[method] = async (ctx: Context<[]>): Promise<any> => {
return asyncLocalStorage.run(
{
id: ctx.id,
nodeID: ctx.nodeID,
requestID: ctx.requestID,
broker: this,
ctx,
},
() => serviceInstance[method](...ctx.params),
);
};
}

this.broker.createService(service);
}

async broadcast<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
return this.broker.broadcast(event, args);
}

async broadcastLocal<T extends keyof EventSignatures>(event: T, ...args: Parameters<EventSignatures[T]>): Promise<void> {
this.broker.broadcastLocal(event, args);
}

async broadcastToServices<T extends keyof EventSignatures>(
services: string[],
event: T,
...args: Parameters<EventSignatures[T]>
): Promise<void> {
this.broker.broadcast(event, args, services);
}

async nodeList(): Promise<IBrokerNode[]> {
return this.broker.call('$node.list');
}
}
Loading