Skip to content

Commit

Permalink
redis - adding in cluster type to constructor (#1219)
Browse files Browse the repository at this point in the history
* redis - adding in cluster type to constructor

* specify which client as they are unique

* moving to centralized type

* adding in lint fixes

* adding in iterator and clear on cluster masters

* adding in export of RedisClusterType and RedisClusterOptions

* adding in jsDoc

* adding in tests but will break

* adding in redis cluster compose

* updating tls and test:services:start

* adding in docker compose stop

* throwing errors if cluster on clear and iterator

* updating contributing

* adding in context on clustering issues because of no SCAN

* adding in valkey

* updating contributing instructions
  • Loading branch information
jaredwray authored Nov 22, 2024
1 parent 6c96788 commit 1d48bf4
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 43 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ You can contribute changes to this repo by opening a pull request:

1) After forking this repository to your Git account, make the proposed changes on your forked branch.
2) Run tests and linting locally.
- [Install and run Docker](https://docs.docker.com/get-docker/) if you aren't already.
- [Install and run Docker](https://docs.docker.com/get-docker/) if you aren't already. NOTE: on docker set `enable host networking` to true as it is required for the tests in redis clustering.
- Run `pnpm test:services:start`, allow for the services to come up.
- Run `pnpm test`.
3) Commit your changes and push them to your forked repository.
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

Keyv and its storage adapters are in this mono repo and there are details below on how to use this repository. In addtion we have a couple of other documents for review:

* [CODE_OF_CONDUCT.md](CODE_OF_CONDUCT.md) - Our code of conduct
* [CONTRIBUTING.md](CONTRIBUTING.md) - How to contribute to this project
* [SECURITY.md](SECURITY.md) - Security guidelines and supported versions
* [CODE_OF_CONDUCT](CODE_OF_CONDUCT.md) - Our code of conduct
* [CONTRIBUTING](CONTRIBUTING.md) - How to contribute to this project
* [SECURITY](SECURITY.md) - Security guidelines and supported versions

## Getting Started

Expand All @@ -27,7 +27,7 @@ You can contribute changes to this repo by opening a pull request:

1) After forking this repository to your Git account, make the proposed changes on your forked branch.
2) Run tests and linting locally.
- [Install and run Docker](https://docs.docker.com/get-docker/) if you aren't already.
- [Install and run Docker](https://docs.docker.com/get-docker/) if you aren't already. NOTE: on docker set `enable host networking` to true as it is required for the tests in redis clustering.
- Run `pnpm test:services:start`, allow for the services to come up.
- Run `pnpm test`.
3) Commit your changes and push them to your forked repository.
Expand Down
5 changes: 3 additions & 2 deletions docker-compose-arm64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ services:
REDIS_HOST: redis
ports:
- 6379:6379
keyv_redis_1:
keyv_redis_tls_1:
image: redis:latest
command: redis-server --port 0 --tls-port 6380 --tls-cert-file /tls/redis.crt --tls-key-file /tls/redis.key --tls-ca-cert-file /tls/ca.crt --tls-auth-clients no
environment:
Expand All @@ -94,4 +94,5 @@ services:
- ALLOW_NONE_AUTHENTICATION=yes
ports:
- 2379:2379
- 2380:2380
- 2380:2380

39 changes: 39 additions & 0 deletions docker-compose-redis-cluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
x-redis-cluster-base: &redis-cluster-base
image: docker.io/bitnami/redis-cluster:latest
network_mode: host

services:
redis-cluster-1:
container_name: redis-cluster-1
<<: *redis-cluster-base
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003'
- 'REDIS_CLUSTER_DYNAMIC_IPS=no'
- 'REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1'
- 'REDIS_PORT_NUMBER=7001'

redis-cluster-2:
container_name: redis-cluster-2
<<: *redis-cluster-base
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003'
- 'REDIS_CLUSTER_DYNAMIC_IPS=no'
- 'REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1'
- 'REDIS_PORT_NUMBER=7002'

redis-cluster-3:
container_name: redis-cluster-3
<<: *redis-cluster-base
depends_on:
- redis-cluster-1
- redis-cluster-2
environment:
- 'ALLOW_EMPTY_PASSWORD=yes'
- 'REDIS_NODES=127.0.0.1:7001 127.0.0.1:7002 127.0.0.1:7003'
- 'REDIS_CLUSTER_DYNAMIC_IPS=no'
- 'REDIS_CLUSTER_ANNOUNCE_IP=127.0.0.1'
- 'REDIS_PORT_NUMBER=7003'
- 'REDIS_CLUSTER_REPLICAS=0'
- 'REDIS_CLUSTER_CREATOR=yes'
4 changes: 3 additions & 1 deletion packages/redis/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@ const cluster = createCluster({
const keyv = new Keyv({ store: new KeyvRedis(cluster) });
```

You can learn more about the `createCluster` function in the [documentation](https://github.com/redis/node-redis/blob/master/docs/clustering.md) at https://github.com/redis/node-redis/tree/master/docs.
There are some features that are not supported in clustering such as `clear()` and `iterator()`. This is because the `SCAN` command is not supported in clustering. If you need to clear or delete keys you can use the `deleteMany()` method.

You can learn more about the `createCluster` function in the [documentation](https://github.com/redis/node-redis/blob/master/docs/clustering.md) at https://github.com/redis/node-redis/tree/master/docs.

Here is an example of how to use TLS:

Expand Down
83 changes: 65 additions & 18 deletions packages/redis/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import EventEmitter from 'node:events';
import {createClient, type RedisClientType, type RedisClientOptions} from 'redis';
import {
createClient, createCluster, type RedisClientType, type RedisClientOptions, type RedisClusterType,
type RedisClusterOptions,
type RedisModules,
type RedisFunctions,
type RedisScripts,
} from 'redis';
import {Keyv, type KeyvStoreAdapter} from 'keyv';

export type KeyvRedisOptions = {
Expand Down Expand Up @@ -47,9 +53,12 @@ export type KeyvRedisEntry<T> = {
*/
ttl?: number;
};

export type RedisClientConnectionType = RedisClientType | RedisClusterType<RedisModules, RedisFunctions, RedisScripts>;

// eslint-disable-next-line unicorn/prefer-event-target
export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter {
private _client: RedisClientType = createClient() as RedisClientType;
private _client: RedisClientConnectionType = createClient() as RedisClientType;
private _namespace: string | undefined;
private _keyPrefixSeparator = '::';
private _clearBatchSize = 1000;
Expand All @@ -60,35 +69,34 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
* @param {string | RedisClientOptions | RedisClientType} [connect] How to connect to the Redis server. If string pass in the url, if object pass in the options, if RedisClient pass in the client.
* @param {KeyvRedisOptions} [options] Options for the adapter such as namespace, keyPrefixSeparator, and clearBatchSize.
*/
constructor(connect?: string | RedisClientOptions | RedisClientType, options?: KeyvRedisOptions) {
constructor(connect?: string | RedisClientOptions | RedisClusterOptions | RedisClientConnectionType, options?: KeyvRedisOptions) {
super();

if (connect) {
if (typeof connect === 'string') {
this._client = createClient({url: connect}) as RedisClientType;
} else if ((connect as RedisClientType).connect !== undefined) {
this._client = connect as RedisClientType;
} else if ((connect as any).connect !== undefined) {
this._client = this.isClientCluster(connect as RedisClientConnectionType) ? connect as RedisClusterType : connect as RedisClientType;
} else if (connect instanceof Object) {
this._client = createClient(connect as RedisClientOptions) as RedisClientType;
this._client = (connect as any).rootNodes === undefined ? createClient(connect as RedisClientOptions) as RedisClientType : createCluster(connect as RedisClusterOptions) as RedisClusterType;
}
}

this.setOptions(options);

this.initClient();
}

/**
* Get the Redis client.
*/
public get client(): RedisClientType {
public get client(): RedisClientConnectionType {
return this._client;
}

/**
* Set the Redis client.
*/
public set client(value: RedisClientType) {
public set client(value: RedisClientConnectionType) {
this._client = value;
this.initClient();
}
Expand All @@ -97,13 +105,20 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
* Get the options for the adapter.
*/
public get opts(): KeyvRedisPropertyOptions {
return {
let url = '';
if ((this._client as RedisClientType).options) {
url = (this._client as RedisClientType).options?.url ?? 'redis://localhost:6379';
}

const results: KeyvRedisPropertyOptions = {
namespace: this._namespace,
keyPrefixSeparator: this._keyPrefixSeparator,
clearBatchSize: this._clearBatchSize,
dialect: 'redis',
url: this._client?.options?.url ?? 'redis://localhost:6379',
url,
};

return results;
}

/**
Expand Down Expand Up @@ -176,7 +191,7 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
/**
* Get the Redis URL used to connect to the server. This is used to get a connected client.
*/
public async getClient(): Promise<RedisClientType> {
public async getClient(): Promise<RedisClientConnectionType> {
if (!this._client.isOpen) {
await this._client.connect();
}
Expand Down Expand Up @@ -368,18 +383,39 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
return key;
}

/**
* Is the client a cluster.
* @returns {boolean} - true if the client is a cluster, false if not
*/
public isCluster(): boolean {
return this.isClientCluster(this._client);
}

/**
* Get an async iterator for the keys and values in the store. If a namespace is provided, it will only iterate over keys with that namespace.
* @param {string} [namespace] - the namespace to iterate over
* @returns {AsyncGenerator<[string, T | undefined], void, unknown>} - async iterator with key value pairs
*/
public async * iterator<Value>(namespace?: string): AsyncGenerator<[string, Value | undefined], void, unknown> {
if (this.isCluster()) {
throw new Error('Iterating over keys in a cluster is not supported.');
} else {
yield * this.iteratorClient<Value>(namespace);
}
}

/**
* Get an async iterator for the keys and values in the store. If a namespace is provided, it will only iterate over keys with that namespace.
* @param {string} [namespace] - the namespace to iterate over
* @returns {AsyncGenerator<[string, T | undefined], void, unknown>} - async iterator with key value pairs
*/
public async * iteratorClient<Value>(namespace?: string): AsyncGenerator<[string, Value | undefined], void, unknown> {
const client = await this.getClient();
const match = namespace ? `${namespace}${this._keyPrefixSeparator}*` : '*';
let cursor = '0';
do {
// eslint-disable-next-line no-await-in-loop, @typescript-eslint/naming-convention
const result = await client.scan(Number.parseInt(cursor, 10), {MATCH: match, TYPE: 'string'});
const result = await (client as RedisClientType).scan(Number.parseInt(cursor, 10), {MATCH: match, TYPE: 'string'});
cursor = result.cursor.toString();
let {keys} = result;

Expand All @@ -401,13 +437,13 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter

/**
* Clear all keys in the store.
* IMPORTANT: this can cause performance issues if there are a large number of keys in the store. Use with caution as not recommended for production.
* IMPORTANT: this can cause performance issues if there are a large number of keys in the store and worse with clusters. Use with caution as not recommended for production.
* If a namespace is not set it will clear all keys with no prefix.
* If a namespace is set it will clear all keys with that namespace.
* @returns {Promise<void>}
*/
public async clear(): Promise<void> {
await this.clearNamespace(this._namespace);
await (this.isCluster() ? this.clearNamespaceCluster(this._namespace) : this.clearNamespace(this._namespace));
}

private async clearNamespace(namespace?: string): Promise<void> {
Expand All @@ -418,9 +454,8 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
const client = await this.getClient();

do {
// Use SCAN to find keys incrementally in batches
// eslint-disable-next-line no-await-in-loop, @typescript-eslint/naming-convention
const result = await client.scan(Number.parseInt(cursor, 10), {MATCH: match, COUNT: batchSize, TYPE: 'string'});
const result = await (client as RedisClientType).scan(Number.parseInt(cursor, 10), {MATCH: match, COUNT: batchSize, TYPE: 'string'});

cursor = result.cursor.toString();
let {keys} = result;
Expand Down Expand Up @@ -450,6 +485,18 @@ export default class KeyvRedis extends EventEmitter implements KeyvStoreAdapter
}
}

private async clearNamespaceCluster(namespace?: string): Promise<void> {
throw new Error('Clearing all keys in a cluster is not supported.');
}

private isClientCluster(client: RedisClientConnectionType): boolean {
if ((client as any).options === undefined && (client as any).scan === undefined) {
return true;
}

return false;
}

private setOptions(options?: KeyvRedisOptions): void {
if (!options) {
return;
Expand Down Expand Up @@ -493,7 +540,7 @@ export function createKeyv(connect?: string | RedisClientOptions | RedisClientTy
}

export {
createClient, createCluster, type RedisClientOptions, type RedisClientType,
createClient, createCluster, type RedisClientOptions, type RedisClientType, type RedisClusterType, type RedisClusterOptions,
} from 'redis';

export {
Expand Down
100 changes: 100 additions & 0 deletions packages/redis/test/cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {describe, test, expect} from 'vitest';
import KeyvRedis, {createCluster} from '../src/index.js';

const defaultClusterOptions = {
rootNodes: [
{
url: 'redis://localhost:7001',
},
{
url: 'redis://localhost:7002',
},
{
url: 'redis://localhost:7003',
},
],
useReplicas: true,
};

describe('KeyvRedis Cluster', () => {
test('should be able to connect to a cluster', async () => {
const cluster = createCluster(defaultClusterOptions);

const keyvRedis = new KeyvRedis(cluster);

expect(keyvRedis).toBeDefined();
expect(keyvRedis.client).toEqual(cluster);
});

test('should be able to send in cluster options', async () => {
const keyvRedis = new KeyvRedis(defaultClusterOptions);
expect(keyvRedis.isCluster()).toBe(true);
});

test('shoudl be able to set the redis cluster client', async () => {
const cluster = createCluster(defaultClusterOptions);

const keyvRedis = new KeyvRedis();
expect(keyvRedis.isCluster()).toBe(false);

keyvRedis.client = cluster;
expect(keyvRedis.client).toEqual(cluster);
expect(keyvRedis.isCluster()).toBe(true);
});

test('should be able to set a value', async () => {
const cluster = createCluster(defaultClusterOptions);

const keyvRedis = new KeyvRedis(cluster);

await keyvRedis.delete('test-cl1');

const undefinedResult = await keyvRedis.get('test-cl1');
expect(undefinedResult).toBeUndefined();

await keyvRedis.set('test-cl1', 'test');

const result = await keyvRedis.get('test-cl1');

expect(result).toBe('test');

await keyvRedis.delete('test-cl1');
});

test('should thrown an error on clear', async () => {
const cluster = createCluster(defaultClusterOptions);

const keyvRedis = new KeyvRedis(cluster);

let errorThrown = false;
try {
await keyvRedis.clear();
} catch (error) {
expect(error).toBeDefined();
errorThrown = true;
}

expect(errorThrown).toBe(true);
});

test('should throw an error on iterator', async () => {
const cluster = createCluster(defaultClusterOptions);

const keyvRedis = new KeyvRedis(cluster);

let errorThrown = false;
try {
const keys = [];
const values = [];
for await (const [key, value] of keyvRedis.iterator('foo')) {
keys.push(key);
values.push(value);
}
} catch (error) {
expect(error).toBeDefined();
errorThrown = true;
}

expect(errorThrown).toBe(true);
});
});
Loading

0 comments on commit 1d48bf4

Please sign in to comment.