Skip to content

Commit

Permalink
fix(parent): consider re-adding child that is in completed state using same jobIds (#2627) (python) fixes #2554
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 10, 2024
1 parent db10c87 commit 00cd017
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 22 deletions.
10 changes: 5 additions & 5 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
push_cmd = "RPUSH" if lifo else "LPUSH"

args = [self.keys[''], round(time.time() * 1000), push_cmd,
job_id, token, "1" if opts.get("skipAttempt") else "0"]
job_id, token, "1" if opts.get("skipAttempt") else "0"]

return (keys, args)

Expand All @@ -275,7 +275,7 @@ def moveToDelayedArgs(self, job_id: str, timestamp: int, token: str, delay: int
keys.append(self.keys['stalled'])

args = [self.keys[''], round(time.time() * 1000), str(max_timestamp),
job_id, token, delay, "1" if opts.get("skipAttempt") else "0"]
job_id, token, delay, "1" if opts.get("skipAttempt") else "0"]

return (keys, args)

Expand Down Expand Up @@ -325,9 +325,9 @@ def getCounts(self, types):

def getCountsPerPriorityArgs(self, priorities):
keys = [self.keys['wait'],
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized']]
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized']]

args = priorities

Expand Down
2 changes: 1 addition & 1 deletion python/bullmq/types/job_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class JobOptions(TypedDict, total=False):
"""
Backoff setting for automatic retries if the job fails.
"""

jobId: str
"""
Override the job ID - by default, the job ID is a unique
Expand Down
1 change: 0 additions & 1 deletion python/bullmq/types/queue_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ class QueueBaseOptions(TypedDict, total=False):
"""
Prefix for all queue keys.
"""

1 change: 1 addition & 0 deletions python/bullmq/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import semver
import traceback


def isRedisVersionLowerThan(current_version, minimum_version):
return semver.VersionInfo.parse(current_version).compare(minimum_version) == -1

Expand Down
4 changes: 3 additions & 1 deletion python/bullmq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

maximum_block_timeout = 10
# 1 millisecond is chosen because the granularity of our timestamps are milliseconds.
# Obviously we can still process much faster than 1 job per millisecond but delays and rate limits will never work with more accuracy than 1ms.
# Obviously we can still process much faster than 1 job per millisecond but delays and
# rate limits will never work with more accuracy than 1ms.
minimum_block_timeout = 0.001


class Worker(EventEmitter):
def __init__(self, name: str, processor: Callable[[Job, str], asyncio.Future], opts: WorkerOptions = {}):
super().__init__()
Expand Down
1 change: 1 addition & 0 deletions python/tests/bulk_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async def process(job: Job, token: str):
completed_events = Future()

job_count = 1

def completing(job: Job, result):
nonlocal job_count
if job_count == 2:
Expand Down
12 changes: 6 additions & 6 deletions python/tests/job_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def test_set_and_get_progress_as_number(self):
await job.updateProgress(42)
stored_job = await Job.fromId(queue, job.id)
self.assertEqual(stored_job.progress, 42)

await queue.close()

async def test_set_and_get_progress_as_object(self):
Expand All @@ -36,7 +36,7 @@ async def test_set_and_get_progress_as_object(self):
await job.updateProgress({"total": 120, "completed": 40})
stored_job = await Job.fromId(queue, job.id)
self.assertEqual(stored_job.progress, {"total": 120, "completed": 40})

await queue.close()

async def test_get_job_state(self):
Expand All @@ -45,7 +45,7 @@ async def test_get_job_state(self):
state = await job.getState()

self.assertEqual(state, "waiting")

await queue.close()

async def test_job_log(self):
Expand All @@ -67,9 +67,9 @@ async def test_update_job_data(self):
job = await queue.add("test", {"foo": "bar"}, {})
await job.updateData({"baz": "qux"})
stored_job = await Job.fromId(queue, job.id)

self.assertEqual(stored_job.data, {"baz": "qux"})

await queue.close()

async def test_update_job_data_when_is_removed(self):
Expand All @@ -78,7 +78,7 @@ async def test_update_job_data_when_is_removed(self):
await job.remove()
with self.assertRaises(TypeError):
await job.updateData({"baz": "qux"})

await queue.close()

async def test_promote_delayed_job(self):
Expand Down
10 changes: 5 additions & 5 deletions python/tests/queue_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async def test_get_jobs(self):
job1 = await queue.add("test-job", {"foo": "bar"}, {})
job2 = await queue.add("test-job", {"foo": "bar"}, {})
jobs = await queue.getJobs(["wait"])

self.assertEqual(job2.id, jobs[0].id)
self.assertEqual(job1.id, jobs[1].id)
await queue.close()
Expand Down Expand Up @@ -106,7 +106,7 @@ async def test_trim_events_manually(self):
events_length = await queue.client.xlen(f"{queue.prefix}:{queueName}:events")
self.assertEqual(events_length, 8)

await queue.trimEvents(0);
await queue.trimEvents(0)

events_length = await queue.client.xlen(f"{queue.prefix}:{queue.name}:events")

Expand All @@ -124,7 +124,7 @@ async def test_trim_events_manually_with_custom_prefix(self):
events_length = await queue.client.xlen(f"test:{queueName}:events")
self.assertEqual(events_length, 8)

await queue.trimEvents(0);
await queue.trimEvents(0)

events_length = await queue.client.xlen(f"test:{queue.name}:events")

Expand Down Expand Up @@ -374,13 +374,13 @@ async def test_promote_all_delayed_jobs(self):
job_count = 8

for index in range(job_count):
data = {"idx": index}
data = { "idx": index }
await queue.add("test", data=data, opts={ "delay": 5000 })

delayed_count = await queue.getJobCounts('delayed')
self.assertEqual(delayed_count['delayed'], job_count)

await queue.promoteJobs();
await queue.promoteJobs()

waiting_count = await queue.getJobCounts('waiting')
self.assertEqual(waiting_count['waiting'], job_count)
Expand Down
4 changes: 2 additions & 2 deletions python/tests/worker_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ async def parent_process(job: Job, token: str):
})
step = Step.Second
elif step == Step.Second:
await queue.add('child-2', {"foo": "bar" },{
await queue.add('child-2', { "foo": "bar" }, {
"parent": {
"id": job.id,
"queue": job.queueQualifiedName
Expand Down Expand Up @@ -364,7 +364,7 @@ async def test_process_job_respecting_the_concurrency_set(self):

async def process(job: Job, token: str):
nonlocal num_jobs_processing
nonlocal wait
nonlocal wait
nonlocal pending_message_to_process
num_jobs_processing += 1
self.assertLess(num_jobs_processing, 5)
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/handleDuplicatedJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ local function handleDuplicatedJob(jobKey, jobId, currentParentKey, currentParen
parentData, parentDependenciesKey, completedKey, eventsKey, maxEvents, timestamp)
local existedParentKey = rcall("HGET", jobKey, "parentKey")

if not existedParentKey then
if not existedParentKey or existedParentKey == currentParentKey then
updateExistingJobsParent(currentParentKey, currentParent, parentData,
parentDependenciesKey, completedKey, jobKey,
jobId, timestamp)
Expand Down
72 changes: 72 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1671,6 +1671,78 @@ describe('flows', () => {
await flow.close();
});
});

describe('when child already existed and it is re-added with same parentId', async () => {
it('moves parent to wait if child is already completed', async () => {
const worker = new Worker(
queueName,
async () => {
await new Promise(s => {
setTimeout(s, 250);
});
},
{
connection,
prefix,
},
);

const completing = new Promise<void>(resolve => {
worker.on('completed', (job: Job) => {
if (job.id === 'tue') {
resolve();
}
});
});

const flow = new FlowProducer({ connection, prefix });

await flow.add({
queueName,
name: 'tue',
opts: {
jobId: 'tue',
removeOnComplete: true,
},
children: [
{
name: 'mon',
queueName,
opts: {
jobId: 'mon',
},
},
],
});

await completing;

const tree = await flow.add({
queueName,
name: 'tue',
opts: {
jobId: 'tue',
},
children: [
{
name: 'mon',
queueName,
opts: {
jobId: 'mon',
},
},
],
});

await delay(1000);
const state = await tree.job.getState();

expect(state).to.be.equal('completed');

await worker.close();
await flow.close();
});
});
});

describe('when custom prefix is set in flow producer', async () => {
Expand Down

0 comments on commit 00cd017

Please sign in to comment.