Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add lmpop command #1061

Merged
merged 2 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 15 additions & 24 deletions pkg/auto-pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Redis } from "../platforms/nodejs"
import { Redis } from "../platforms/nodejs";
import { keygen, newHttpClient } from "./test-utils";

import { afterEach, describe, expect, test } from "bun:test";
import { ScriptLoadCommand } from "./commands/script_load";


const client = newHttpClient();

const { newKey, cleanup } = keygen();
Expand All @@ -17,10 +16,10 @@ describe("Auto pipeline", () => {
const scriptHash = await new ScriptLoadCommand(["return 1"]).exec(client);

const redis = Redis.autoPipeline({
latencyLogging: false
})
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0)
expect(redis.pipelineCounter).toBe(0);

// all the following commands are in a single pipeline call
const result = await Promise.all([
Expand Down Expand Up @@ -143,19 +142,18 @@ describe("Auto pipeline", () => {
redis.zscore(newKey(), "member"),
redis.zunionstore(newKey(), 1, [newKey()]),
redis.zunion(1, [newKey()]),
redis.json.set(newKey(), "$", { hello: "world" })
])
redis.json.set(newKey(), "$", { hello: "world" }),
]);
expect(result).toBeTruthy();
expect(result.length).toBe(120); // returns
expect(result.length).toBe(120); // returns
// @ts-expect-error pipelineCounter is not in type but accessible120 results
expect(redis.pipelineCounter).toBe(1);
});

test("should group async requests with sync requests", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
})
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);

Expand All @@ -168,21 +166,17 @@ describe("Auto pipeline", () => {

// two get calls are added to the pipeline and pipeline
// is executed since we called await
const [fooValue, bazValue] = await Promise.all([
redis.get("foo"),
redis.get("baz")
]);
const [fooValue, bazValue] = await Promise.all([redis.get("foo"), redis.get("baz")]);

expect(fooValue).toBe("bar");
expect(bazValue).toBe(3);
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);
})
});

test("should execute a pipeline for each consecutive awaited command", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);
Expand All @@ -202,13 +196,11 @@ describe("Auto pipeline", () => {
expect(redis.pipelineCounter).toBe(3);

expect([res1, res2, res3]).toEqual([1, 2, "OK"]);

});

test("should execute a single pipeline for several commands inside Promise.all", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);
Expand All @@ -218,11 +210,10 @@ describe("Auto pipeline", () => {
redis.incr("baz"),
redis.incr("baz"),
redis.set("foo", "bar"),
redis.get("foo")
redis.get("foo"),
]);
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);
expect(resArray).toEqual(["OK", 1, 2, "OK", "bar"]);

})
});
});
14 changes: 6 additions & 8 deletions pkg/auto-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import { Command } from "./commands/command";
import { CommandArgs } from "./types";
import { Pipeline } from "./pipeline";
import { Redis } from "./redis";
import { CommandArgs } from "./types";

// will omit redis only commands since we call Pipeline in the background in auto pipeline
type redisOnly = Exclude<keyof Redis, keyof Pipeline>
type redisOnly = Exclude<keyof Redis, keyof Pipeline>;

export function createAutoPipelineProxy(_redis: Redis) {

const redis = _redis as Redis & {
autoPipelineExecutor: AutoPipelineExecutor;
}
};

if (!redis.autoPipelineExecutor) {
redis.autoPipelineExecutor = new AutoPipelineExecutor(redis);
}

return new Proxy(redis, {
get: (target, prop: "pipelineCounter" | keyof Pipeline ) => {

get: (target, prop: "pipelineCounter" | keyof Pipeline) => {
// return pipelineCounter of autoPipelineExecutor
if (prop == "pipelineCounter") {
if (prop === "pipelineCounter") {
return target.autoPipelineExecutor.pipelineCounter;
}

Expand All @@ -43,7 +41,7 @@ export class AutoPipelineExecutor {
private indexInCurrentPipeline = 0;
private redis: Redis;
pipeline: Pipeline; // only to make sure that proxy can work
pipelineCounter: number = 0; // to keep track of how many times a pipeline was executed
pipelineCounter = 0; // to keep track of how many times a pipeline was executed

constructor(redis: Redis) {
this.redis = redis;
Expand Down
86 changes: 86 additions & 0 deletions pkg/commands/lmpop.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { keygen, newHttpClient, randomID } from "../test-utils";

import { afterAll, describe, expect, test } from "bun:test";

import { LmPopCommand } from "./lmpop";
import { LPushCommand } from "./lpush";
const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterAll(cleanup);

describe("LMPOP", () => {
test("should pop elements from the left-most end of the list", async () => {
const key = newKey();
const lpushElement1 = { name: randomID(), surname: randomID() };
const lpushElement2 = { name: randomID(), surname: randomID() };

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

const result = await new LmPopCommand<{ name: string; surname: string }>([
1,
[key],
"LEFT",
2,
]).exec(client);

expect(result?.[1][0].name).toEqual(lpushElement2.name);
});

test("should pop elements from the right-most end of the list", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

const result = await new LmPopCommand<string>([1, [key], "RIGHT", 2]).exec(client);

expect(result?.[1][0]).toEqual(lpushElement1);
});

test("should pop elements from the first list then second list", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

const key2 = newKey();
const lpushElement2_1 = randomID();
const lpushElement2_2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);
await new LPushCommand([key2, lpushElement2_1, lpushElement2_2]).exec(client);

const result = await new LmPopCommand<string>([2, [key, key2], "RIGHT", 4]).exec(client);
expect(result).toEqual([key, [lpushElement1, lpushElement2]]);

const result1 = await new LmPopCommand<string>([2, [key, key2], "RIGHT", 4]).exec(client);
expect(result1).toEqual([key2, [lpushElement2_1, lpushElement2_2]]);
});

test("should return null after first attempt", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

await new LmPopCommand([1, [key], "LEFT", 2]).exec(client);

const result1 = await new LmPopCommand([1, [key], "LEFT", 2]).exec(client);

expect(result1).toBeNull();
});

test("should return without count", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

const result1 = await new LmPopCommand([1, [key], "LEFT"]).exec(client);

expect(result1).toEqual([key, [lpushElement2]]);
});
});
18 changes: 18 additions & 0 deletions pkg/commands/lmpop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Command, CommandOptions } from "./command";

/**
* @see https://redis.io/commands/lmpop
*/
export class LmPopCommand<TValues> extends Command<
[string, TValues[]] | null,
[string, TValues[]] | null
> {
constructor(
cmd: [numkeys: number, keys: string[], "LEFT" | "RIGHT", count?: number],
opts?: CommandOptions<[string, TValues[]] | null, [string, TValues[]] | null>,
) {
const [numkeys, keys, direction, count] = cmd;

super(["LMPOP", numkeys, ...keys, direction, ...(count ? ["COUNT", count] : [])], opts);
}
}
1 change: 1 addition & 0 deletions pkg/commands/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export * from "./linsert";
export * from "./llen";
export * from "./lmove";
export * from "./lpop";
export * from "./lmpop";
export * from "./lpos";
export * from "./lpush";
export * from "./lpushx";
Expand Down
7 changes: 7 additions & 0 deletions pkg/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import {
LRemCommand,
LSetCommand,
LTrimCommand,
LmPopCommand,
MGetCommand,
MSetCommand,
MSetNXCommand,
Expand Down Expand Up @@ -651,6 +652,12 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
lpop = <TData>(...args: CommandArgs<typeof LPopCommand>) =>
this.chain(new LPopCommand<TData>(args, this.commandOptions));

/**
* @see https://redis.io/commands/lmpop
*/
lmpop = <TData>(...args: CommandArgs<typeof LmPopCommand>) =>
this.chain(new LmPopCommand<TData>(args, this.commandOptions));

/**
* @see https://redis.io/commands/lpos
*/
Expand Down
11 changes: 9 additions & 2 deletions pkg/redis.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createAutoPipelineProxy } from "../pkg/auto-pipeline";
import {
AppendCommand,
BitCountCommand,
Expand Down Expand Up @@ -81,6 +82,7 @@ import {
LRemCommand,
LSetCommand,
LTrimCommand,
LmPopCommand,
MGetCommand,
MSetCommand,
MSetNXCommand,
Expand Down Expand Up @@ -175,7 +177,6 @@ import { Requester, UpstashRequest, UpstashResponse } from "./http";
import { Pipeline } from "./pipeline";
import { Script } from "./script";
import type { CommandArgs, RedisOptions, Telemetry } from "./types";
import { AutoPipelineExecutor, createAutoPipelineProxy } from "../pkg/auto-pipeline"

// See https://github.com/upstash/upstash-redis/issues/342
// why we need this export
Expand Down Expand Up @@ -380,7 +381,7 @@ export class Redis {
});

autoPipeline = () => {
return createAutoPipelineProxy(this)
return createAutoPipelineProxy(this);
};

/**
Expand Down Expand Up @@ -743,6 +744,12 @@ export class Redis {
lpop = <TData>(...args: CommandArgs<typeof LPopCommand>) =>
new LPopCommand<TData>(args, this.opts).exec(this.client);

/**
* @see https://redis.io/commands/lmpop
*/
lmpop = <TData>(...args: CommandArgs<typeof LmPopCommand>) =>
new LmPopCommand<TData>(args, this.opts).exec(this.client);

/**
* @see https://redis.io/commands/lpos
*/
Expand Down
Loading