Skip to content

Commit

Permalink
feat: add lmpop command (#1061)
Browse files Browse the repository at this point in the history
* feat: add lmpop command

* fix: add missing check
  • Loading branch information
ogzhanolguncu authored May 14, 2024
1 parent c9dc7e8 commit 36bcd25
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 34 deletions.
39 changes: 15 additions & 24 deletions pkg/auto-pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { Redis } from "../platforms/nodejs"
import { Redis } from "../platforms/nodejs";
import { keygen, newHttpClient } from "./test-utils";

import { afterEach, describe, expect, test } from "bun:test";
import { ScriptLoadCommand } from "./commands/script_load";


const client = newHttpClient();

const { newKey, cleanup } = keygen();
Expand All @@ -17,10 +16,10 @@ describe("Auto pipeline", () => {
const scriptHash = await new ScriptLoadCommand(["return 1"]).exec(client);

const redis = Redis.autoPipeline({
latencyLogging: false
})
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0)
expect(redis.pipelineCounter).toBe(0);

// all the following commands are in a single pipeline call
const result = await Promise.all([
Expand Down Expand Up @@ -143,19 +142,18 @@ describe("Auto pipeline", () => {
redis.zscore(newKey(), "member"),
redis.zunionstore(newKey(), 1, [newKey()]),
redis.zunion(1, [newKey()]),
redis.json.set(newKey(), "$", { hello: "world" })
])
redis.json.set(newKey(), "$", { hello: "world" }),
]);
expect(result).toBeTruthy();
expect(result.length).toBe(120); // returns
expect(result.length).toBe(120); // returns
// @ts-expect-error pipelineCounter is not in type but accessible120 results
expect(redis.pipelineCounter).toBe(1);
});

test("should group async requests with sync requests", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
})
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);

Expand All @@ -168,21 +166,17 @@ describe("Auto pipeline", () => {

// two get calls are added to the pipeline and pipeline
// is executed since we called await
const [fooValue, bazValue] = await Promise.all([
redis.get("foo"),
redis.get("baz")
]);
const [fooValue, bazValue] = await Promise.all([redis.get("foo"), redis.get("baz")]);

expect(fooValue).toBe("bar");
expect(bazValue).toBe(3);
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);
})
});

test("should execute a pipeline for each consecutive awaited command", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);
Expand All @@ -202,13 +196,11 @@ describe("Auto pipeline", () => {
expect(redis.pipelineCounter).toBe(3);

expect([res1, res2, res3]).toEqual([1, 2, "OK"]);

});

test("should execute a single pipeline for several commands inside Promise.all", async () => {

const redis = Redis.autoPipeline({
latencyLogging: false
latencyLogging: false,
});
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(0);
Expand All @@ -218,11 +210,10 @@ describe("Auto pipeline", () => {
redis.incr("baz"),
redis.incr("baz"),
redis.set("foo", "bar"),
redis.get("foo")
redis.get("foo"),
]);
// @ts-expect-error pipelineCounter is not in type but accessible
expect(redis.pipelineCounter).toBe(1);
expect(resArray).toEqual(["OK", 1, 2, "OK", "bar"]);

})
});
});
14 changes: 6 additions & 8 deletions pkg/auto-pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
import { Command } from "./commands/command";
import { CommandArgs } from "./types";
import { Pipeline } from "./pipeline";
import { Redis } from "./redis";
import { CommandArgs } from "./types";

// will omit redis only commands since we call Pipeline in the background in auto pipeline
type redisOnly = Exclude<keyof Redis, keyof Pipeline>
type redisOnly = Exclude<keyof Redis, keyof Pipeline>;

export function createAutoPipelineProxy(_redis: Redis) {

const redis = _redis as Redis & {
autoPipelineExecutor: AutoPipelineExecutor;
}
};

if (!redis.autoPipelineExecutor) {
redis.autoPipelineExecutor = new AutoPipelineExecutor(redis);
}

return new Proxy(redis, {
get: (target, prop: "pipelineCounter" | keyof Pipeline ) => {

get: (target, prop: "pipelineCounter" | keyof Pipeline) => {
// return pipelineCounter of autoPipelineExecutor
if (prop == "pipelineCounter") {
if (prop === "pipelineCounter") {
return target.autoPipelineExecutor.pipelineCounter;
}

Expand All @@ -43,7 +41,7 @@ export class AutoPipelineExecutor {
private indexInCurrentPipeline = 0;
private redis: Redis;
pipeline: Pipeline; // only to make sure that proxy can work
pipelineCounter: number = 0; // to keep track of how many times a pipeline was executed
pipelineCounter = 0; // to keep track of how many times a pipeline was executed

constructor(redis: Redis) {
this.redis = redis;
Expand Down
86 changes: 86 additions & 0 deletions pkg/commands/lmpop.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { keygen, newHttpClient, randomID } from "../test-utils";

import { afterAll, describe, expect, test } from "bun:test";

import { LmPopCommand } from "./lmpop";
import { LPushCommand } from "./lpush";
const client = newHttpClient();

const { newKey, cleanup } = keygen();
afterAll(cleanup);

describe("LMPOP", () => {
test("should pop elements from the left-most end of the list", async () => {
const key = newKey();
const lpushElement1 = { name: randomID(), surname: randomID() };
const lpushElement2 = { name: randomID(), surname: randomID() };

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

const result = await new LmPopCommand<{ name: string; surname: string }>([
1,
[key],
"LEFT",
2,
]).exec(client);

expect(result?.[1][0].name).toEqual(lpushElement2.name);
});

test("should pop elements from the right-most end of the list", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

const result = await new LmPopCommand<string>([1, [key], "RIGHT", 2]).exec(client);

expect(result?.[1][0]).toEqual(lpushElement1);
});

test("should pop elements from the first list then second list", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

const key2 = newKey();
const lpushElement2_1 = randomID();
const lpushElement2_2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);
await new LPushCommand([key2, lpushElement2_1, lpushElement2_2]).exec(client);

const result = await new LmPopCommand<string>([2, [key, key2], "RIGHT", 4]).exec(client);
expect(result).toEqual([key, [lpushElement1, lpushElement2]]);

const result1 = await new LmPopCommand<string>([2, [key, key2], "RIGHT", 4]).exec(client);
expect(result1).toEqual([key2, [lpushElement2_1, lpushElement2_2]]);
});

test("should return null after first attempt", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

await new LmPopCommand([1, [key], "LEFT", 2]).exec(client);

const result1 = await new LmPopCommand([1, [key], "LEFT", 2]).exec(client);

expect(result1).toBeNull();
});

test("should return without count", async () => {
const key = newKey();
const lpushElement1 = randomID();
const lpushElement2 = randomID();

await new LPushCommand([key, lpushElement1, lpushElement2]).exec(client);

const result1 = await new LmPopCommand([1, [key], "LEFT"]).exec(client);

expect(result1).toEqual([key, [lpushElement2]]);
});
});
18 changes: 18 additions & 0 deletions pkg/commands/lmpop.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Command, CommandOptions } from "./command";

/**
* @see https://redis.io/commands/lmpop
*/
export class LmPopCommand<TValues> extends Command<
[string, TValues[]] | null,
[string, TValues[]] | null
> {
constructor(
cmd: [numkeys: number, keys: string[], "LEFT" | "RIGHT", count?: number],
opts?: CommandOptions<[string, TValues[]] | null, [string, TValues[]] | null>,
) {
const [numkeys, keys, direction, count] = cmd;

super(["LMPOP", numkeys, ...keys, direction, ...(count ? ["COUNT", count] : [])], opts);
}
}
1 change: 1 addition & 0 deletions pkg/commands/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export * from "./linsert";
export * from "./llen";
export * from "./lmove";
export * from "./lpop";
export * from "./lmpop";
export * from "./lpos";
export * from "./lpush";
export * from "./lpushx";
Expand Down
7 changes: 7 additions & 0 deletions pkg/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import {
LRemCommand,
LSetCommand,
LTrimCommand,
LmPopCommand,
MGetCommand,
MSetCommand,
MSetNXCommand,
Expand Down Expand Up @@ -651,6 +652,12 @@ export class Pipeline<TCommands extends Command<any, any>[] = []> {
lpop = <TData>(...args: CommandArgs<typeof LPopCommand>) =>
this.chain(new LPopCommand<TData>(args, this.commandOptions));

/**
* @see https://redis.io/commands/lmpop
*/
lmpop = <TData>(...args: CommandArgs<typeof LmPopCommand>) =>
this.chain(new LmPopCommand<TData>(args, this.commandOptions));

/**
* @see https://redis.io/commands/lpos
*/
Expand Down
11 changes: 9 additions & 2 deletions pkg/redis.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { createAutoPipelineProxy } from "../pkg/auto-pipeline";
import {
AppendCommand,
BitCountCommand,
Expand Down Expand Up @@ -81,6 +82,7 @@ import {
LRemCommand,
LSetCommand,
LTrimCommand,
LmPopCommand,
MGetCommand,
MSetCommand,
MSetNXCommand,
Expand Down Expand Up @@ -175,7 +177,6 @@ import { Requester, UpstashRequest, UpstashResponse } from "./http";
import { Pipeline } from "./pipeline";
import { Script } from "./script";
import type { CommandArgs, RedisOptions, Telemetry } from "./types";
import { AutoPipelineExecutor, createAutoPipelineProxy } from "../pkg/auto-pipeline"

// See https://github.com/upstash/upstash-redis/issues/342
// why we need this export
Expand Down Expand Up @@ -380,7 +381,7 @@ export class Redis {
});

autoPipeline = () => {
return createAutoPipelineProxy(this)
return createAutoPipelineProxy(this);
};

/**
Expand Down Expand Up @@ -743,6 +744,12 @@ export class Redis {
lpop = <TData>(...args: CommandArgs<typeof LPopCommand>) =>
new LPopCommand<TData>(args, this.opts).exec(this.client);

/**
* @see https://redis.io/commands/lmpop
*/
lmpop = <TData>(...args: CommandArgs<typeof LmPopCommand>) =>
new LmPopCommand<TData>(args, this.opts).exec(this.client);

/**
* @see https://redis.io/commands/lpos
*/
Expand Down

0 comments on commit 36bcd25

Please sign in to comment.