Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-1019: Read Your Writes Support #1175

Merged
merged 23 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/auto-pipeline.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ describe("Auto pipeline", () => {
redis.zscore(newKey(), "member"),
redis.zunionstore(newKey(), 1, [newKey()]),
redis.zunion(1, [newKey()]),
redis.json.set(persistentKey3, '$', { log: ["one", "two"] }),
redis.json.set(persistentKey3, "$", { log: ["one", "two"] }),
redis.json.arrappend(persistentKey3, "$.log", '"three"'),
]);
expect(result).toBeTruthy();
Expand Down
5 changes: 3 additions & 2 deletions pkg/auto-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ export function createAutoPipelineProxy(_redis: Redis, json?: boolean): Redis {

// If the method is a function on the pipeline, wrap it with the executor logic
const isFunction = json
? typeof redis.autoPipelineExecutor.pipeline.json[command as keyof Pipeline["json"]] === "function"
: typeof redis.autoPipelineExecutor.pipeline[command as keyof Pipeline] === "function"
? typeof redis.autoPipelineExecutor.pipeline.json[command as keyof Pipeline["json"]] ===
"function"
: typeof redis.autoPipelineExecutor.pipeline[command as keyof Pipeline] === "function";
if (isFunction) {
return (...args: CommandArgs<typeof Command>) => {
// pass the function as a callback
Expand Down
2 changes: 2 additions & 0 deletions pkg/commands/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ export class Command<TResult, TData> {
public async exec(client: Requester): Promise<TData> {
const { result, error } = await client.request<TResult>({
body: this.command,
upstashSyncToken: client.upstashSyncToken,
});

if (error) {
throw new UpstashError(error);
}
Expand Down
64 changes: 48 additions & 16 deletions pkg/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,21 @@ export type UpstashRequest = {
* Request body will be serialized to json
*/
body?: unknown;

fahreddinozcan marked this conversation as resolved.
Show resolved Hide resolved
upstashSyncToken?: string;
};
export type UpstashResponse<TResult> = { result?: TResult; error?: string };

export interface Requester {
/**
* When this flag is not disabled, new commands of this client expects the previous write commands to be finalized before executing.
fahreddinozcan marked this conversation as resolved.
Show resolved Hide resolved
*/
readYourWrites: boolean;

/**
* This token is used to ensure that the client is in sync with the server. On each request, we send this token in the header, and the server will return a new token.
*/
upstashSyncToken: string;
request: <TResult = unknown>(req: UpstashRequest) => Promise<UpstashResponse<TResult>>;
}

Expand All @@ -29,22 +40,22 @@ type ResultError = {
export type RetryConfig =
| false
| {
/**
* The number of retries to attempt before giving up.
*
* @default 5
*/
retries?: number;
/**
* A backoff function receives the current retry cound and returns a number in milliseconds to wait before retrying.
*
* @default
* ```ts
* Math.exp(retryCount) * 50
* ```
*/
backoff?: (retryCount: number) => number;
};
/**
* The number of retries to attempt before giving up.
*
* @default 5
*/
retries?: number;
/**
* A backoff function receives the current retry cound and returns a number in milliseconds to wait before retrying.
*
* @default
* ```ts
* Math.exp(retryCount) * 50
* ```
*/
backoff?: (retryCount: number) => number;
};

export type Options = {
backend?: string;
Expand Down Expand Up @@ -95,11 +106,17 @@ export type HttpClientConfig = {
agent?: any;
signal?: AbortSignal;
keepAlive?: boolean;

/**
* When this flag is not disabled, new commands of this client expects the previous write commands to be finalized before executing.
*/
readYourWrites?: boolean;
} & RequesterConfig;

export class HttpClient implements Requester {
public baseUrl: string;
public headers: Record<string, string>;

public readonly options: {
backend?: string;
agent: any;
Expand All @@ -108,6 +125,8 @@ export class HttpClient implements Requester {
cache?: CacheSetting;
keepAlive: boolean;
};
public readYourWrites: boolean;
public upstashSyncToken = "";

public readonly retry: {
attempts: number;
Expand All @@ -123,6 +142,8 @@ export class HttpClient implements Requester {
signal: config.signal,
keepAlive: config.keepAlive ?? true,
};
this.upstashSyncToken = "";
this.readYourWrites = config.readYourWrites ?? true;

this.baseUrl = config.baseUrl.replace(/\/$/, "");

Expand Down Expand Up @@ -202,6 +223,11 @@ export class HttpClient implements Requester {
backend: this.options?.backend,
};

if (this.readYourWrites) {
const newHeader = this.upstashSyncToken;
this.headers["upstash-sync-token"] = newHeader;
}

fahreddinozcan marked this conversation as resolved.
Show resolved Hide resolved
let res: Response | null = null;
let error: Error | null = null;
for (let i = 0; i <= this.retry.attempts; i++) {
Expand Down Expand Up @@ -233,6 +259,11 @@ export class HttpClient implements Requester {
throw new UpstashError(`${body.error}, command was: ${JSON.stringify(req.body)}`);
}

if (this.readYourWrites) {
const headers = res.headers;
this.upstashSyncToken = headers.get("upstash-sync-token") ?? "";
}

if (this.options?.responseEncoding === "base64") {
if (Array.isArray(body)) {
return body.map(({ result, error }) => ({
Expand All @@ -243,6 +274,7 @@ export class HttpClient implements Requester {
const result = decode(body.result) as any;
return { result, error: body.error };
}

return body as UpstashResponse<TResult>;
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
throw new Error("Pipeline is empty");
}
const path = this.multiExec ? ["multi-exec"] : ["pipeline"];

const res = (await this.client.request({
path,
body: Object.values(this.commands).map((c) => c.command),
Expand Down
116 changes: 116 additions & 0 deletions pkg/read-your-writes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { keygen, newHttpClient } from "./test-utils";

import { afterAll, describe, expect, test } from "bun:test";

import { Redis as PublicRedis } from "../platforms/nodejs";
import { SetCommand } from "./commands/set";
import { Redis } from "./redis";

const client = newHttpClient();
const { cleanup } = keygen();
afterAll(cleanup);
describe("Read Your Writes Feature", () => {
test("successfully retrieves Upstash-Sync-Token in the response header and updates local state", async () => {
const initialSync = client.upstashSyncToken;
await new SetCommand(["key", "value"]).exec(client);
const updatedSync = client.upstashSyncToken;
await new SetCommand(["key", "value"]).exec(client);

expect(updatedSync).not.toEqual(initialSync);
});

test("succesfully updates sync state with pipeline", async () => {
const initialSync = client.upstashSyncToken;

const { pipeline } = new Redis(client);
const p = pipeline();

p.set("key1", "value1");
p.set("key2", "value2");
p.set("key3", "value3");

await p.exec();

const updatedSync = client.upstashSyncToken;

expect(initialSync).not.toEqual(updatedSync);
});

test("updates after each element of promise.all", async () => {
let currentSync = client.upstashSyncToken;

const promises = Array.from({ length: 3 }, (_, i) =>
new SetCommand([`key${i}`, `value${i}`]).exec(client).then(() => {
expect(client.upstashSyncToken).not.toEqual(currentSync);
currentSync = client.upstashSyncToken;
}),
);

await Promise.all(promises);
});

test("updates after successful lua script call", async () => {
const s = `redis.call('SET', 'mykey', 'myvalue')
return 1
`;

const initialSync = client.upstashSyncToken;

const redis = new Redis(client);
const script = redis.createScript(s);

await script.exec([], []);

const updatedSync = client.upstashSyncToken;

expect(updatedSync).not.toEqual(initialSync);
});

test("should not update the sync state in case of Redis client with manuel HTTP client and opt-out ryw", async () => {
const optOutClient = newHttpClient();
const redis = new Redis(optOutClient, { readYourWrites: false });

const initialSync = optOutClient.upstashSyncToken;

await redis.set("key", "value");

const updatedSync = optOutClient.upstashSyncToken;

expect(updatedSync).toEqual(initialSync);
});

test("should not update the sync state when public Redis interface is provided with opt-out", async () => {
const redis = new PublicRedis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
readYourWrites: false,
});

// @ts-expect-error - We need the sync token for this test, which resides on the client
const initialSync = redis.client.upstashSyncToken;

await redis.set("key", "value");

// @ts-expect-error - We need the sync token for this test, which resides on the client
const updatedSync = redis.client.upstashSyncToken;

expect(updatedSync).toEqual(initialSync);
});

test("should update the sync state when public Redis interface is provided with default behaviour", async () => {
const redis = new PublicRedis({
url: process.env.UPSTASH_REDIS_REST_URL,
token: process.env.UPSTASH_REDIS_REST_TOKEN,
});

// @ts-expect-error - We need the sync token for this test, which resides on the client
const initialSync = redis.client.upstashSyncToken;

await redis.set("key", "value");

// @ts-expect-error - We need the sync token for this test, which resides on the client
const updatedSync = redis.client.upstashSyncToken;
expect(updatedSync).not.toEqual(initialSync);
})

});
37 changes: 20 additions & 17 deletions pkg/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ export class Redis {
this.opts = opts;
this.enableTelemetry = opts?.enableTelemetry ?? true;
this.enableAutoPipelining = opts?.enableAutoPipelining ?? false;
if (opts?.readYourWrites === false) {
this.client.readYourWrites = false;
}
}

get json() {
Expand Down Expand Up @@ -450,9 +453,9 @@ export class Redis {
sourceKey: string,
...sourceKeys: string[]
) =>
new BitOpCommand([op as any, destinationKey, sourceKey, ...sourceKeys], this.opts).exec(
this.client,
);
new BitOpCommand([op as any, destinationKey, sourceKey, ...sourceKeys], this.opts).exec(
this.client,
);

/**
* @see https://redis.io/commands/bitpos
Expand Down Expand Up @@ -1201,10 +1204,10 @@ export class Redis {
...args:
| [key: string, scoreMember: ScoreMember<TData>, ...scoreMemberPairs: ScoreMember<TData>[]]
| [
key: string,
opts: ZAddCommandOptions,
...scoreMemberPairs: [ScoreMember<TData>, ...ScoreMember<TData>[]],
]
key: string,
opts: ZAddCommandOptions,
...scoreMemberPairs: [ScoreMember<TData>, ...ScoreMember<TData>[]],
]
) => {
if ("score" in args[1]) {
return new ZAddCommand<TData>(
Expand Down Expand Up @@ -1279,17 +1282,17 @@ export class Redis {
...args:
| [key: string, min: number, max: number, opts?: ZRangeCommandOptions]
| [
key: string,
min: `(${string}` | `[${string}` | "-" | "+",
max: `(${string}` | `[${string}` | "-" | "+",
opts: { byLex: true } & ZRangeCommandOptions,
]
key: string,
min: `(${string}` | `[${string}` | "-" | "+",
max: `(${string}` | `[${string}` | "-" | "+",
opts: { byLex: true } & ZRangeCommandOptions,
]
| [
key: string,
min: number | `(${number}` | "-inf" | "+inf",
max: number | `(${number}` | "-inf" | "+inf",
opts: { byScore: true } & ZRangeCommandOptions,
]
key: string,
min: number | `(${number}` | "-inf" | "+inf",
max: number | `(${number}` | "-inf" | "+inf",
opts: { byScore: true } & ZRangeCommandOptions,
]
) => new ZRangeCommand<TData>(args as any, this.opts).exec(this.client);

/**
Expand Down
1 change: 1 addition & 0 deletions pkg/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ export type RedisOptions = {
latencyLogging?: boolean;
enableTelemetry?: boolean;
enableAutoPipelining?: boolean;
readYourWrites?: boolean;
};
10 changes: 8 additions & 2 deletions platforms/cloudflare.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ export type RedisConfigCloudflare = {
*/
signal?: AbortSignal;
keepAlive?: boolean;

/**
* When this flag is not disabled, new commands of this client expects the previous write commands to be finalized before executing.
*/
readYourWrites?: boolean;
} & core.RedisOptions &
RequesterConfig &
Env;
Expand All @@ -50,11 +55,11 @@ export class Redis extends core.Redis {
* ```
*/
constructor(config: RedisConfigCloudflare, env?: Env) {
if(!config.url) {
if (!config.url) {
throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`)
}

if(!config.token) {
if (!config.token) {
throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`)
}

Expand All @@ -72,6 +77,7 @@ export class Redis extends core.Redis {
responseEncoding: config.responseEncoding,
signal: config.signal,
keepAlive: config.keepAlive,
readYourWrites: config.readYourWrites,
});

super(client, {
Expand Down
Loading
Loading