Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote registry minimized memory usage + node cluster increase metadata size #272

Merged
merged 5 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/cluster-nodejs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"build-rollup": "rollup -c",
"lint": "tslint '{src,tests}/**/*.{ts,tsx}' --fix",
"prettier": "prettier --write '{src,tests}/**/*.{ts,tsx}'",
"test": "jest --config jest.config.js"
"test": "jest --config jest.config.js --forceExit"
},
"author": "Scalecube (https://github.com/scalecube/scalecube-js)",
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/cluster-nodejs/src/Cluster/JoinCluster.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export const joinCluster: ClusterApi.JoinCluster = (options: ClusterApi.ClusterO
pingReqTimeout: 60, // optional
pingReqGroupSize: 3, // optional
suspectTimeout: 60, // optional
udp: { maxDgramSize: 512 }, // optional
udp: { maxDgramSize: 4096 }, // optional
preferCurrentMeta: true, // optional
};

Expand Down
91 changes: 91 additions & 0 deletions packages/cluster-nodejs/tests/integration/metadataSize.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { joinCluster } from '../../src';
import { ClusterEvent } from '@scalecube/api/lib/cluster';
import { getFullAddress } from '@scalecube/utils';

describe('Member metadata size limit', () => {
test('metadata size should not be limited', (done) => {
const address1 = {
protocol: 'ws',
host: '127.0.0.1',
port: 8125,
path: '',
};

const address2 = {
protocol: 'ws',
host: '127.0.0.1',
port: 8126,
path: '',
};

const publish: any[] = [
{
'ws://192.168.0.1:1000/some/path': {
greeting: {
greet: 0,
greet1: 0,
greet2: 0,
greet3: 0,
greet4: 0,
greet5: 0,
greet6: 0,
greet7: 0,
greet8: 0,
},
stream: {
greet0: 1,
greet1: 1,
greet2: 1,
greet3: 1,
greet4: 1,
greet5: 1,
greet6: 1,
greet7: 1,
greet8: 1,
},
service1: {
greet: 0,
greet1: 0,
greet2: 0,
greet3: 0,
greet4: 0,
greet5: 0,
greet6: 0,
greet7: 0,
greet8: 0,
},
service2: {
greet: 0,
greet1: 0,
greet2: 0,
greet3: 0,
greet4: 0,
greet5: 0,
greet6: 0,
greet7: 0,
greet8: 0,
},
},
},
];

joinCluster({
address: address1,
itemsToPublish: publish,
});

const node2 = joinCluster({
address: address2,
seedAddress: [address1],
itemsToPublish: [],
});
node2.listen$().subscribe((res: ClusterEvent) => {
expect(res).toMatchObject({
from: getFullAddress(address1),
items: publish,
type: 'ADDED',
});
done();
});
});
});
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
import { DiscoveryApi, MicroserviceApi } from '@scalecube/api';
import { saveToLogs } from '@scalecube/utils';
import { tap } from 'rxjs/operators';
import { map, tap } from 'rxjs/operators';
import { GetServiceFactoryOptions, SetMicroserviceInstanceOptions } from '../helpers/types';
import { createProxy } from '../Proxy/createProxy';
import { destroy } from './Destroy';
import { createServiceCall } from '../ServiceCall/ServiceCall';
import { ServiceDiscoveryEvent } from '@scalecube/api/lib/discovery';
import { restore } from './endpointsUtil';

export const setMicroserviceInstance = (options: SetMicroserviceInstanceOptions): MicroserviceApi.Microservice => {
const { transportClient, serverStop, discoveryInstance, debug, defaultRouter, microserviceContext } = options;

const { remoteRegistry } = microserviceContext;

discoveryInstance &&
discoveryInstance
.discoveredItems$()
.pipe(printLogs(microserviceContext.whoAmI, debug))
.subscribe(remoteRegistry.update);

const serviceFactoryOptions = getServiceFactoryOptions({
microserviceContext,
transportClient,
defaultRouter,
});

discoveryInstance &&
discoveryInstance
.discoveredItems$()
.pipe(
map((i: ServiceDiscoveryEvent) => ({
type: i.type,
items: restore(i.items[0]),
})),
printLogs(microserviceContext.whoAmI, debug)
)
.subscribe(remoteRegistry.update);
return Object.freeze({
destroy: () =>
destroy({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { flatteningServices } from '../helpers/serviceData';
import { getServiceFactoryOptions, setMicroserviceInstance } from './MicroserviceInstance';
import { ROUTER_NOT_PROVIDED } from '../helpers/constants';
import { loggerUtil } from '../helpers/logger';
import { minimized } from './endpointsUtil';

export const createMicroservice: MicroserviceApi.CreateMicroservice = (
options: MicroserviceApi.MicroserviceOptions
Expand Down Expand Up @@ -85,7 +86,7 @@ export const createMicroservice: MicroserviceApi.CreateMicroservice = (

const discoveryInstance = createDiscovery({
address: fallBackAddress,
itemsToPublish: endPointsToPublishInCluster,
itemsToPublish: [minimized(endPointsToPublishInCluster)],
seedAddress,
cluster,
debug,
Expand Down Expand Up @@ -129,13 +130,8 @@ const createMicroserviceContext = ({ address, debug }: MicroserviceContextOption
};

const multiSeedSupport = (seedAddress: string | Address | string[] | Address[]) => {
let seeds = [];
if (!check.isArray(seedAddress)) {
seeds = check.isString(seedAddress) ? [getAddress(seedAddress as string)] : [seedAddress];
} else {
seeds = (seedAddress as []).map((val: string | Address) => {
return check.isString(val) ? getAddress(val as string) : val;
});
return check.isString(seedAddress) ? [getAddress(seedAddress as string)] : [seedAddress];
}
return seeds;
return (seedAddress as []).map((val: string | Address) => (check.isString(val) ? getAddress(val as string) : val));
};
49 changes: 49 additions & 0 deletions packages/scalecube-microservice/src/Microservices/endpointsUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/// This util is minimize and restore Endpoint[]
/// use for optimize endpoints transport
import { MicroserviceApi } from '@scalecube/api';
import { getAddress, getFullAddress } from '@scalecube/utils';

const eAsyncModel: { [key: number]: MicroserviceApi.AsyncModel } = {
0: 'requestResponse',
1: 'requestStream',
};
const sAsyncModel: { [key: string]: keyof typeof eAsyncModel } = {
requestResponse: 0,
requestStream: 1,
};

export interface Endpoints {
[address: string]: {
[serviceName: string]: {
[methodName: string]: keyof typeof eAsyncModel;
};
};
}

export function minimized(endpoints: MicroserviceApi.Endpoint[]): Endpoints {
const res: Endpoints = {};
endpoints.forEach((e) => {
res[getFullAddress(e.address)] = res[getFullAddress(e.address)] || {};
res[getFullAddress(e.address)][e.serviceName] = res[getFullAddress(e.address)][e.serviceName] || {};
res[getFullAddress(e.address)][e.serviceName][e.methodName] = sAsyncModel[e.asyncModel];
});
return res;
}

export function restore(endpoints: Endpoints): MicroserviceApi.Endpoint[] {
const res: MicroserviceApi.Endpoint[] = [];
for (const address in endpoints) {
for (const service in endpoints[address]) {
for (const method in endpoints[address][service]) {
res.push({
asyncModel: eAsyncModel[endpoints[address][service][method]],
methodName: method,
serviceName: service,
address: getAddress(address),
qualifier: `${service}/${method}`,
});
}
}
}
return res;
}
1 change: 1 addition & 0 deletions packages/scalecube-microservice/tests/helper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import EventEmitter = require('events');
// @ts-ignore
const myEmitter = new EventEmitter();

// @ts-ignore
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { restore, minimized } from '../../../src/Microservices/endpointsUtil';
import { AsyncModel } from '@scalecube/api/lib/microservice';

describe('endpointsUtil', () => {
test('Given endpoints when minimized & restore it should be the same', () => {
const endpoints = [
{
qualifier: 'GreetingService/hello',
serviceName: 'GreetingService',
methodName: 'hello',
asyncModel: 'requestResponse' as AsyncModel,
address: {
protocol: 'pm',
host: 'defaultHost',
port: 8080,
path: 'B',
},
},
{
qualifier: 'GreetingService/greet$',
serviceName: 'GreetingService',
methodName: 'greet$',
asyncModel: 'requestStream' as AsyncModel,
address: {
protocol: 'pm',
host: 'defaultHost',
port: 8080,
path: 'B',
},
},
];

expect(endpoints).toEqual(restore(minimized(endpoints)));
});
});
2 changes: 1 addition & 1 deletion packages/utils/src/address.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const getAddress = (address: string): Address => {
address = buildAddress({ key: 'host', optionalValue: 'defaultHost', delimiter: ':', str: address, newAddress });
address = buildAddress({ key: 'port', optionalValue: 8080, delimiter: '/', str: address, newAddress });
newAddress.path = address;

newAddress.port = typeof newAddress.port === 'string' ? parseInt(newAddress.port, 10) : newAddress.port;
return newAddress as Address;
};

Expand Down