diff --git a/pkg/commands/command.ts b/pkg/commands/command.ts index ea524108..b79b4167 100644 --- a/pkg/commands/command.ts +++ b/pkg/commands/command.ts @@ -83,7 +83,9 @@ export class Command { public async exec(client: Requester): Promise { const { result, error } = await client.request({ body: this.command, + upstashSyncToken: client.upstashSyncToken, }); + if (error) { throw new UpstashError(error); } diff --git a/pkg/http.ts b/pkg/http.ts index 6bb54433..7dd5a73a 100644 --- a/pkg/http.ts +++ b/pkg/http.ts @@ -15,10 +15,21 @@ export type UpstashRequest = { * Request body will be serialized to json */ body?: unknown; + + upstashSyncToken?: string; }; export type UpstashResponse = { result?: TResult; error?: string }; -export type Requester = { +export interface Requester { + /** + * When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client. + */ + readYourWrites?: boolean; + + /** + * This token is used to ensure that the client is in sync with the server. On each request, we send this token in the header, and the server will return a new token. + */ + upstashSyncToken?: string; request: (req: UpstashRequest) => Promise>; }; @@ -95,11 +106,17 @@ export type HttpClientConfig = { agent?: any; signal?: AbortSignal; keepAlive?: boolean; + + /** + * When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client. + */ + readYourWrites?: boolean; } & RequesterConfig; export class HttpClient implements Requester { public baseUrl: string; public headers: Record; + public readonly options: { backend?: string; agent: any; @@ -108,6 +125,8 @@ export class HttpClient implements Requester { cache?: CacheSetting; keepAlive: boolean; }; + public readYourWrites: boolean; + public upstashSyncToken = ""; public readonly retry: { attempts: number; @@ -123,6 +142,8 @@ export class HttpClient implements Requester { signal: config.signal, keepAlive: config.keepAlive ?? true, }; + this.upstashSyncToken = ""; + this.readYourWrites = config.readYourWrites ?? true; this.baseUrl = config.baseUrl.replace(/\/$/, ""); @@ -185,6 +206,14 @@ export class HttpClient implements Requester { backend: this.options.backend, }; + /** + * We've recieved a new `upstash-sync-token` in the previous response. We use it in the next request to observe the effects of previous requests. + */ + if (this.readYourWrites) { + const newHeader = this.upstashSyncToken; + this.headers["upstash-sync-token"] = newHeader; + } + let res: Response | null = null; let error: Error | null = null; for (let i = 0; i <= this.retry.attempts; i++) { @@ -216,6 +245,20 @@ export class HttpClient implements Requester { throw new UpstashError(`${body.error}, command was: ${JSON.stringify(req.body)}`); } + if (this.readYourWrites) { + const headers = res.headers; + this.upstashSyncToken = headers.get("upstash-sync-token") ?? ""; + } + + + /** + * We save the new `upstash-sync-token` in the response header to use it in the next request. + */ + if (this.readYourWrites) { + const headers = res.headers; + this.upstashSyncToken = headers.get("upstash-sync-token") ?? ""; + } + if (this.options.responseEncoding === "base64") { if (Array.isArray(body)) { return body.map(({ result, error }) => ({ @@ -226,6 +269,7 @@ export class HttpClient implements Requester { const result = decode(body.result) as any; return { result, error: body.error }; } + return body as UpstashResponse; } } diff --git a/pkg/pipeline.ts b/pkg/pipeline.ts index 3bd8a4fc..64a57889 100644 --- a/pkg/pipeline.ts +++ b/pkg/pipeline.ts @@ -283,6 +283,7 @@ export class Pipeline[] = []> { throw new Error("Pipeline is empty"); } const path = this.multiExec ? ["multi-exec"] : ["pipeline"]; + const res = (await this.client.request({ path, body: Object.values(this.commands).map((c) => c.command), diff --git a/pkg/read-your-writes.test.ts b/pkg/read-your-writes.test.ts new file mode 100644 index 00000000..b8feba9b --- /dev/null +++ b/pkg/read-your-writes.test.ts @@ -0,0 +1,115 @@ +import { keygen, newHttpClient } from "./test-utils"; + +import { afterAll, describe, expect, test } from "bun:test"; + +import { Redis as PublicRedis } from "../platforms/nodejs"; +import { SetCommand } from "./commands/set"; +import { Redis } from "./redis"; + +const client = newHttpClient(); +const { cleanup } = keygen(); +afterAll(cleanup); +describe("Read Your Writes Feature", () => { + test("successfully retrieves Upstash-Sync-Token in the response header and updates local state", async () => { + const initialSync = client.upstashSyncToken; + await new SetCommand(["key", "value"]).exec(client); + const updatedSync = client.upstashSyncToken; + await new SetCommand(["key", "value"]).exec(client); + + expect(updatedSync).not.toEqual(initialSync); + }); + + test("succesfully updates sync state with pipeline", async () => { + const initialSync = client.upstashSyncToken; + + const { pipeline } = new Redis(client); + const p = pipeline(); + + p.set("key1", "value1"); + p.set("key2", "value2"); + p.set("key3", "value3"); + + await p.exec(); + + const updatedSync = client.upstashSyncToken; + + expect(initialSync).not.toEqual(updatedSync); + }); + + test("updates after each element of promise.all", async () => { + let currentSync = client.upstashSyncToken; + + const promises = Array.from({ length: 3 }, (_, i) => + new SetCommand([`key${i}`, `value${i}`]).exec(client).then(() => { + expect(client.upstashSyncToken).not.toEqual(currentSync); + currentSync = client.upstashSyncToken; + }), + ); + + await Promise.all(promises); + }); + + test("updates after successful lua script call", async () => { + const s = `redis.call('SET', 'mykey', 'myvalue') + return 1 + `; + + const initialSync = client.upstashSyncToken; + + const redis = new Redis(client); + const script = redis.createScript(s); + + await script.exec([], []); + + const updatedSync = client.upstashSyncToken; + + expect(updatedSync).not.toEqual(initialSync); + }); + + test("should not update the sync state in case of Redis client with manuel HTTP client and opt-out ryw", async () => { + const optOutClient = newHttpClient(); + const redis = new Redis(optOutClient, { readYourWrites: false }); + + const initialSync = optOutClient.upstashSyncToken; + + await redis.set("key", "value"); + + const updatedSync = optOutClient.upstashSyncToken; + + expect(updatedSync).toEqual(initialSync); + }); + + test("should not update the sync state when public Redis interface is provided with opt-out", async () => { + const redis = new PublicRedis({ + url: process.env.UPSTASH_REDIS_REST_URL, + token: process.env.UPSTASH_REDIS_REST_TOKEN, + readYourWrites: false, + }); + + // @ts-expect-error - We need the sync token for this test, which resides on the client + const initialSync = redis.client.upstashSyncToken; + + await redis.set("key", "value"); + + // @ts-expect-error - We need the sync token for this test, which resides on the client + const updatedSync = redis.client.upstashSyncToken; + + expect(updatedSync).toEqual(initialSync); + }); + + test("should update the sync state when public Redis interface is provided with default behaviour", async () => { + const redis = new PublicRedis({ + url: process.env.UPSTASH_REDIS_REST_URL, + token: process.env.UPSTASH_REDIS_REST_TOKEN, + }); + + // @ts-expect-error - We need the sync token for this test, which resides on the client + const initialSync = redis.client.upstashSyncToken; + + await redis.set("key", "value"); + + // @ts-expect-error - We need the sync token for this test, which resides on the client + const updatedSync = redis.client.upstashSyncToken; + expect(updatedSync).not.toEqual(initialSync); + }); +}); diff --git a/pkg/redis.ts b/pkg/redis.ts index 103c31ee..6c47a371 100644 --- a/pkg/redis.ts +++ b/pkg/redis.ts @@ -210,6 +210,10 @@ export class Redis { this.client = client; this.opts = opts; this.enableTelemetry = opts?.enableTelemetry ?? true; + + if (opts?.readYourWrites === false) { + this.client.readYourWrites = false; + } this.enableAutoPipelining = opts?.enableAutoPipelining ?? true; } diff --git a/pkg/types.ts b/pkg/types.ts index fddfc794..970ca887 100644 --- a/pkg/types.ts +++ b/pkg/types.ts @@ -31,4 +31,5 @@ export type RedisOptions = { latencyLogging?: boolean; enableTelemetry?: boolean; enableAutoPipelining?: boolean; + readYourWrites?: boolean; }; diff --git a/platforms/cloudflare.ts b/platforms/cloudflare.ts index f661feca..3129baa9 100644 --- a/platforms/cloudflare.ts +++ b/platforms/cloudflare.ts @@ -30,6 +30,11 @@ export type RedisConfigCloudflare = { */ signal?: AbortSignal; keepAlive?: boolean; + + /** + * When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client. + */ + readYourWrites?: boolean; } & core.RedisOptions & RequesterConfig & Env; @@ -51,15 +56,11 @@ export class Redis extends core.Redis { */ constructor(config: RedisConfigCloudflare, env?: Env) { if (!config.url) { - throw new Error( - `[Upstash Redis] The 'url' property is missing or undefined in your Redis config.` - ); + throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`) } if (!config.token) { - throw new Error( - `[Upstash Redis] The 'token' property is missing or undefined in your Redis config.` - ); + throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`) } if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) { @@ -76,6 +77,7 @@ export class Redis extends core.Redis { responseEncoding: config.responseEncoding, signal: config.signal, keepAlive: config.keepAlive, + readYourWrites: config.readYourWrites, }); super(client, { diff --git a/platforms/fastly.ts b/platforms/fastly.ts index af3fe8cf..a8a72c3b 100644 --- a/platforms/fastly.ts +++ b/platforms/fastly.ts @@ -27,6 +27,11 @@ export type RedisConfigFastly = { */ backend: string; keepAlive?: boolean; + + /** + * When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client. + */ + readYourWrites?: boolean; } & core.RedisOptions & RequesterConfig; @@ -48,15 +53,11 @@ export class Redis extends core.Redis { */ constructor(config: RedisConfigFastly) { if (!config.url) { - throw new Error( - `[Upstash Redis] The 'url' property is missing or undefined in your Redis config.` - ); + throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`) } if (!config.token) { - throw new Error( - `[Upstash Redis] The 'token' property is missing or undefined in your Redis config.` - ); + throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`) } if (config.url.startsWith(" ") || config.url.endsWith(" ") || /\r|\n/.test(config.url)) { @@ -73,6 +74,7 @@ export class Redis extends core.Redis { options: { backend: config.backend }, responseEncoding: config.responseEncoding, keepAlive: config.keepAlive, + readYourWrites: config.readYourWrites, }); super(client, { diff --git a/platforms/nodejs.ts b/platforms/nodejs.ts index 358e1613..b00c0f79 100644 --- a/platforms/nodejs.ts +++ b/platforms/nodejs.ts @@ -53,6 +53,11 @@ export type RedisConfigNodejs = { latencyLogging?: boolean; agent?: unknown; keepAlive?: boolean; + + /** + * When this flag is enabled, any subsequent commands issued by this client are guaranteed to observe the effects of all earlier writes submitted by the same client. + */ + readYourWrites?: boolean; } & core.RedisOptions & RequesterConfig; @@ -97,15 +102,11 @@ export class Redis extends core.Redis { } if (!configOrRequester.url) { - throw new Error( - `[Upstash Redis] The 'url' property is missing or undefined in your Redis config.` - ); + throw new Error(`[Upstash Redis] The 'url' property is missing or undefined in your Redis config.`) } if (!configOrRequester.token) { - throw new Error( - `[Upstash Redis] The 'token' property is missing or undefined in your Redis config.` - ); + throw new Error(`[Upstash Redis] The 'token' property is missing or undefined in your Redis config.`) } if ( @@ -133,6 +134,7 @@ export class Redis extends core.Redis { cache: configOrRequester.cache ?? "no-store", signal: configOrRequester.signal, keepAlive: configOrRequester.keepAlive, + readYourWrites: configOrRequester.readYourWrites, }); super(client, {