Skip to content

Commit

Permalink
feat: Clear task queue (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
unimonkiez authored Jan 19, 2023
1 parent 2d307a9 commit 17f13ec
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 7 deletions.
5 changes: 5 additions & 0 deletions clients/redis/js/src/MemorixBaseApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ export class MemorixBaseApi {
},
};
},
clear: async (key) => {
const hashedKey = hashPubsubKey(key);
await this.redis.del(hashedKey);
},
};
}

Expand All @@ -266,6 +270,7 @@ export class MemorixBaseApi {
return {
queue: (...args) => item.queue(undefined, ...args),
dequeue: (...args) => item.dequeue(undefined, ...args),
clear: (...args) => item.clear(undefined, ...args),
};
}
}
15 changes: 10 additions & 5 deletions clients/redis/js/src/example-schema.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,22 @@ describe("example schema has", () => {
});
describe("task", () => {
beforeEach(async () => {
const { stop } = await memorixApi.task.runAlgo.dequeue(() => {
return Animal.dog;
});
await new Promise((res) => setTimeout(res, 1000));
await stop();
await memorixApi.task.runAlgo.clear();
});
it("queue returns the queue size", async () => {
await memorixApi.task.runAlgo.queue("uv1");
const { queueSize } = await memorixApi.task.runAlgo.queue("uv2");
expect(queueSize).toBe(2);
});
it("queue clears correctly", async () => {
await memorixApi.task.runAlgo.queue("uv1");
const { queueSize } = await memorixApi.task.runAlgo.queue("uv2");
expect(queueSize).toBe(2);
await memorixApi.task.runAlgo.clear();
const { queueSize: queueSizeAfterClear } =
await memorixApi.task.runAlgo.queue("uv2");
expect(queueSizeAfterClear).toBe(1);
});
it("dequeue receives a message", (done) => {
memorixApi.task.runAlgo
.queue("uv3")
Expand Down
2 changes: 2 additions & 0 deletions clients/redis/js/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export type TaskItem<Key, Payload, Returns> = {
options?: TaskDequequeOptions
]
): TaskDequeue;
clear(...args: [key: Key]): void;
};
export type TaskItemNoKey<Payload, Returns> = {
queue(...args: [payload: Payload]): TaskQueue<Returns>;
Expand All @@ -67,4 +68,5 @@ export type TaskItemNoKey<Payload, Returns> = {
options?: TaskDequequeOptions
]
): TaskDequeue;
clear(...args: []): void;
};
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio
import functools
from memorix_client_redis.features.api.hash_key import hash_key
from uuid import uuid4
from memorix_client_redis.features.api.json import from_json, to_json, bytes_to_str
Expand Down Expand Up @@ -180,6 +182,22 @@ async def async_dequeue(
print("dequeue async")
yield typing.cast(TaskItemDequeueWithReturns[PT], None)

def clear(self, key: KT) -> None:
self._api._redis.delete(
hash_key(self._id, key=key),
)

async def async_clear(self, key: KT) -> None:
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None,
functools.partial(
TaskItem.clear,
self=self,
key=key,
),
)


class TaskItemNoKey(TaskItem[None, PT, RT]):
# Different signature on purpose
Expand All @@ -198,6 +216,14 @@ def dequeue(self) -> typing.Generator[TaskItemDequeueWithReturns[PT], None, None
async def async_dequeue(self) -> typing.AsyncGenerator[TaskItemDequeueWithReturns[PT], None]: # type: ignore
return TaskItem.async_dequeue(self, key=None)

# Different signature on purpose
def clear(self) -> None: # type: ignore
TaskItem.clear(self, key=None)

# Different signature on purpose
async def async_clear(self) -> None: # type: ignore
await TaskItem.async_clear(self, key=None)


class TaskItemNoReturns(TaskItem[KT, PT, None]):
def __init__(
Expand Down Expand Up @@ -232,6 +258,14 @@ def dequeue(self, key: KT) -> typing.Generator[TaskItemDequeue[PT], None, None]:
async def async_dequeue(self, key: KT) -> typing.AsyncGenerator[TaskItemDequeue[PT], None]: # type: ignore
return TaskItem.async_dequeue(self, key=key)

# Different signature on purpose
def clear(self, key: KT) -> None:
TaskItem.clear(self, key=key)

# Different signature on purpose
async def async_clear(self, key: KT) -> None:
await TaskItem.async_clear(self, key=key)


class TaskItemNoKeyNoReturns(TaskItemNoReturns[None, PT]):
# Different signature on purpose
Expand All @@ -252,3 +286,11 @@ def dequeue(self) -> typing.Generator[TaskItemDequeue[PT], None, None]: # type:
# Different signature on purpose
async def async_dequeue(self) -> typing.AsyncGenerator[TaskItemDequeue[PT], None]: # type: ignore
return TaskItem.async_dequeue(self, key=None)

# Different signature on purpose
def clear(self) -> None: # type: ignore
TaskItem.clear(self, key=None)

# Different signature on purpose
async def async_clear(self) -> None: # type: ignore
await TaskItem.async_clear(self, key=None)
15 changes: 15 additions & 0 deletions clients/redis/python/tests/example_api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,18 @@ def test_task() -> None:

task1.kill()
task2.kill()


def test_task_clear() -> None:
memorix_api = MemorixApi(redis_url=redis_url)

try:
queue = memorix_api.task.runAlgo.queue(payload="send me cat")
queue = memorix_api.task.runAlgo.queue(payload="send me cat")
queue = memorix_api.task.runAlgo.queue(payload="send me cat")
assert queue.queue_size == 3
memorix_api.task.runAlgo.clear()
queue = memorix_api.task.runAlgo.queue(payload="send me cat")
assert queue.queue_size == 1
finally:
memorix_api.task.runAlgo.clear()
4 changes: 4 additions & 0 deletions develop.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
{
"name": "client-redis-python",
"path": "clients/redis/python"
},
{
"name": "docs",
"path": "docs"
}
],
"settings": {
Expand Down
5 changes: 5 additions & 0 deletions docs/content/en/docs/concepts/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ await { stop } = memorixApi.task.addMessage.dequeue(async ({ payload }) => {
await stop();
}
});

// Clears queue
await memorix_api.task.addMessage.clear();
```

{{% /tab %}}
Expand All @@ -288,6 +291,8 @@ for res in memorix_api.task.addMessage.dequeque():
if res.payload == "world"
break

# Clears queue
memorix_api.task.addMessage.clear()
```

{{% /tab %}}
Expand Down
4 changes: 2 additions & 2 deletions docs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 17f13ec

Please sign in to comment.