Skip to content

Commit

Permalink
Merge pull request #1532 from ekanth/fix-marginal-timeout-causes-limbo
Browse files Browse the repository at this point in the history
Ensure checkPendingRequests is run when the throttle delay is marginal
  • Loading branch information
Nevon committed Feb 27, 2023
2 parents 7aa5b4c + 3ceeb24 commit e52044d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ module.exports = class Broker {
* @returns {Promise}
*/
async connect() {
await this.lock.acquire()
try {
await this.lock.acquire()
if (this.isConnected()) {
return
}
Expand Down
42 changes: 27 additions & 15 deletions src/network/requestQueue/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const PRIVATE = {
}

const REQUEST_QUEUE_EMPTY = 'requestQueueEmpty'
const CHECK_PENDING_REQUESTS_INTERVAL = 10

module.exports = class RequestQueue extends EventEmitter {
/**
Expand Down Expand Up @@ -102,23 +103,14 @@ module.exports = class RequestQueue extends EventEmitter {
}

maybeThrottle(clientSideThrottleTime) {
if (clientSideThrottleTime) {
if (clientSideThrottleTime !== null && clientSideThrottleTime > 0) {
this.logger.debug(`Client side throttling in effect for ${clientSideThrottleTime}ms`)
const minimumThrottledUntil = Date.now() + clientSideThrottleTime
this.throttledUntil = Math.max(minimumThrottledUntil, this.throttledUntil)
}
}

/**
* @typedef {Object} PushedRequest
* @property {import("./socketRequest").RequestEntry} entry
* @property {boolean} expectResponse
* @property {Function} sendRequest
* @property {number} [requestTimeout]
*
* @public
* @param {PushedRequest} pushedRequest
*/
push(pushedRequest) {
createSocketRequest(pushedRequest) {
const { correlationId } = pushedRequest.entry
const defaultRequestTimeout = this.requestTimeout
const customRequestTimeout = pushedRequest.requestTimeout
Expand Down Expand Up @@ -149,6 +141,23 @@ module.exports = class RequestQueue extends EventEmitter {
},
})

return socketRequest
}

/**
* @typedef {Object} PushedRequest
* @property {import("./socketRequest").RequestEntry} entry
* @property {boolean} expectResponse
* @property {Function} sendRequest
* @property {number} [requestTimeout]
*
* @public
* @param {PushedRequest} pushedRequest
*/
push(pushedRequest) {
const { correlationId } = pushedRequest.entry
const socketRequest = this.createSocketRequest(pushedRequest)

if (this.canSendSocketRequestImmediately()) {
this.sendSocketRequest(socketRequest)
return
Expand Down Expand Up @@ -300,12 +309,15 @@ module.exports = class RequestQueue extends EventEmitter {
// will be fine, and potentially fix up a new timeout if needed at that time.
// Note that if we're merely "overloaded" by having too many inflight requests
// we will anyways check the queue when one of them gets fulfilled.
const timeUntilUnthrottled = this.throttledUntil - Date.now()
if (timeUntilUnthrottled > 0 && !this.throttleCheckTimeoutId) {
let scheduleAt = this.throttledUntil - Date.now()
if (!this.throttleCheckTimeoutId) {
if (this.pending.length > 0) {
scheduleAt = scheduleAt > 0 ? scheduleAt : CHECK_PENDING_REQUESTS_INTERVAL
}
this.throttleCheckTimeoutId = setTimeout(() => {
this.throttleCheckTimeoutId = null
this.checkPendingRequests()
}, timeUntilUnthrottled)
}, scheduleAt)
}
}
}
22 changes: 22 additions & 0 deletions src/network/requestQueue/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,28 @@ describe('Network > RequestQueue', () => {
expect(sentAt).toBeGreaterThanOrEqual(before + clientSideThrottleTime)
})

it('ensure request is sent when client-side throttling delay is marginal', async () => {
const sendDone = new Promise(resolve => {
request.sendRequest = () => {
resolve(Date.now())
}
})

expect(requestQueue.canSendSocketRequestImmediately()).toBe(true)
const socketRequest = requestQueue.createSocketRequest(request)
requestQueue.pending.push(socketRequest)

const before = Date.now()
const clientSideThrottleTime = 1
requestQueue.maybeThrottle(clientSideThrottleTime)
// Sleep until the marginal delay is passed before calling scheduleCheckPendingRequests()
await sleep(clientSideThrottleTime)
requestQueue.scheduleCheckPendingRequests()

const sentAt = await sendDone
expect(sentAt).toBeGreaterThanOrEqual(before + clientSideThrottleTime)
})

it('does not allow for a inflight correlation ids collision', async () => {
requestQueue.inflight.set(request.entry.correlationId, 'already existing inflight')
expect(() => {
Expand Down

0 comments on commit e52044d

Please sign in to comment.