Skip to content

Commit

Permalink
Merge branch 'improvement/BB-536-support-per-site-probe-server-config…
Browse files Browse the repository at this point in the history
…' into q/8.7
  • Loading branch information
bert-e committed Oct 22, 2024
2 parents 76202ce + 6cd316e commit a1533a4
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 5 deletions.
7 changes: 5 additions & 2 deletions extensions/replication/ReplicationConfigValidator.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const fs = require('fs');
const joi = require('joi');
const { hostPortJoi, transportJoi, bootstrapListJoi, adminCredsJoi,
retryParamsJoi, probeServerJoi } =
retryParamsJoi, probeServerJoi, probeServerPerSite } =
require('../../lib/config/configItems.joi');

const qpRetryJoi = joi.object({
Expand Down Expand Up @@ -71,7 +71,10 @@ const joiSchema = joi.object({
concurrency: joi.number().greater(0).default(10),
mpuPartsConcurrency: joi.number().greater(0).default(10),
minMPUSizeMB: joi.number().greater(0).default(20),
probeServer: probeServerJoi.default(),
probeServer: joi.alternatives().try(
probeServerJoi,
probeServerPerSite,
).default({ bindAddress: 'localhost', port: 4042 }),
circuitBreaker: joi.object().optional(),
}).required(),
replicationStatusProcessor: {
Expand Down
4 changes: 2 additions & 2 deletions extensions/replication/queueProcessor/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const internalHttpsConfig = config.internalHttps;
const mConfig = config.metrics;
const { connectionString, autoCreateNamespace } = zkConfig;
const RESUME_NODE = 'scheduledResume';
const { startProbeServer } = require('../../../lib/util/probe');
const { startProbeServer, getProbeConfig } = require('../../../lib/util/probe');
const { DEFAULT_LIVE_ROUTE, DEFAULT_METRICS_ROUTE, DEFAULT_READY_ROUTE } =
require('arsenal').network.probe.ProbeServer;
const { sendSuccess } = require('arsenal').network.probe.Utils;
Expand Down Expand Up @@ -240,7 +240,7 @@ function initAndStart(zkClient) {
});

startProbeServer(
repConfig.queueProcessor.probeServer,
getProbeConfig(repConfig.queueProcessor, siteNames, log),
(err, probeServer) => {
if (err) {
log.fatal('error creating probe server', {
Expand Down
9 changes: 9 additions & 0 deletions lib/config/configItems.joi.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ const probeServerJoi = joi.object({
port: joi.number().required(),
});

const probeServerPerSite = joi.array().min(1).items(
joi.object({
bindAddress: joi.string().default('localhost'),
port: joi.number().required(),
site: joi.string().required()
})
);

const mongoJoi = joi.object({
replicaSetHosts: joi.string().default('localhost:27017'),
logName: joi.string().default('s3-recordlog'),
Expand Down Expand Up @@ -161,6 +169,7 @@ module.exports = {
retryParamsJoi,
certFilePathsJoi,
probeServerJoi,
probeServerPerSite,
stsConfigJoi,
mongoJoi,
qpKafkaJoi,
Expand Down
34 changes: 34 additions & 0 deletions lib/util/probe.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,42 @@ function observeKafkaStats(msg) {
kafkaMetrics.observe(JSON.parse(msg.message));
}

/**
* Get probe config will pull the configuration for the probe server based on
* the provided site name. If siteNames is empty, it returns the global probe server config
* only if it's a single object.
*
* @param {Object} queueProcessorConfig - Configuration of the queue processor that
* holds the probe server configs for all sites
* @param {Array<String>} siteNames - List of site names (should contain at most one element)
* @param {Logger} logger - Logger instance
* @returns {Object|undefined} Config for site or global config, undefined if no match found or invalid config
*/
function getProbeConfig(queueProcessorConfig, siteNames, logger) {
if (Array.isArray(queueProcessorConfig.probeServer)) {
if (siteNames.length !== 1) {
logger.error('Process configured for more than one site or no site provided', {
siteNames,
queueProcessorConfig,
});
return undefined;
}
const siteConfig = queueProcessorConfig.probeServer.find(config => config.site === siteNames[0]);
if (siteConfig === undefined) {
logger.warn('Probe server configuration for site not found', {
siteName: siteNames[0],
queueProcessorConfig,
});
}
return siteConfig;
}

return queueProcessorConfig.probeServer;
}

module.exports = {
startProbeServer,
startProbeServerPromise,
observeKafkaStats,
getProbeConfig,
};
74 changes: 73 additions & 1 deletion tests/unit/lib/util/probe.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const assert = require('assert');
const { startProbeServer } =
const { startProbeServer, getProbeConfig } =
require('../../../../lib/util/probe');
const Logger = require('werelogs').Logger;

describe('Probe server', () => {
it('is not created with no config', done => {
Expand All @@ -25,3 +26,74 @@ describe('Probe server', () => {
});
});
});

describe('getProbeConfig', () => {
const log = new Logger('getProbeConfig');
it('returns the probeServer config when siteNames is empty and probeServer is a single object', () => {
const queueProcessorConfig = {
probeServer: { bindAddress: '127.0.0.1', port: '8080' }
};
const siteNames = [];

const result = getProbeConfig(queueProcessorConfig, siteNames, log);
assert.deepStrictEqual(result, { bindAddress: '127.0.0.1', port: '8080' });
});

it('returns undefined when siteNames is empty and probeServer is not a single object', () => {
const queueProcessorConfig = {
probeServer: [{ site: 'site1', bindAddress: '127.0.0.1', port: '8080' }]
};
const siteNames = [];

const result = getProbeConfig(queueProcessorConfig, siteNames, log);
assert.strictEqual(result, undefined);
});

it('returns the correct site config when probeServer is an array and siteNames has one matching element', () => {
const queueProcessorConfig = {
probeServer: [
{ site: 'site1', bindAddress: '127.0.0.1', port: '8080' },
{ site: 'site2', bindAddress: '127.0.0.2', port: '8081' }
]
};
const siteNames = ['site2'];

const result = getProbeConfig(queueProcessorConfig, siteNames, log);
assert.deepStrictEqual(result, { site: 'site2', bindAddress: '127.0.0.2', port: '8081' });
});

it('returns undefined when probeServer is an array and siteNames has no matching element', () => {
const queueProcessorConfig = {
probeServer: [
{ site: 'site1', bindAddress: '127.0.0.1', port: '8080' }
]
};
const siteNames = ['site2'];

const result = getProbeConfig(queueProcessorConfig, siteNames, log);
assert.strictEqual(result, undefined);
});

it('returns undefined when siteNames contains more than one element', () => {
const queueProcessorConfig = {
probeServer: [
{ site: 'site1', bindAddress: '127.0.0.1', port: '8080' },
{ site: 'site2', bindAddress: '127.0.0.2', port: '8081' }
]
};
const siteNames = ['site1', 'site2']; // More than one element in siteNames

const result = getProbeConfig(queueProcessorConfig, siteNames, log);
assert.strictEqual(result, undefined);
});

it('returns probeserver when probeServer is not an array and siteNames is not empty', () => {
const queueProcessorConfig = {
probeServer: { bindAddress: '127.0.0.1', port: '8080' } // probeServer is a single object
};
const siteNames = ['site1']; // siteNames is not empty

const result = getProbeConfig(queueProcessorConfig, siteNames, log);
assert.deepStrictEqual(result, queueProcessorConfig.probeServer);
});
});

0 comments on commit a1533a4

Please sign in to comment.