From 86326e1dd0258a1c3deacb87407906b103ab69be Mon Sep 17 00:00:00 2001 From: Nicolas Humbert Date: Tue, 7 Nov 2023 11:24:10 +0100 Subject: [PATCH] BB-466 Dealing with expired zookeeper sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introducing the ZookeeperManager Class, which offers: * Connection Management: This class efficiently manages connections to the ZooKeeper server. It ensures continuity by handling reconnections whenever the session expires.
 * Error and State Management: The class handles errors and meticulously logs state changes. It is essential for debugging.
 * Abstraction Layer: It creates an abstraction over the database-specific code, simplifying the integration of application logic. This layer not only facilitates an easier transition to different databases in the future but also aids in simplifying the mocking and testing processes. --- .../lifecycle/conductor/LifecycleConductor.js | 8 +- .../queueProcessor/QueueProcessor.js | 8 +- lib/KafkaBacklogMetrics.js | 5 +- lib/api/BackbeatAPI.js | 18 +- lib/api/Healthcheck.js | 2 +- lib/clients/ZookeeperManager.js | 313 ++++++++++++++++++ lib/provisioning/ProvisionDispatcher.js | 14 +- lib/queuePopulator/QueuePopulator.js | 9 +- lib/queuePopulator/QueuePopulatorExtension.js | 7 +- tests/functional/lib/ZookeeperManager.js | 281 ++++++++++++++++ .../lifecycle/LifecycleConductor.spec.js | 10 +- .../notification/NotificationConfigManager.js | 23 +- tests/functional/utils/mockZookeeperClient.js | 12 + tests/unit/ProvisionDispatcher.spec.js | 49 ++- 14 files changed, 676 insertions(+), 83 deletions(-) create mode 100644 lib/clients/ZookeeperManager.js create mode 100644 tests/functional/lib/ZookeeperManager.js create mode 100644 tests/functional/utils/mockZookeeperClient.js diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index 7da16f584..bf3d61220 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -10,7 +10,7 @@ const BucketClient = require('bucketclient').RESTClient; const BackbeatProducer = require('../../../lib/BackbeatProducer'); const BackbeatTask = require('../../../lib/tasks/BackbeatTask'); -const zookeeperHelper = require('../../../lib/clients/zookeeper'); +const ZookeeperManager = require('../../../lib/clients/ZookeeperManager'); const KafkaBacklogMetrics = require('../../../lib/KafkaBacklogMetrics'); const { authTypeAssumeRole } = require('../../../lib/constants'); const VaultClientCache = require('../../../lib/clients/VaultClientCache'); @@ -493,9 +493,7 @@ class LifecycleConductor { process.nextTick(cb); return; } - this._zkClient = zookeeperHelper.createClient( - this.zkConfig.connectionString); - this._zkClient.connect(); + this._zkClient = new ZookeeperManager(this.zkConfig.connectionString, null, this.logger); this._zkClient.once('error', cb); this._zkClient.once('ready', () => { // just in case there would be more 'error' events @@ -552,7 +550,7 @@ class LifecycleConductor { // just in case there would be more 'error' events emitted this._kafkaBacklogMetrics.removeAllListeners('error'); this._kafkaBacklogMetrics.on('error', err => { - this._log.error('error from kafka topic metrics', { + this.logger.error('error from kafka topic metrics', { error: err.message, method: 'LifecycleConductor._initKafkaBacklogMetrics', }); diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js index 5f9f616db..ece00ce39 100644 --- a/extensions/notification/queueProcessor/QueueProcessor.js +++ b/extensions/notification/queueProcessor/QueueProcessor.js @@ -7,7 +7,7 @@ const errors = require('arsenal').errors; const BackbeatConsumer = require('../../../lib/BackbeatConsumer'); const NotificationDestination = require('../destination'); -const zookeeper = require('../../../lib/clients/zookeeper'); +const ZookeeperManager = require('../../../lib/clients/ZookeeperManager'); const configUtil = require('../utils/config'); const messageUtil = require('../utils/message'); const NotificationConfigManager = require('../NotificationConfigManager'); @@ -72,10 +72,10 @@ class QueueProcessor extends EventEmitter { `${this.zkConfig.connectionString}${populatorZkPath}`; this.logger.info('opening zookeeper connection for reading ' + 'bucket notification configuration', { zookeeperUrl }); - this.zkClient = zookeeper.createClient(zookeeperUrl, { + this.zkClient = new ZookeeperManager(zookeeperUrl, { autoCreateNamespace: this.zkConfig.autoCreateNamespace, - }); - this.zkClient.connect(); + }, this.logger); + this.zkClient.once('error', done); this.zkClient.once('ready', () => { // just in case there would be more 'error' events emitted diff --git a/lib/KafkaBacklogMetrics.js b/lib/KafkaBacklogMetrics.js index fcaa4fefd..4704a7571 100644 --- a/lib/KafkaBacklogMetrics.js +++ b/lib/KafkaBacklogMetrics.js @@ -5,7 +5,7 @@ const zookeeper = require('node-zookeeper-client'); const Logger = require('werelogs').Logger; const { errors, metrics } = require('arsenal'); -const zookeeperHelper = require('./clients/zookeeper'); +const ZookeeperManager = require('./clients/ZookeeperManager'); const { readUInt64BE } = require('./util/buffer'); const { promMetricNames } = require('./constants').kafkaBacklogMetrics; @@ -58,8 +58,7 @@ class KafkaBacklogMetrics extends EventEmitter { } _initZookeeperClient() { - this._zookeeper = zookeeperHelper.createClient(this._zookeeperEndpoint); - this._zookeeper.connect(); + this._zookeeper = new ZookeeperManager(this._zookeeperEndpoint, null, this._log); this._zookeeper.on('error', err => { this.emit('error', err); }); diff --git a/lib/api/BackbeatAPI.js b/lib/api/BackbeatAPI.js index 0dd7ae0be..1c0fbe5a8 100644 --- a/lib/api/BackbeatAPI.js +++ b/lib/api/BackbeatAPI.js @@ -1,7 +1,6 @@ 'use strict'; // eslint-disable-line strict const async = require('async'); -const zookeeper = require('node-zookeeper-client'); const { errors } = require('arsenal'); const { RedisClient } = require('arsenal').metrics; @@ -17,6 +16,7 @@ const Healthcheck = require('./Healthcheck'); const routes = require('./routes'); const { getSortedSetKey, getSortedSetMember } = require('../util/sortedSetHelper'); +const ZookeeperManager = require('../clients/ZookeeperManager'); // StatsClient constant defaults // TODO: This should be moved to constants file @@ -182,15 +182,6 @@ class BackbeatAPI { && this._crrStatusProducer.isReady(); } - /** - * Check if Zookeeper and Producer are connected - * @return {boolean} true/false - */ - isConnected() { - return this._zkClient.getState().name === 'SYNC_CONNECTED' - && this._checkProducersReady(); - } - /** * Get Kafka healthcheck * @param {object} details - route details from lib/api/routes.js @@ -1079,13 +1070,12 @@ class BackbeatAPI { const zookeeperUrl = `${this._zkConfig.connectionString}${populatorZkPath}`; - const zkClient = zookeeper.createClient(zookeeperUrl, { + const zkClient = new ZookeeperManager(zookeeperUrl, { autoCreateNamespace: this._zkConfig.autoCreateNamespace, - }); - zkClient.connect(); + }, this._logger); zkClient.once('error', cb); - zkClient.once('connected', () => { + zkClient.once('ready', () => { zkClient.removeAllListeners('error'); this._zkClient = zkClient; return cb(); diff --git a/lib/api/Healthcheck.js b/lib/api/Healthcheck.js index 43341df10..4d551db37 100644 --- a/lib/api/Healthcheck.js +++ b/lib/api/Healthcheck.js @@ -11,7 +11,7 @@ class Healthcheck { /** * @constructor * @param {object} repConfig - extensions.replication configs - * @param {node-zookeeper-client.Client} zkClient - zookeeper client + * @param {ZookeeperManager} zkClient - zookeeper client manager * @param {BackbeatProducer} crrProducer - producer for CRR topic * @param {BackbeatProducer} crrStatusProducer - CRR status producer * @param {BackbeatProducer} metricProducer - producer for metric diff --git a/lib/clients/ZookeeperManager.js b/lib/clients/ZookeeperManager.js new file mode 100644 index 000000000..8c1523137 --- /dev/null +++ b/lib/clients/ZookeeperManager.js @@ -0,0 +1,313 @@ +const zookeeper = require('node-zookeeper-client'); +const EventEmitter = require('events'); +const async = require('async'); + +class ZookeeperManager extends EventEmitter { + /** + * Constructs an instance of the ZookeeperManager. + * + * @param {string} connectionString - The connection string for the ZooKeeper server. + * @param {Object} options - Configuration options for the ZooKeeper client. + * @param {Logger} log - An instance of a logging utility. + */ + constructor(connectionString, options, log) { + super(); + + this.connectionString = connectionString; + this.log = log; + this.options = options; + this.client = null; + + this._connect(); + } + + /** + * Establishes a connection to the ZooKeeper server. + * + * This method initializes a new ZooKeeper client using the provided connection string + * and options. It sets up event listeners for various client states such as 'connected', + * 'disconnected', and 'expired'. The method handles reconnection logic in case of + * client expiration and forwards ZooKeeper events through this class's EventEmitter interface. + * + * If 'autoCreateNamespace' is set in the options and the base path does not exist in ZooKeeper, + * this method tries to create the necessary path (namespace) in ZooKeeper. + * + * @method _connect + * @return {undefined} + */ + _connect() { + // clean up exists client before reconnect + if (this.client) { + this.client.removeAllListeners(); + } + + this.client = zookeeper.createClient(this.connectionString, this.options); + + this.client.once('connected', () => { + // TODO: ARTESCA-10337 The 'autoCreateNamespace' functionality is currently specific to + // Artesca and may be removed in future versions once the Zenko Operator can handle base path creation. + // Once removed, we can simply rely on the 'connected' state instead of the 'ready' state and + // stop listening on 'error' event. + if (this.options && this.options.autoCreateNamespace) { + // for some reason this.client.exists() does not return + // NO_NODE when base path does not exist, hence use + // getData() instead + this.getData('/', err => { + if (err && err.name !== 'NO_NODE') { + this.emit('error', err); + return; + } + // NO_NODE error and autoCreateNamespace is enabled + if (err) { + const nsIndex = this.connectionString.indexOf('/'); + const hostPort = this.connectionString.slice(0, nsIndex); + const namespace = this.connectionString.slice(nsIndex); + const rootZkClient = zookeeper.createClient(hostPort, this.options); + rootZkClient.connect(); + rootZkClient.mkdirp(namespace, err => { + if (err && err.name !== 'NODE_EXISTS') { + this.emit('error'); + return; + } + this.emit('ready'); + return; + }); + return; + } + this.emit('ready'); + return; + }); + return; + } + this.emit('ready'); + return; + }); + + this.client.once('expired', () => { + this.log.info('zookeeper client expired', { + method: 'ZookeeperManager.once.expired', + }); + // close and clean up the existing ZooKeeper client connection. + this.close(); + // establish a new session with the ZooKeeper. + this._connect(); + }); + + this.client.on('state', (state) => { + this.log.debug('zookeeper new state', { + state, + method: 'ZookeeperManager.on.state', + }); + }); + + // Forward ZooKeeper events + this.client.on('connected', () => { + this.emit('connected'); + }); + + this.client.on('disconnected', () => { + this.emit('disconnected'); + }); + + this.client.connect(); + } + + // Forward ZooKeeper methods + + /** + * Shutdown the client. + * @method close + * @return {undefined} + */ + close() { + this.client.close(); + return; + } + + /** + * Create a node with given path, data, acls and mode. + * + * @method create + * @param {String} path The node path. + * @param {Buffer} [data=undefined] The data buffer. + * @param {Array} [acls=ACL.OPEN_ACL_UNSAFE] An array of ACL object. + * @param {CreateMode} [mode=CreateMode.PERSISTENT] The creation mode. + * @param {Function} callback The callback function. + * @return {undefined} + */ + create(path, data, acls, mode, callback) { + this.client.create(path, data, acls, mode, callback); + return; + } + + /** + * Check the existence of a node. The callback will be invoked with the + * stat of the given path, or null if node such node exists. + * + * If the watcher function is provided and the call is successful (no error + * from callback), a watcher will be placed on the node with the given path. + * The watcher will be triggered by a successful operation that creates/delete + * the node or sets the data on the node. + * + * @method exists + * @param {String} path - The node path. + * @param {Function} [watcher] - The watcher function. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + exists(path, watcher, callback) { + this.client.exists(path, watcher, callback); + return; + } + + /** + * For the given node path, retrieve the children list and the stat. + * + * If the watcher callback is provided and the method completes successfully, + * a watcher will be placed the given node. The watcher will be triggered + * when an operation successfully deletes the given node or creates/deletes + * the child under it. + * + * @method getChildren + * @param {String} path - The node path. + * @param {Function} [watcher] - The watcher function. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + getChildren(path, watcher, callback) { + this.client.getChildren(path, watcher, callback); + return; + } + + /** + * + * Retrieve the data and the stat of the node of the given path. + * + * If the watcher is provided and the call is successful (no error), a watcher + * will be left on the node with the given path. + * + * The watch will be triggered by a successful operation that sets data on + * the node, or deletes the node. + * + * @method getData + * @param {String} path - The node path. + * @param {Function} [watcher] - The watcher function. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + getData(path, watcher, callback) { + this.client.getData(path, watcher, callback); + return; + } + + /** + * Returns the state of the client. + * + * @method getState + * @return {State} the state of the client. + */ + getState() { + return this.client.getState(); + } + + /** + * Create node path in the similar way of `mkdir -p` + * + * + * @method mkdirp + * @param {String} path - The node path. + * @param {Buffer} [data=undefined] - The data buffer. + * @param {Array} [acls=ACL.OPEN_ACL_UNSAFE] - The array of ACL object. + * @param {CreateMode} [mode=CreateMode.PERSISTENT] - The creation mode. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + mkdirp(path, data, acls, mode, callback) { + this.client.mkdirp(path, data, acls, mode, callback); + return; + } + + /** + * Delete a node with the given path. If version is not -1, the request will + * fail when the provided version does not match the server version. + * + * @method remove + * @param {String} path - The node path. + * @param {Number} [version=-1] - The version of the node. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + remove(path, version, callback) { + this.client.remove(path, version, callback); + return; + } + + /** + * Set the data for the node of the given path if such a node exists and the + * optional given version matches the version of the node (if the given + * version is -1, it matches any node's versions). + * + * @method setData + * @param {String} path - The node path. + * @param {Buffer} data - The data buffer. + * @param {Number} [version=-1] - The version of the node. + * @param {Function} callback - The callback function. + * @return {undefined} + */ + setData(path, data, version, callback) { + this.client.setData(path, data, version, callback); + return; + } + + // Custom methods + /** + * Recursively removes a node and all of its children from ZooKeeper. + * + * @method removeRecur + * @param {String} path - The path of the node to remove. + * @param {Function} cb - The callback function. + * @return {undefined} + */ + removeRecur(path, cb) { + return async.waterfall([ + next => this.getChildren(path, next), + (children, stat, next) => async.eachLimit( + children, 2, + (child, done) => this.removeRecur(`${path}/${child}`, done), + next), + next => this.remove(path, -1, next), + ], cb); + } + + /** + * Sets the data for a node or creates the node if it does not exist. + * + * This method attempts to set the data for a node at the specified path. + * If the node does not exist (NO_NODE error), it creates the path with + * the provided data. This is similar to a 'create or update' operation. + * + * @param {String} path - The path of the node. + * @param {Buffer} data - Data to set on the node. + * @param {Function} cb - Callback function. Called with an error argument + * if an error occurs, otherwise null. + * @return {undefined} + */ + setOrCreate(path, data, cb) { + this.setData(path, data, err => { + if (err) { + if (err.getCode() === zookeeper.Exception.NO_NODE) { + return this.mkdirp(path, err => { + if (err) { + return cb(err); + } + return this.setData(path, data, cb); + }); + } + return cb(err); + } + return cb(); + }); + } +} + +module.exports = ZookeeperManager; diff --git a/lib/provisioning/ProvisionDispatcher.js b/lib/provisioning/ProvisionDispatcher.js index 83810e4f2..70e7fa308 100644 --- a/lib/provisioning/ProvisionDispatcher.js +++ b/lib/provisioning/ProvisionDispatcher.js @@ -4,6 +4,7 @@ const async = require('async'); const crypto = require('crypto'); const path = require('path'); const zookeeper = require('node-zookeeper-client'); +const ZookeeperManager = require('../clients/ZookeeperManager'); const Logger = require('werelogs').Logger; @@ -28,20 +29,19 @@ class ProvisionDispatcher { * @param {object} zkConfig - zookeeper config object * @param {string} zkConfig.connectionString - zookeeper connection string * (e.g. "localhost:2181/shared-tasks") - * @param {string} [_zookeeper] - zookeeper module */ - constructor(zkConfig, _zookeeper) { + constructor(zkConfig) { + this._log = new Logger('Backbeat:ProvisionDispatcher'); + this._zkEndpoint = zkConfig.connectionString; - this._zookeeper = (_zookeeper !== undefined) ? _zookeeper : zookeeper; - this._doRandDelay = false; - this._client = this._zookeeper.createClient(zkConfig.connectionString); + this._client = new ZookeeperManager(zkConfig.connectionString, null, this._log); + this._connectWaitList = []; this._client.once('connected', () => { this._log.debug('connected to the ZK server'); this._connectWaitList.forEach(cb => cb()); this._connectWaitList = null; }); - this._client.connect(); this._myName = null; this._myLeaderName = null; this._isLeader = false; @@ -51,8 +51,6 @@ class ProvisionDispatcher { this._redispatchInProgress = false; this._redoRedispatch = false; this._interval = -1; - - this._log = new Logger('Backbeat:ProvisionDispatcher'); } /** diff --git a/lib/queuePopulator/QueuePopulator.js b/lib/queuePopulator/QueuePopulator.js index 39d755571..8f05388e5 100644 --- a/lib/queuePopulator/QueuePopulator.js +++ b/lib/queuePopulator/QueuePopulator.js @@ -1,6 +1,6 @@ const { each, series } = require('async'); const Logger = require('werelogs').Logger; -const zookeeper = require('../clients/zookeeper'); +const ZookeeperManager = require('../clients/ZookeeperManager'); const { State: ZKState } = require('node-zookeeper-client'); const ProvisionDispatcher = require('../provisioning/ProvisionDispatcher'); const RaftLogReader = require('./RaftLogReader'); @@ -280,10 +280,11 @@ class QueuePopulator { const populatorZkPath = this.qpConfig.zookeeperPath; const zookeeperUrl = `${this.zkConfig.connectionString}${populatorZkPath}`; this.log.info('opening zookeeper connection for persisting populator state', { zookeeperUrl }); - this.zkClient = zookeeper.createClient(zookeeperUrl, { + + this.zkClient = new ZookeeperManager(zookeeperUrl, { autoCreateNamespace: this.zkConfig.autoCreateNamespace, - }); - this.zkClient.connect(); + }, this.log); + this.zkClient.once('error', done); this.zkClient.once('ready', () => { // just in case there would be more 'error' events emitted diff --git a/lib/queuePopulator/QueuePopulatorExtension.js b/lib/queuePopulator/QueuePopulatorExtension.js index 553bbb280..6eca6be5b 100644 --- a/lib/queuePopulator/QueuePopulatorExtension.js +++ b/lib/queuePopulator/QueuePopulatorExtension.js @@ -1,6 +1,6 @@ const assert = require('assert'); -const zookeeper = require('../clients/zookeeper'); +const ZookeeperManager = require('../clients/ZookeeperManager'); class QueuePopulatorExtension { /** @@ -40,10 +40,9 @@ class QueuePopulatorExtension { this.log.info('opening zookeeper connection for populator extensions', { zookeeperUrl: connectionString, }); - this.zkClient = zookeeper.createClient(connectionString, { + this.zkClient = new ZookeeperManager(connectionString, { autoCreateNamespace, - }); - this.zkClient.connect(); + }, this.log); this.zkClient.once('error', cb); this.zkClient.once('ready', () => { // just in case there would be more 'error' events emitted diff --git a/tests/functional/lib/ZookeeperManager.js b/tests/functional/lib/ZookeeperManager.js new file mode 100644 index 000000000..b38fe28c8 --- /dev/null +++ b/tests/functional/lib/ZookeeperManager.js @@ -0,0 +1,281 @@ +const assert = require('assert'); +const werelogs = require('werelogs'); +const async = require('async'); + +const ZookeeperManager = require('../../../lib/clients/ZookeeperManager'); +const zookeeper = require('node-zookeeper-client'); + +const log = new werelogs.Logger('ZookeeperManager:test'); + +// simulateSessionExpired is designed to artificially trigger a session expiration in a ZooKeeper client +function simulateSessionExpired(client) { + client.connectionManager.setState(-3); // SESSION_EXPIRED EVENT CODE + client.connectionManager.socket.emit('close'); +} + +describe('ZookeeperManager', () => { + let zkClient; + + afterEach(() => { + // Clean up and reset the ZookeeperManager + zkClient.removeAllListeners(); + zkClient.close(); + }); + + describe('with autoCreateNamespace', () => { + const basePath = '/hello/world'; + let rootZkClient; + + before(done => { + rootZkClient = zookeeper.createClient('localhost:2181'); + rootZkClient.connect(); + rootZkClient.once('connected', () => done()); + }); + + after(() => rootZkClient.close()); + + afterEach(done => rootZkClient.remove(basePath, err => { + if (err && err.name !== 'NO_NODE') { + return done(err); + } + return done(); + })); + + it('should create the base path if autoCreateNamespace set to true', done => { + const connectionString = `localhost:2181${basePath}`; + const options = { autoCreateNamespace: true }; + zkClient = new ZookeeperManager(connectionString, options, log); + zkClient.once('ready', () => { + // makes sure the base path exists + rootZkClient.getData(basePath, err => { + assert.ifError(err); + + // makes sure no NO_NODE error is returned. + zkClient.getData('/', err => { + assert.ifError(err); + done(); + }); + }); + }); + }); + + it('should not create the base path if autoCreateNamespace set to false', done => { + const connectionString = `localhost:2181${basePath}`; + const options = { autoCreateNamespace: false }; + zkClient = new ZookeeperManager(connectionString, options, log); + zkClient.once('ready', () => { + rootZkClient.getData(basePath, err => { + assert(err, 'Expected an error to be returned'); + assert.strictEqual(err.name, 'NO_NODE'); + + zkClient.getData('/', err => { + assert(err, 'Expected an error to be returned'); + assert.strictEqual(err.name, 'NO_NODE'); + done(); + }); + }); + }); + }); + }); + + describe('testing connection events', () => { + beforeEach(done => { + const connectionString = 'localhost:2181'; + const options = {}; + zkClient = new ZookeeperManager(connectionString, options, log); + zkClient.once('connected', () => done()); + }); + + it('should trigger the disconnected event after closing the connection', done => { + zkClient.on('disconnected', () => done()); + zkClient.close(); + }); + + it('should handle "expired" event and reconnect', done => { + const originalClient = zkClient.client; + simulateSessionExpired(originalClient); + assert.notStrictEqual(zkClient.client, originalClient); + // the original connection should be disconnected + originalClient.getData('/', err => { + assert(err, 'Expected an error to be returned'); + assert.strictEqual(err.name, 'CONNECTION_LOSS'); + + // the client should have reconnected and work + zkClient.getData('/', err => { + assert.ifError(err); + done(); + }); + }); + }); + + it('should close the client connection', done => { + zkClient.close(); + zkClient.getData('/', err => { + assert(err, 'Expected an error to be returned'); + assert.strictEqual(err.name, 'CONNECTION_LOSS'); + done(); + }); + }); + }); + + describe('CRUD', () => { + const path = '/testNode'; + beforeEach(done => { + const connectionString = 'localhost:2181'; + const options = {}; + zkClient = new ZookeeperManager(connectionString, options, log); + zkClient.on('connected', () => done()); + }); + + afterEach(done => { + zkClient.removeRecur(path, err => { + if (err && err.name !== 'NO_NODE') { + return done(err); + } + return done(); + }); + }); + + it('should create a node with a given data', done => { + const expectedValue = 'testData'; + const data = Buffer.from(expectedValue); + zkClient.create(path, data, err => { + assert.ifError(err); + zkClient.getData(path, (err, data) => { + assert.ifError(err); + assert.strictEqual(data.toString(), expectedValue); + done(); + }); + }); + }); + + it('should create a node and check that it exists', done => { + zkClient.create(path, err => { + assert.ifError(err); + zkClient.exists(path, (err, stat) => { + assert.ifError(err); + assert(stat); + done(); + }); + }); + }); + + it('should check that a node does not exist', done => { + zkClient.exists(path, (err, stat) => { + assert.ifError(err); + assert(!stat); + done(); + }); + }); + + it('should create intermediate node', done => { + const complexPath = '/testNode/second'; + const expectedValue = 'testData'; + const data = Buffer.from(expectedValue); + zkClient.mkdirp(complexPath, data, (err, returnedPath) => { + assert.ifError(err); + assert.strictEqual(returnedPath, complexPath); + zkClient.getChildren('/testNode', (err, children) => { + assert.ifError(err); + assert.strictEqual(children.length, 1); + const child = children[0]; + assert.strictEqual(child, 'second'); + done(); + }); + }); + }); + + it('should create a node and set, get data and then remove the node', done => { + const expectedValue = 'testData'; + async.series([ + next => zkClient.create(path, err => { + assert.ifError(err); + next(); + }), + next => zkClient.setData(path, Buffer.from(expectedValue), err => { + assert.ifError(err); + next(); + }), + next => zkClient.getData(path, (err, data) => { + assert.ifError(err); + assert.strictEqual(data.toString(), expectedValue); + next(); + }), + next => zkClient.remove(path, err => { + assert.ifError(err); + next(); + }), + next => zkClient.getData(path, err => { + assert(err, 'Expected an error to be returned'); + assert.strictEqual(err.name, 'NO_NODE'); + next(); + }), + ], done); + }); + + it('should return the client state', () => { + assert.strictEqual(zkClient.getState(), zookeeper.State.SYNC_CONNECTED); + }); + }); + + describe('removeRecur', () => { + beforeEach(done => { + const connectionString = 'localhost:2181'; + const options = {}; + zkClient = new ZookeeperManager(connectionString, options, log); + zkClient.on('connected', () => { + zkClient.mkdirp('/to/be/recursively/removed', err => done(err)); + }); + }); + + it('should recursively remove a node and all of its children', done => { + zkClient.removeRecur('/to', err => { + assert.ifError(err); + zkClient.getData('/to', err => { + assert(err, 'Expected an error to be returned'); + assert.strictEqual(err.name, 'NO_NODE'); + done(); + }); + }); + }); + }); + + describe('setOrCreate', () => { + const alreadyCreatedNodePath = '/to/set/data/in'; + beforeEach(done => { + const connectionString = 'localhost:2181'; + const options = {}; + zkClient = new ZookeeperManager(connectionString, options, log); + zkClient.on('connected', () => zkClient.mkdirp(alreadyCreatedNodePath, err => done(err))); + }); + + afterEach(done => zkClient.removeRecur('/to', err => done(err))); + + it('should create the node since it does not exist', done => { + const expectedValue = 'val'; + const data = Buffer.from(expectedValue); + const nodePath = '/to/be/created'; + zkClient.setOrCreate(nodePath, data, err => { + assert.ifError(err); + zkClient.getData(nodePath, (err, data) => { + assert.ifError(err); + assert.strictEqual(data.toString(), expectedValue); + done(err); + }); + }); + }); + + it('should set the data for a node that already exists', done => { + const expectedValue = 'val'; + const data = Buffer.from(expectedValue); + zkClient.setOrCreate(alreadyCreatedNodePath, data, err => { + assert.ifError(err); + zkClient.getData(alreadyCreatedNodePath, (err, data) => { + assert.ifError(err); + assert.strictEqual(data.toString(), expectedValue); + done(err); + }); + }); + }); + }); +}); diff --git a/tests/functional/lifecycle/LifecycleConductor.spec.js b/tests/functional/lifecycle/LifecycleConductor.spec.js index 51f28aed0..466d8a3b5 100644 --- a/tests/functional/lifecycle/LifecycleConductor.spec.js +++ b/tests/functional/lifecycle/LifecycleConductor.spec.js @@ -6,7 +6,7 @@ const http = require('http'); const url = require('url'); const werelogs = require('werelogs'); -const zookeeper = require('../../../lib/clients/zookeeper'); +const ZookeeperManager = require('../../../lib/clients/ZookeeperManager'); const BackbeatTestConsumer = require('../../utils/BackbeatTestConsumer'); const LifecycleConductor = require( '../../../extensions/lifecycle/conductor/LifecycleConductor'); @@ -107,6 +107,7 @@ const TIMEOUT = 120000; const CONSUMER_TIMEOUT = 60000; werelogs.configure({ level: 'info', dump: 'error' }); +const log = new werelogs.Logger('LifecycleConductor:test'); describe('lifecycle conductor', function lifecycleConductor() { this.timeout(TIMEOUT); @@ -427,10 +428,11 @@ describe('lifecycle conductor', function lifecycleConductor() { }, next => { if (setupZookeeper) { - zkClient = zookeeper.createClient( + zkClient = new ZookeeperManager( zkConfig.zookeeper.connectionString, - zkConfig.zookeeper); - zkClient.connect(); + zkConfig.zookeeper, + log + ); zkClient.once('ready', () => { lcConductor.initZkPaths(next); }); diff --git a/tests/functional/notification/NotificationConfigManager.js b/tests/functional/notification/NotificationConfigManager.js index ec91da8cf..261eeb225 100644 --- a/tests/functional/notification/NotificationConfigManager.js +++ b/tests/functional/notification/NotificationConfigManager.js @@ -1,11 +1,14 @@ const assert = require('assert'); const async = require('async'); const werelogs = require('werelogs'); -const ZookeeperMock = require('zookeeper-mock'); +const sinon = require('sinon'); const NotificationConfigManager = require('../../../extensions/notification/NotificationConfigManager'); +const ZookeeperManager = require('../../../lib/clients/ZookeeperManager'); +const mockZookeeperClient = require('../utils/mockZookeeperClient'); + const logger = new werelogs.Logger('NotificationConfigManager:test'); const zkConfigParentNode = 'config'; const concurrency = 10; @@ -63,15 +66,9 @@ function deleteTestConfigs(zkClient, cb) { } describe('NotificationConfigManager multiple managers functional tests', () => { - const zk = new ZookeeperMock({ doLog: false }); - const zkClient = zk.createClient(); - const params = { - zkClient, - parentNode: zkConfigParentNode, - logger, - }; let configManager1 = null; let configManager2 = null; + let zkClient; function checkCount() { const buckets1 = configManager1.getBucketsWithConfigs(); @@ -86,6 +83,14 @@ describe('NotificationConfigManager multiple managers functional tests', () => { } beforeEach(done => { + mockZookeeperClient(); + const fakeEndpoint = 'fake.endpoint:2181'; + zkClient = new ZookeeperManager(fakeEndpoint, {}, logger); + const params = { + zkClient, + parentNode: zkConfigParentNode, + logger, + }; configManager1 = new NotificationConfigManager(params); configManager2 = new NotificationConfigManager(params); populateTestConfigs(zkClient, 5, () => { @@ -97,7 +102,7 @@ describe('NotificationConfigManager multiple managers functional tests', () => { }); afterEach(() => { - zk._resetState(); + sinon.restore(); }); it('managers should have the same config values after init', done => { diff --git a/tests/functional/utils/mockZookeeperClient.js b/tests/functional/utils/mockZookeeperClient.js new file mode 100644 index 000000000..a92ca875e --- /dev/null +++ b/tests/functional/utils/mockZookeeperClient.js @@ -0,0 +1,12 @@ +const ZookeeperMock = require('zookeeper-mock'); +const sinon = require('sinon'); +const zookeeper = require('node-zookeeper-client'); + +function mockZookeeperClient() { + const endpoint = 'fake.endpoint:2181'; + const zk = new ZookeeperMock({ doLog: false, maxRandomDelay: 100 }); + const client = zk.createClient(endpoint); + sinon.stub(zookeeper, 'createClient').returns(client); +} + +module.exports = mockZookeeperClient; diff --git a/tests/unit/ProvisionDispatcher.spec.js b/tests/unit/ProvisionDispatcher.spec.js index 7d06bdafa..26d41e33b 100644 --- a/tests/unit/ProvisionDispatcher.spec.js +++ b/tests/unit/ProvisionDispatcher.spec.js @@ -2,39 +2,34 @@ const assert = require('assert'); const async = require('async'); -const ZookeeperMock = require('zookeeper-mock'); const jsutil = require('arsenal').jsutil; const ProvisionDispatcher = require('../../lib/provisioning/ProvisionDispatcher'); +const mockZookeeperClient = require('../functional/utils/mockZookeeperClient'); -const ZK_TEST_PATH = '/tests/prov-test'; +const sinon = require('sinon'); describe('provision dispatcher based on zookeeper recipes', function testDispatch() { - const zkConf = { connectionString: `localhost:2181${ZK_TEST_PATH}` }; + const endpoint = 'fake.endpoint:2181'; + const zkConf = { connectionString: endpoint }; const provisionList = ['0', '1', '2', '3', '4', '5', '6', '7']; let clients = []; this.timeout(60000); - const zk = new ZookeeperMock({ - doLog: false, - maxRandomDelay: 100, + before(done => { + mockZookeeperClient(); + const prov = new ProvisionDispatcher( + zkConf); + prov.addProvisions(provisionList, done); }); - before(done => { - const zkClient = zk.createClient('localhost:2181'); - zkClient.connect(); - zkClient.on('connected', () => { - zkClient.mkdirp(ZK_TEST_PATH, err => { - assert.ifError(err); - const prov = new ProvisionDispatcher( - zkConf, zk); - prov.addProvisions(provisionList, done); - }); - }); + after(() => { + sinon.restore(); }); + afterEach(done => { async.each(clients, (client, cb) => { if (client !== undefined) { @@ -52,7 +47,7 @@ function testDispatch() { it('should be given all provisions when alone', done => { const cbOnce = jsutil.once(done); clients[0] = new ProvisionDispatcher( - zkConf, zk); + zkConf); clients[0].subscribe((err, items) => { assert.ifError(err); assert.deepStrictEqual(items, provisionList); @@ -63,7 +58,7 @@ function testDispatch() { it('should recheck if missing a watcher event', done => { const cbOnce = jsutil.once(done); clients[0] = new ProvisionDispatcher( - zkConf, zk); + zkConf); let times = 0; clients[0].subscribe((err, items) => { assert.ifError(err); @@ -72,14 +67,14 @@ function testDispatch() { assert(items.length === 8); // simulate a watcher event loss const myPath = - clients[0]._client._basePath + + clients[0]._client.client._basePath + clients[0]._getMyPath(); - const result = clients[0]._client._getZNode(myPath); + const result = clients[0]._client.client._getZNode(myPath); assert.ifError(result.err); result.parent.children[result.baseName].emitter.removeAllListeners(); // introduce a new client clients[1] = new ProvisionDispatcher( - zkConf, zk); + zkConf); clients[1].subscribe(err => { assert.ifError(err); }, false); @@ -100,7 +95,7 @@ function testDispatch() { it('should not notify if provisions have not changed', done => { clients[0] = new ProvisionDispatcher( - zkConf, zk); + zkConf); let times = 0; clients[0].subscribe((err, items) => { assert.ifError(err); @@ -109,9 +104,9 @@ function testDispatch() { assert(items.length === 8); // simulate a watcher event loss const myPath = - clients[0]._client._basePath + + clients[0]._client.client._basePath + clients[0]._getMyPath(); - const result = clients[0]._client._getZNode(myPath); + const result = clients[0]._client.client._getZNode(myPath); assert.ifError(result.err); result.parent.children[result.baseName].emitter.removeAllListeners(); // leave enough time for the checker to trigger and make sure it does not notify @@ -160,7 +155,7 @@ function testDispatch() { }, false); } for (let i = 0; i < 10; ++i) { - clients[i] = new ProvisionDispatcher(zkConf, zk); + clients[i] = new ProvisionDispatcher(zkConf); } // register clients with a random wait time for each for (let i = 0; i < 10; ++i) { @@ -209,7 +204,7 @@ function testDispatch() { }, false); } for (let i = 0; i < 10; ++i) { - clients[i] = new ProvisionDispatcher(zkConf, zk); + clients[i] = new ProvisionDispatcher(zkConf); } // register clients with a random wait time for each for (let i = 0; i < 10; ++i) {