Skip to content

Commit

Permalink
redis: clear and iterator methods support Clusters (#1246)
Browse files Browse the repository at this point in the history
* feat: add cluster support to clear and iterator methods

* perf: use MGET in getMany method

* docs(src/index.ts): fix JSDoc of mGetWithClusterSupport method

* docs: add code comment

* docs: improve code comments

* renaming mGetWithClusterSupport to mget since it is private

---------

Co-authored-by: Jared Wray <me@jaredwray.com>
  • Loading branch information
mahdavipanah and jaredwray authored Dec 12, 2024
1 parent e830335 commit e9a8edf
Show file tree
Hide file tree
Showing 5 changed files with 391 additions and 98 deletions.
4 changes: 1 addition & 3 deletions packages/redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ With namespaces being prefix based it is critical to understand some of the perf

* `delete()` - By default we are now using `UNLINK` instead of `DEL` for deleting keys. This is a non-blocking command that is more efficient than `DEL`. If you are deleting a large number of keys it is recommended to use the `deleteMany()` method instead of `delete()`.

* `clearBatchSize` - The `clearBatchSize` option is set to `1000` by default. This is because Redis has a limit of 1000 keys that can be deleted in a single batch.
* `clearBatchSize` - The `clearBatchSize` option is set to `1000` by default. This is because Redis has a limit of 1000 keys that can be deleted in a single batch. If no namespace is defined and noNamespaceAffectsAll is set to `true` this option will be ignored and the `FLUSHDB` command will be used instead.

* `useUnlink` - This option is set to `true` by default. This is because `UNLINK` is a non-blocking command that is more efficient than `DEL`. If you are not using `UNLINK` and are doing a lot of deletes it is recommended to set this option to `true`.

Expand Down Expand Up @@ -183,8 +183,6 @@ const cluster = createCluster({
const keyv = new Keyv({ store: new KeyvRedis(cluster) });
```

There are some features that are not supported in clustering such as `clear()` and `iterator()`. This is because the `SCAN` command is not supported in clustering. If you need to clear or delete keys you can use the `deleteMany()` method.

You can learn more about the `createCluster` function in the [documentation](https://github.com/redis/node-redis/blob/master/docs/clustering.md) at https://github.com/redis/node-redis/tree/master/docs.

Here is an example of how to use TLS:
Expand Down
1 change: 1 addition & 0 deletions packages/redis/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
},
"homepage": "https://github.com/jaredwray/keyv",
"dependencies": {
"cluster-key-slot": "^1.1.2",
"keyv": "workspace:^",
"redis": "^4.7.0"
},
Expand Down
210 changes: 138 additions & 72 deletions packages/redis/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type RedisScripts,
} from 'redis';
import {Keyv, type KeyvStoreAdapter} from 'keyv';
import calculateSlot from 'cluster-key-slot';

export type KeyvRedisOptions = {
/**
Expand Down Expand Up @@ -315,16 +316,14 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
* @returns {Promise<Array<string | undefined>>} - array of values or undefined if the key does not exist
*/
public async getMany<T>(keys: string[]): Promise<Array<T | undefined>> {
const client = await this.getClient();
const multi = client.multi();
for (const key of keys) {
const prefixedKey = this.createKeyPrefix(key, this._namespace);
multi.get(prefixedKey);
if (keys.length === 0) {
return [];
}

const values = await multi.exec();
keys = keys.map(key => this.createKeyPrefix(key, this._namespace));
const values = await this.mget<T>(keys);

return values.map(value => value === null ? undefined : value as T);
return values;
}

/**
Expand Down Expand Up @@ -417,47 +416,52 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
}

/**
* Get an async iterator for the keys and values in the store. If a namespace is provided, it will only iterate over keys with that namespace.
* @param {string} [namespace] - the namespace to iterate over
* @returns {AsyncGenerator<[string, T | undefined], void, unknown>} - async iterator with key value pairs
* Get the master nodes in the cluster. If not a cluster, it will return the single client.
*
* @returns {Promise<RedisClientType[]>} - array of master nodes
*/
public async * iterator<Value>(namespace?: string): AsyncGenerator<[string, Value | undefined], void, unknown> {
public async getMasterNodes(): Promise<RedisClientType[]> {
if (this.isCluster()) {
throw new Error('Iterating over keys in a cluster is not supported.');
} else {
yield * this.iteratorClient<Value>(namespace);
const cluster = await this.getClient() as RedisClusterType;
return Promise.all(cluster.masters.map(async main => cluster.nodeClient(main)));
}

return [await this.getClient() as RedisClientType];
}

/**
* Get an async iterator for the keys and values in the store. If a namespace is provided, it will only iterate over keys with that namespace.
* @param {string} [namespace] - the namespace to iterate over
* @returns {AsyncGenerator<[string, T | undefined], void, unknown>} - async iterator with key value pairs
*/
public async * iteratorClient<Value>(namespace?: string): AsyncGenerator<[string, Value | undefined], void, unknown> {
const client = await this.getClient();
const match = namespace ? `${namespace}${this._keyPrefixSeparator}*` : '*';
let cursor = '0';
do {
// eslint-disable-next-line no-await-in-loop, @typescript-eslint/naming-convention
const result = await (client as RedisClientType).scan(Number.parseInt(cursor, 10), {MATCH: match, TYPE: 'string'});
cursor = result.cursor.toString();
let {keys} = result;

if (!namespace && !this._noNamespaceAffectsAll) {
keys = keys.filter(key => !key.includes(this._keyPrefixSeparator));
}
public async * iterator<Value>(namespace?: string): AsyncGenerator<[string, Value | undefined], void, unknown> {
// When instance is not a cluster, it will only have one client
const clients = await this.getMasterNodes();

for (const client of clients) {
const match = namespace ? `${namespace}${this._keyPrefixSeparator}*` : '*';
let cursor = '0';
do {
// eslint-disable-next-line no-await-in-loop, @typescript-eslint/naming-convention
const result = await client.scan(Number.parseInt(cursor, 10), {MATCH: match, TYPE: 'string'});
cursor = result.cursor.toString();
let {keys} = result;

if (keys.length > 0) {
// eslint-disable-next-line no-await-in-loop
const values = await client.mGet(keys);
for (const [i] of keys.entries()) {
const key = this.getKeyWithoutPrefix(keys[i], namespace);
const value = values ? values[i] : undefined;
yield [key, value as Value | undefined];
if (!namespace && !this._noNamespaceAffectsAll) {
keys = keys.filter(key => !key.includes(this._keyPrefixSeparator));
}
}
} while (cursor !== '0');

if (keys.length > 0) {
// eslint-disable-next-line no-await-in-loop
const values = await this.mget<Value>(keys);
for (const i of keys.keys()) {
const key = this.getKeyWithoutPrefix(keys[i], namespace);
const value = values[i];
yield [key, value];
}
}
} while (cursor !== '0');
}
}

/**
Expand All @@ -468,57 +472,119 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
* @returns {Promise<void>}
*/
public async clear(): Promise<void> {
await (this.isCluster() ? this.clearNamespaceCluster(this._namespace) : this.clearNamespace(this._namespace));
}

private async clearNamespace(namespace?: string): Promise<void> {
try {
if (!namespace && this._noNamespaceAffectsAll) {
const client = await this.getClient() as RedisClientType;
await client.flushDb();
return;
}
// When instance is not a cluster, it will only have one client
const clients = await this.getMasterNodes();

let cursor = '0';
const batchSize = this._clearBatchSize;
const match = namespace ? `${namespace}${this._keyPrefixSeparator}*` : '*';
const client = await this.getClient();
await Promise.all(clients.map(async client => {
if (!this._namespace && this._noNamespaceAffectsAll) {
await client.flushDb();
return;
}

do {
// eslint-disable-next-line no-await-in-loop, @typescript-eslint/naming-convention
const result = await (client as RedisClientType).scan(Number.parseInt(cursor, 10), {MATCH: match, COUNT: batchSize, TYPE: 'string'});
let cursor = '0';
const batchSize = this._clearBatchSize;
const match = this._namespace ? `${this._namespace}${this._keyPrefixSeparator}*` : '*';
const deletePromises = [];

cursor = result.cursor.toString();
let {keys} = result;
do {
// eslint-disable-next-line no-await-in-loop, @typescript-eslint/naming-convention
const result = await client.scan(Number.parseInt(cursor, 10), {MATCH: match, COUNT: batchSize, TYPE: 'string'});

if (keys.length === 0) {
continue;
}
cursor = result.cursor.toString();
let {keys} = result;

if (!namespace) {
keys = keys.filter(key => !key.includes(this._keyPrefixSeparator));
}
if (keys.length === 0) {
continue;
}

if (keys.length > 0) {
// eslint-disable-next-line unicorn/prefer-ternary
if (this._useUnlink) {
// eslint-disable-next-line no-await-in-loop
await client.unlink(keys);
} else {
// eslint-disable-next-line no-await-in-loop
await client.del(keys);
if (!this._namespace) {
keys = keys.filter(key => !key.includes(this._keyPrefixSeparator));
}
}
} while (cursor !== '0');

deletePromises.push(this.clearWithClusterSupport(keys));
} while (cursor !== '0');

await Promise.all(deletePromises);
}));
/* c8 ignore next 3 */
} catch (error) {
this.emit('error', error);
}
}

private async clearNamespaceCluster(namespace?: string): Promise<void> {
throw new Error('Clearing all keys in a cluster is not supported.');
/**
* Get many keys. If the instance is a cluster, it will do multiple MGET calls
* by separating the keys by slot to solve the CROSS-SLOT restriction.
*/
private async mget<T = any>(keys: string[]): Promise<Array<T | undefined>> {
const slotMap = this.getSlotMap(keys);

const valueMap = new Map<string, string | undefined>();
await Promise.all(Array.from(slotMap.entries(), async ([slot, keys]) => {
const client = await this.getSlotMaster(slot);

const values = await client.mGet(keys);
for (const [index, value] of values.entries()) {
valueMap.set(keys[index], value ?? undefined);
}
}));

return keys.map(key => valueMap.get(key) as T | undefined);
}

/**
* Clear all keys in the store with a specific namespace. If the instance is a cluster, it will clear all keys
* by separating the keys by slot to solve the CROSS-SLOT restriction.
*/
private async clearWithClusterSupport(keys: string[]): Promise<void> {
if (keys.length > 0) {
const slotMap = this.getSlotMap(keys);

await Promise.all(Array.from(slotMap.entries(), async ([slot, keys]) => {
const client = await this.getSlotMaster(slot);

return this._useUnlink ? client.unlink(keys) : client.del(keys);
}));
}
}

/**
* Returns the master node client for a given slot or the instance's client if it's not a cluster.
*/
private async getSlotMaster(slot: number): Promise<RedisClientType> {
const connection = await this.getClient();

if (this.isCluster()) {
const cluster = connection as RedisClusterType;
const mainNode = cluster.slots[slot].master;
return cluster.nodeClient(mainNode);
}

return connection as RedisClientType;
}

/**
* Group keys by their slot.
*
* @param {string[]} keys - the keys to group
* @returns {Map<number, string[]>} - map of slot to keys
*/
private getSlotMap(keys: string[]) {
const slotMap = new Map<number, string[]>();
if (this.isCluster()) {
for (const key of keys) {
const slot = calculateSlot(key);
const slotKeys = slotMap.get(slot) ?? [];
slotKeys.push(key);
slotMap.set(slot, slotKeys);
}
} else {
// Non-clustered client supports CROSS-SLOT multi-key command so we set arbitrary slot 0
slotMap.set(0, keys);
}

return slotMap;
}

private isClientCluster(client: RedisClientConnectionType): boolean {
Expand Down
Loading

0 comments on commit e9a8edf

Please sign in to comment.