Skip to content

Commit

Permalink
refactor(queue-keys): reuse getQueueQualifiedName method (#2109)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Aug 3, 2023
1 parent 41c8c57 commit 6b15914
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 29 deletions.
5 changes: 1 addition & 4 deletions python/bullmq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self, queue: Queue, name: str, data: Any, opts: JobOptions = {}):
self.parent = {"id": parent.get("id"), "queueKey": parent.get("queue")} if parent else None
self.stacktrace: List[str] = []
self.scripts = Scripts(queue.prefix, queue.name, queue.redisConnection)
self.queueQualifiedName = queue.qualifiedName

def updateData(self, data):
self.data = data
Expand Down Expand Up @@ -152,10 +153,6 @@ async def saveStacktrace(self, pipe, err:str):
def moveToWaitingChildren(self, token, opts:dict):
return self.scripts.moveToWaitingChildren(self.id, token, opts)

@property
def queueQualifiedName(self):
return f"{self.queue.prefix}:{self.queue.name}"

@staticmethod
def fromJSON(queue: Queue, rawData: dict, jobId: str | None = None):
"""
Expand Down
11 changes: 8 additions & 3 deletions python/bullmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ def __init__(self, name: str, redisOpts: dict | str = {}, opts: QueueOptions = {
self.prefix = opts.get("prefix", "bull")
self.scripts = Scripts(
self.prefix, name, self.redisConnection)
self.keys = self.scripts.queue_keys.getKeys(name)
self.qualifiedName = self.scripts.queue_keys.getQueueQualifiedName(name)

def toKey(self, type: str):
return self.scripts.queue_keys.toKey(self.name, type)

async def add(self, name: str, data, opts: JobOptions = {}):
"""
Expand Down Expand Up @@ -63,7 +68,7 @@ async def isPaused(self):
"""
Returns true if the queue is currently paused.
"""
paused_key_exists = await self.client.hexists(f"{self.prefix}:{self.name}:meta", "paused")
paused_key_exists = await self.client.hexists(self.keys["meta"], "paused")
return paused_key_exists == 1

async def obliterate(self, force: bool = False):
Expand Down Expand Up @@ -103,13 +108,13 @@ def trimEvents(self, maxLength: int):
@param maxLength:
"""
return self.client.xtrim(f"{self.prefix}:{self.name}:events", maxlen = maxLength, approximate = "~")
return self.client.xtrim(self.keys["events"], maxlen = maxLength, approximate = "~")

def removeDeprecatedPriorityKey(self):
"""
Delete old priority helper key.
"""
return self.client.delete(f"{self.prefix}:{self.name}:priority")
return self.client.delete(self.toKey("priority"))

async def getJobCountByTypes(self, *types):
result = await self.getJobCounts(*types)
Expand Down
22 changes: 22 additions & 0 deletions python/bullmq/queue_keys.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
class QueueKeys:
"""
This class handles all keys parser logic.
"""

def __init__(self, prefix: str = 'bull'):
self.prefix = prefix

def getKeys(self, name: str):
names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events"]
keys = {}
for name_type in names:
keys[name_type] = self.toKey(name, name_type)

return keys

def toKey(self, name: str, name_type: str):
return f"{self.getQueueQualifiedName(name)}:{name_type}"

def getQueueQualifiedName(self, name: str):
return f"{self.prefix}:{name}"
10 changes: 4 additions & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations
from redis import Redis
from bullmq.queue_keys import QueueKeys
from bullmq.error_code import ErrorCode
from bullmq.utils import isRedisVersionLowerThan, get_parent_key
from typing import Any, TYPE_CHECKING
Expand Down Expand Up @@ -53,14 +54,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-2.lua")),
}

# loop all the names and add them to the keys object
names = ["", "active", "wait", "waiting-children", "paused", "completed", "failed", "delayed",
"stalled", "limiter", "prioritized", "id", "stalled-check", "meta", "pc", "events", "waiting-children"]
for name in names:
self.keys[name] = self.toKey(name)
self.queue_keys = QueueKeys(prefix)
self.keys = self.queue_keys.getKeys(queueName)

def toKey(self, name: str):
return f"{self.prefix}:{self.queueName}:{name}"
return self.queue_keys.toKey(self.queueName, name)

def getScript(self, name: str):
"""
Expand Down
1 change: 1 addition & 0 deletions python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], o
self.blockUntil = 0
self.limitUntil = 0
self.drained = False
self.qualifiedName = self.scripts.queue_keys.getQueueQualifiedName(name)

if opts.get("autorun", True):
asyncio.ensure_future(self.run())
Expand Down
3 changes: 2 additions & 1 deletion src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ export class FlowProducer extends EventEmitter {
parent: {
parentOpts: {
id: parentId,
queue: queueKeysParent.getPrefixedQueueName(node.queueName),
queue: queueKeysParent.getQueueQualifiedName(node.queueName),
},
parentDependenciesKey,
},
Expand Down Expand Up @@ -433,6 +433,7 @@ export class FlowProducer extends EventEmitter {
keys: queueKeys.getKeys(node.queueName),
toKey: (type: string) => queueKeys.toKey(node.queueName, type),
opts: { prefix },
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),
closing: this.closing,
waitUntilReady: async () => this.connection.client,
removeListener: this.removeListener.bind(this) as any,
Expand Down
16 changes: 8 additions & 8 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ export class Job<
NameType extends string = string,
> implements MinimalJob<DataType, ReturnType, NameType>
{
/**
* It includes the prefix, the namespace separator :, and queue name.
* @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html
*/
public readonly queueQualifiedName: string;

/**
* The progress a job has performed so far.
* @defaultValue 0
Expand Down Expand Up @@ -183,6 +189,8 @@ export class Job<

this.toKey = queue.toKey.bind(queue);
this.scripts = new Scripts(queue);

this.queueQualifiedName = queue.qualifiedName;
}

/**
Expand Down Expand Up @@ -715,14 +723,6 @@ export class Job<
return this.queue.opts.prefix;
}

/**
* @returns it includes the prefix, the namespace separator :, and queue name.
* @see https://www.gnu.org/software/gawk/manual/html_node/Qualified-Names.html
*/
get queueQualifiedName(): string {
return `${this.prefix}:${this.queueName}`;
}

/**
* Get current state.
*
Expand Down
2 changes: 2 additions & 0 deletions src/classes/queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {

protected scripts: Scripts;
protected connection: RedisConnection;
public readonly qualifiedName: string;

/**
*
Expand Down Expand Up @@ -66,6 +67,7 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
});

const queueKeys = new QueueKeys(opts.prefix);
this.qualifiedName = queueKeys.getQueueQualifiedName(name);
this.keys = queueKeys.getKeys(name);
this.toKey = (type: string) => queueKeys.toKey(name, type);
this.scripts = new Scripts(this);
Expand Down
10 changes: 3 additions & 7 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,8 @@ export class QueueKeys {
'',
'active',
'wait',
'waiting',
'waiting-children',
'paused',
'resumed',
'id',
'delayed',
'prioritized',
Expand All @@ -21,11 +20,8 @@ export class QueueKeys {
'stalled',
'repeat',
'limiter',
'drained',
'progress',
'meta',
'events',
'delay',
'pc',
].forEach(key => {
keys[key] = this.toKey(name, key);
Expand All @@ -35,10 +31,10 @@ export class QueueKeys {
}

toKey(name: string, type: string): string {
return `${this.getPrefixedQueueName(name)}:${type}`;
return `${this.getQueueQualifiedName(name)}:${type}`;
}

getPrefixedQueueName(name: string): string {
getQueueQualifiedName(name: string): string {
return `${this.prefix}:${name}`;
}
}
1 change: 1 addition & 0 deletions src/types/minimal-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type MinimalQueue = Pick<
| 'toKey'
| 'keys'
| 'opts'
| 'qualifiedName'
| 'closing'
| 'waitUntilReady'
| 'removeListener'
Expand Down

0 comments on commit 6b15914

Please sign in to comment.