diff --git a/queue_job/README.rst b/queue_job/README.rst index 4f4a36acb9..7487afdb8d 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -241,6 +241,38 @@ Note: ``delay()`` must be called on the delayable, chain, or group which is at t of the graph. In the example above, if it was called on ``group_a``, then ``group_b`` would never be delayed (but a warning would be shown). +It is also possible to split a job into several jobs, each one processing a part of the +work. This can be useful to avoid very long jobs, parallelize some task and get more specific +errors. Usage is as follows: + +.. code-block:: python + + def button_split_delayable(self): + ( + self # Can be a big recordset, let's say 1000 records + .delayable() + .generate_thumbnail((50, 50)) + .set(priority=30) + .set(description=_("generate xxx")) + .split(50) # Split the job in 20 jobs of 50 records each + .delay() + ) + +The ``split()`` method takes a ``chain`` boolean keyword argument. If set to +True, the jobs will be chained, meaning that the next job will only start when the previous +one is done: + +.. code-block:: python + + def button_increment_var(self): + ( + self + .delayable() + .increment_counter() + .split(1, chain=True) # Will exceute the jobs one after the other + .delay() + ) + Enqueing Job Options -------------------- diff --git a/queue_job/delay.py b/queue_job/delay.py index f8eb82c155..d188542ffe 100644 --- a/queue_job/delay.py +++ b/queue_job/delay.py @@ -526,6 +526,52 @@ def delay(self): """Delay the whole graph""" self._graph.delay() + def split(self, size, chain=False): + """Split the Delayables. + + Use `DelayableGroup` or `DelayableChain` + if `chain` is True containing batches of size `size` + """ + if not self._job_method: + raise ValueError("No method set on the Delayable") + + total_records = len(self.recordset) + + delayables = [] + for index in range(0, total_records, size): + recordset = self.recordset[index : index + size] + delayable = Delayable( + recordset, + priority=self.priority, + eta=self.eta, + max_retries=self.max_retries, + description=self.description, + channel=self.channel, + identity_key=self.identity_key, + ) + # Update the __self__ + delayable._job_method = getattr(recordset, self._job_method.__name__) + delayable._job_args = self._job_args + delayable._job_kwargs = self._job_kwargs + + delayables.append(delayable) + + description = self.description or ( + self._job_method.__doc__.splitlines()[0].strip() + if self._job_method.__doc__ + else "{}.{}".format(self.recordset._name, self._job_method.__name__) + ) + for index, delayable in enumerate(delayables): + delayable.set( + description="%s (split %s/%s)" + % (description, index + 1, len(delayables)) + ) + + # Prevent warning on deletion + self._generated_job = True + + return (DelayableChain if chain else DelayableGroup)(*delayables) + def _build_job(self): if self._generated_job: return self._generated_job diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index 8ff22b0bdb..b7614d2ab4 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -104,6 +104,38 @@ Note: ``delay()`` must be called on the delayable, chain, or group which is at t of the graph. In the example above, if it was called on ``group_a``, then ``group_b`` would never be delayed (but a warning would be shown). +It is also possible to split a job into several jobs, each one processing a part of the +work. This can be useful to avoid very long jobs, parallelize some task and get more specific +errors. Usage is as follows: + +.. code-block:: python + + def button_split_delayable(self): + ( + self # Can be a big recordset, let's say 1000 records + .delayable() + .generate_thumbnail((50, 50)) + .set(priority=30) + .set(description=_("generate xxx")) + .split(50) # Split the job in 20 jobs of 50 records each + .delay() + ) + +The ``split()`` method takes a ``chain`` boolean keyword argument. If set to +True, the jobs will be chained, meaning that the next job will only start when the previous +one is done: + +.. code-block:: python + + def button_increment_var(self): + ( + self + .delayable() + .increment_counter() + .split(1, chain=True) # Will exceute the jobs one after the other + .delay() + ) + Enqueing Job Options -------------------- diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 281066bf31..747497e0a6 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -584,6 +584,34 @@

Delaying jobs

Note: delay() must be called on the delayable, chain, or group which is at the top of the graph. In the example above, if it was called on group_a, then group_b would never be delayed (but a warning would be shown).

+

It is also possible to split a job into several jobs, each one processing a part of the +work. This can be useful to avoid very long jobs, parallelize some task and get more specific +errors. Usage is as follows:

+
+def button_split_delayable(self):
+    (
+        self  # Can be a big recordset, let's say 1000 records
+        .delayable()
+        .generate_thumbnail((50, 50))
+        .set(priority=30)
+        .set(description=_("generate xxx"))
+        .split(50)  # Split the job in 20 jobs of 50 records each
+        .delay()
+    )
+
+

The split() method takes a chain boolean keyword argument. If set to +True, the jobs will be chained, meaning that the next job will only start when the previous +one is done:

+
+def button_increment_var(self):
+    (
+        self
+        .delayable()
+        .increment_counter()
+        .split(1, chain=True) # Will exceute the jobs one after the other
+        .delay()
+    )
+

Enqueing Job Options

diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py index e0ff9576a5..db53ac3a60 100644 --- a/queue_job/tests/__init__.py +++ b/queue_job/tests/__init__.py @@ -1,6 +1,7 @@ from . import test_runner_channels from . import test_runner_runner from . import test_delayable +from . import test_delayable_split from . import test_json_field from . import test_model_job_channel from . import test_model_job_function diff --git a/queue_job/tests/test_delayable.py b/queue_job/tests/test_delayable.py index c7295ea2b1..814e204a25 100644 --- a/queue_job/tests/test_delayable.py +++ b/queue_job/tests/test_delayable.py @@ -1,15 +1,15 @@ # copyright 2019 Camptocamp # license agpl-3.0 or later (http://www.gnu.org/licenses/agpl.html) -import unittest - import mock +from odoo.tests import common + # pylint: disable=odoo-addons-relative-import from odoo.addons.queue_job.delay import Delayable, DelayableGraph -class TestDelayable(unittest.TestCase): +class TestDelayable(common.BaseCase): def setUp(self): super().setUp() self.recordset = mock.MagicMock(name="recordset") diff --git a/queue_job/tests/test_delayable_split.py b/queue_job/tests/test_delayable_split.py new file mode 100644 index 0000000000..b761878b2e --- /dev/null +++ b/queue_job/tests/test_delayable_split.py @@ -0,0 +1,94 @@ +# Copyright 2024 Akretion (http://www.akretion.com). +# @author Florian Mounier +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl). + +from odoo.tests import common + +# pylint: disable=odoo-addons-relative-import +from odoo.addons.queue_job.delay import Delayable + + +class TestDelayableSplit(common.BaseCase): + def setUp(self): + super().setUp() + + class FakeRecordSet(list): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._name = "recordset" + + def __getitem__(self, key): + if isinstance(key, slice): + return FakeRecordSet(super().__getitem__(key)) + return super().__getitem__(key) + + def method(self, arg, kwarg=None): + """Method to be called""" + return arg, kwarg + + self.FakeRecordSet = FakeRecordSet + + def test_delayable_split_no_method_call_beforehand(self): + dl = Delayable(self.FakeRecordSet(range(20))) + with self.assertRaises(ValueError): + dl.split(3) + + def test_delayable_split_10_3(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(3) + self.assertEqual(len(group._delayables), 4) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2])) + self.assertEqual(delayables[1].recordset, self.FakeRecordSet([3, 4, 5])) + self.assertEqual(delayables[2].recordset, self.FakeRecordSet([6, 7, 8])) + self.assertEqual(delayables[3].recordset, self.FakeRecordSet([9])) + self.assertEqual(delayables[0].description, "Method to be called (split 1/4)") + self.assertEqual(delayables[1].description, "Method to be called (split 2/4)") + self.assertEqual(delayables[2].description, "Method to be called (split 3/4)") + self.assertEqual(delayables[3].description, "Method to be called (split 4/4)") + self.assertNotEqual(delayables[0]._job_method, dl._job_method) + self.assertNotEqual(delayables[1]._job_method, dl._job_method) + self.assertNotEqual(delayables[2]._job_method, dl._job_method) + self.assertNotEqual(delayables[3]._job_method, dl._job_method) + self.assertEqual(delayables[0]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[1]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[2]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[3]._job_method.__name__, dl._job_method.__name__) + self.assertEqual(delayables[0]._job_args, ("arg",)) + self.assertEqual(delayables[1]._job_args, ("arg",)) + self.assertEqual(delayables[2]._job_args, ("arg",)) + self.assertEqual(delayables[3]._job_args, ("arg",)) + self.assertEqual(delayables[0]._job_kwargs, {"kwarg": "kwarg"}) + self.assertEqual(delayables[1]._job_kwargs, {"kwarg": "kwarg"}) + self.assertEqual(delayables[2]._job_kwargs, {"kwarg": "kwarg"}) + self.assertEqual(delayables[3]._job_kwargs, {"kwarg": "kwarg"}) + + def test_delayable_split_10_5(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(5) + self.assertEqual(len(group._delayables), 2) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet([0, 1, 2, 3, 4])) + self.assertEqual(delayables[1].recordset, self.FakeRecordSet([5, 6, 7, 8, 9])) + self.assertEqual(delayables[0].description, "Method to be called (split 1/2)") + self.assertEqual(delayables[1].description, "Method to be called (split 2/2)") + + def test_delayable_split_10_10(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(10) + self.assertEqual(len(group._delayables), 1) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10))) + self.assertEqual(delayables[0].description, "Method to be called (split 1/1)") + + def test_delayable_split_10_20(self): + dl = Delayable(self.FakeRecordSet(range(10))) + dl.method("arg", kwarg="kwarg") + group = dl.split(20) + self.assertEqual(len(group._delayables), 1) + delayables = sorted(list(group._delayables), key=lambda x: x.description) + self.assertEqual(delayables[0].recordset, self.FakeRecordSet(range(10))) + self.assertEqual(delayables[0].description, "Method to be called (split 1/1)")