Skip to content

Commit

Permalink
refactor(queue-rate-limit): move QueueRateLimit.set() logic to LUA
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed May 15, 2024
1 parent 538cf78 commit d104a9c
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 7 deletions.
14 changes: 14 additions & 0 deletions src/common/redis-client/scripts/lua/set-queue-rate-limit.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
local keyQueueProperties = KEYS[1]

---

local EQueuePropertyRateLimit = ARGV[1]
local rateLimit = ARGV[2]

local result = redis.call("EXISTS", keyQueueProperties)
if result == 0 then
return 'QUEUE_NOT_FOUND';
end

redis.call("HSET", keyQueueProperties, EQueuePropertyRateLimit, rateLimit)
return 'OK'
7 changes: 7 additions & 0 deletions src/common/redis-client/scripts/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export enum ELuaScriptName {
FETCH_MESSAGE_FOR_PROCESSING = 'FETCH_MESSAGE_FOR_PROCESSING',
DELETE_CONSUMER_GROUP = 'DELETE_CONSUMER_GROUP',
CLEANUP_OFFLINE_CONSUMER = 'CLEANUP_OFFLINE_CONSUMER',
SET_QUEUE_RATE_LIMIT = 'SET_QUEUE_RATE_LIMIT',
}

RedisClientAbstract.addScript(
Expand Down Expand Up @@ -103,3 +104,9 @@ RedisClientAbstract.addScript(
.readFileSync(resolve(getDirname(), './lua/cleanup-offline-consumer.lua'))
.toString(),
);
RedisClientAbstract.addScript(
ELuaScriptName.SET_QUEUE_RATE_LIMIT,
fs
.readFileSync(resolve(getDirname(), './lua/set-queue-rate-limit.lua'))
.toString(),
);
1 change: 1 addition & 0 deletions src/lib/queue-rate-limit/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
export { QueueRateLimitError } from './queue-rate-limit.error.js';
export { QueueRateLimitInvalidIntervalError } from './queue-rate-limit-invalid-interval.error.js';
export { QueueRateLimitInvalidLimitError } from './queue-rate-limit-invalid-limit.error.js';
export { QueueRateLimitQueueNotFoundError } from './queue-rate-limit-queue-not-found.error.js';
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { QueueRateLimitError } from './queue-rate-limit.error.js';

export class QueueRateLimitQueueNotFoundError extends QueueRateLimitError {}
17 changes: 12 additions & 5 deletions src/lib/queue-rate-limit/queue-rate-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
logger,
} from 'redis-smq-common';
import { RedisClientInstance } from '../../common/redis-client/redis-client-instance.js';
import { ELuaScriptName } from '../../common/redis-client/scripts/scripts.js';
import { redisKeys } from '../../common/redis-keys/redis-keys.js';
import { Configuration } from '../../config/index.js';
import { _parseQueueParamsAndValidate } from '../queue/_/_parse-queue-params-and-validate.js';
Expand All @@ -27,6 +28,7 @@ import { _hasRateLimitExceeded } from './_/_has-rate-limit-exceeded.js';
import {
QueueRateLimitInvalidLimitError,
QueueRateLimitInvalidIntervalError,
QueueRateLimitQueueNotFoundError,
} from './errors/index.js';

export class QueueRateLimit {
Expand Down Expand Up @@ -91,11 +93,16 @@ export class QueueRateLimit {
queueParams,
null,
);
client.hset(
keyQueueProperties,
String(EQueueProperty.RATE_LIMIT),
JSON.stringify(validatedRateLimit),
(err) => cb(err),
client.runScript(
ELuaScriptName.SET_QUEUE_RATE_LIMIT,
[keyQueueProperties],
[EQueueProperty.RATE_LIMIT, JSON.stringify(validatedRateLimit)],
(err, reply) => {
if (err) cb(err);
else if (reply !== 'OK')
cb(new QueueRateLimitQueueNotFoundError());
else cb();
},
);
}
});
Expand Down
20 changes: 20 additions & 0 deletions tests/common/mock-module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { jest } from '@jest/globals';
import { TFunction } from 'redis-smq-common';

export function mockModule(moduleName: string, mockFactory: TFunction) {
if (process.env['NODE_OPTIONS']?.includes('--experimental-vm-modules')) {
jest.unstable_mockModule(moduleName, mockFactory);
} else {
// Fixing TypeError: The second argument of `jest.mock` must be an inline function.
jest.mock(moduleName, (...args: unknown[]) => mockFactory(...args));
}
}
11 changes: 10 additions & 1 deletion tests/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
* in the root directory of this source tree.
*/

import { beforeAll, afterAll, beforeEach, afterEach } from '@jest/globals';
import {
beforeAll,
afterAll,
beforeEach,
afterEach,
jest,
} from '@jest/globals';
import { shutdown } from './common/shut-down.js';
import { startUp } from './common/start-up.js';

Expand All @@ -16,7 +22,10 @@ beforeAll(() => void 0);
afterAll(() => void 0);

beforeEach(async () => {
jest.resetAllMocks();
jest.resetModules();
await startUp();
jest.resetModules();
});

afterEach(async () => {
Expand Down
14 changes: 13 additions & 1 deletion tests/tests/queue-rate-limit/test00028.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,23 @@
import { test, expect } from '@jest/globals';
import { QueueRateLimitInvalidIntervalError } from '../../../src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-interval.error.js';
import { QueueRateLimitInvalidLimitError } from '../../../src/lib/queue-rate-limit/errors/queue-rate-limit-invalid-limit.error.js';
import { defaultQueue } from '../../common/message-producing-consuming.js';
import { QueueQueueNotFoundError } from '../../../src/lib/queue/errors/queue-queue-not-found.error.js';
import {
createQueue,
defaultQueue,
} from '../../common/message-producing-consuming.js';
import { getQueueRateLimit } from '../../common/queue-rate-limit.js';

test('SetQueueRateLimit()/GetQueueRateLimit()/ClearQueueRateLimit()', async () => {
const queueRateLimit = await getQueueRateLimit();
await expect(
queueRateLimit.setAsync(defaultQueue, {
limit: 5,
interval: 1000,
}),
).rejects.toThrow(QueueQueueNotFoundError);

await createQueue(defaultQueue, false);
await queueRateLimit.setAsync(defaultQueue, {
limit: 5,
interval: 1000,
Expand Down
50 changes: 50 additions & 0 deletions tests/tests/queue-rate-limit/test00032.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c)
* Weyoss <weyoss@protonmail.com>
* https://github.com/weyoss
*
* This source code is licensed under the MIT license found in the LICENSE file
* in the root directory of this source tree.
*/

import { expect, test } from '@jest/globals';
import bluebird from 'bluebird';
import { resolve } from 'path';
import { getDirname, ICallback, IRedisClient } from 'redis-smq-common';
import { IQueueParams } from '../../../src/lib/queue/types/queue.js';
import { defaultQueue } from '../../common/message-producing-consuming.js';
import { mockModule } from '../../common/mock-module.js';

const dir = getDirname();

test('SetQueueRateLimit(): QueueRateLimitQueueNotFoundError', async () => {
const modulePath = resolve(
dir,
'../../../src/lib/queue/_/_parse-queue-params-and-validate.js',
);
mockModule(modulePath, () => {
return {
_parseQueueParamsAndValidate(
redisClient: IRedisClient,
queue: string | IQueueParams,
cb: ICallback<IQueueParams>,
) {
cb(null, defaultQueue);
},
};
});
const { QueueRateLimit } = await import(
'../../../src/lib/queue-rate-limit/queue-rate-limit.js'
);
const { QueueRateLimitQueueNotFoundError } = await import(
'../../../src/lib/queue-rate-limit/errors/queue-rate-limit-queue-not-found.error.js'
);
const queueRateLimit = bluebird.promisifyAll(new QueueRateLimit());
await expect(
queueRateLimit.setAsync(defaultQueue, {
limit: 5,
interval: 1000,
}),
).rejects.toThrow(QueueRateLimitQueueNotFoundError);
await queueRateLimit.shutdownAsync();
});

0 comments on commit d104a9c

Please sign in to comment.