diff --git a/src/broker/index.js b/src/broker/index.js index 2fb6c5e4f..bb86cc5d5 100644 --- a/src/broker/index.js +++ b/src/broker/index.js @@ -77,8 +77,8 @@ module.exports = class Broker { * @returns {Promise} */ async connect() { + await this.lock.acquire() try { - await this.lock.acquire() if (this.isConnected()) { return } diff --git a/src/network/requestQueue/index.js b/src/network/requestQueue/index.js index af417b342..eee14d66c 100644 --- a/src/network/requestQueue/index.js +++ b/src/network/requestQueue/index.js @@ -9,6 +9,7 @@ const PRIVATE = { } const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty' +const CHECK_PENDING_REQUESTS_INTERVAL = 10 module.exports = class RequestQueue extends EventEmitter { /** @@ -102,23 +103,14 @@ module.exports = class RequestQueue extends EventEmitter { } maybeThrottle(clientSideThrottleTime) { - if (clientSideThrottleTime) { + if (clientSideThrottleTime !== null && clientSideThrottleTime > 0) { + this.logger.debug(`Client side throttling in effect for ${clientSideThrottleTime}ms`) const minimumThrottledUntil = Date.now() + clientSideThrottleTime this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil) } } - /** - * @typedef {Object} PushedRequest - * @property {import("./socketRequest").RequestEntry} entry - * @property {boolean} expectResponse - * @property {Function} sendRequest - * @property {number} [requestTimeout] - * - * @public - * @param {PushedRequest} pushedRequest - */ - push(pushedRequest) { + createSocketRequest(pushedRequest) { const { correlationId } = pushedRequest.entry const defaultRequestTimeout = this.requestTimeout const customRequestTimeout = pushedRequest.requestTimeout @@ -149,6 +141,23 @@ module.exports = class RequestQueue extends EventEmitter { }, }) + return socketRequest + } + + /** + * @typedef {Object} PushedRequest + * @property {import("./socketRequest").RequestEntry} entry + * @property {boolean} expectResponse + * @property {Function} sendRequest + * @property {number} [requestTimeout] + * + * @public + * @param {PushedRequest} pushedRequest + */ + push(pushedRequest) { + const { correlationId } = pushedRequest.entry + const socketRequest = this.createSocketRequest(pushedRequest) + if (this.canSendSocketRequestImmediately()) { this.sendSocketRequest(socketRequest) return @@ -300,12 +309,15 @@ module.exports = class RequestQueue extends EventEmitter { // will be fine, and potentially fix up a new timeout if needed at that time. // Note that if we're merely "overloaded" by having too many inflight requests // we will anyways check the queue when one of them gets fulfilled. - const timeUntilUnthrottled = this.throttledUntil - Date.now() - if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) { + let scheduleAt = this.throttledUntil - Date.now() + if (!this.throttleCheckTimeoutId) { + if (this.pending.length > 0) { + scheduleAt = scheduleAt > 0 ? scheduleAt : CHECK_PENDING_REQUESTS_INTERVAL + } this.throttleCheckTimeoutId = setTimeout(() => { this.throttleCheckTimeoutId = null this.checkPendingRequests() - }, timeUntilUnthrottled) + }, scheduleAt) } } } diff --git a/src/network/requestQueue/index.spec.js b/src/network/requestQueue/index.spec.js index 62e84587e..238fbf310 100644 --- a/src/network/requestQueue/index.spec.js +++ b/src/network/requestQueue/index.spec.js @@ -211,6 +211,28 @@ describe('Network > RequestQueue', () => { expect(sentAt).toBeGreaterThanOrEqual(before + clientSideThrottleTime) }) + it('ensure request is sent when client-side throttling delay is marginal', async () => { + const sendDone = new Promise(resolve => { + request.sendRequest = () => { + resolve(Date.now()) + } + }) + + expect(requestQueue.canSendSocketRequestImmediately()).toBe(true) + const socketRequest = requestQueue.createSocketRequest(request) + requestQueue.pending.push(socketRequest) + + const before = Date.now() + const clientSideThrottleTime = 1 + requestQueue.maybeThrottle(clientSideThrottleTime) + // Sleep until the marginal delay is passed before calling scheduleCheckPendingRequests() + await sleep(clientSideThrottleTime) + requestQueue.scheduleCheckPendingRequests() + + const sentAt = await sendDone + expect(sentAt).toBeGreaterThanOrEqual(before + clientSideThrottleTime) + }) + it('does not allow for a inflight correlation ids collision', async () => { requestQueue.inflight.set(request.entry.correlationId, 'already existing inflight') expect(() => {