# turbo_broccoli.parallel

Guarded parallel calls

## Usage

It works in a way that is similar to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
for example

```py
from math import sqrt
import turbo_broccoli as tb

# Note the use of `tb.delayed` instead of `joblib.delayed`.
#          ↓
jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)
```

gives

```py
{0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0}
```

however, only the calls for which the corresponding entry in `foo/bar.json` does not exist will actually be executed; the others will simply be loaded. Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result pairs, not just a `list` of results.
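The idea behind the guard can be sketched in plain Python. The sketch below is an illustrative re-implementation of the mechanism, not turbo_broccoli's actual on-disk format or API: results are cached in a JSON file, only missing entries are computed, and the file is rewritten as each new result arrives.

```python
import json
import tempfile
from math import sqrt
from pathlib import Path


def guarded_map(path: Path, func, args: list) -> dict:
    """Compute func(a) for each a, skipping entries already cached on disk."""
    cache = json.loads(path.read_text()) if path.exists() else {}
    for a in args:
        key = str(a)  # JSON object keys must be strings
        if key not in cache:  # only missing entries are actually computed...
            cache[key] = func(a)
            # ...and the file is rewritten as soon as a new result arrives
            path.write_text(json.dumps(cache))
    return cache


out = Path(tempfile.mkdtemp()) / "cache.json"
results = guarded_map(out, sqrt, [i**2 for i in range(5)])
# A second call computes nothing: every entry is loaded from the file.
results = guarded_map(out, sqrt, [i**2 for i in range(5)])
```

The names `guarded_map` and the string-keyed JSON layout are hypothetical; turbo_broccoli handles key encoding and serialization itself.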

If the function in the jobs takes more than one argument, simply drop the `only_one_arg=True` argument:

```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)
```

gives

```py
{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
```
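The shape of the keys follows from `only_one_arg`: with it, each key is the job's single argument; without it, each key is the tuple of positional arguments. A minimal sketch of this key derivation (it mirrors the `_key` helper visible in the source further down):

```python
# Sketch of how result-dict keys are derived from a job's positional
# arguments (mirrors the `_key` helper in the module source).
def key(args: tuple, only_one_arg: bool):
    return args[0] if only_one_arg else tuple(args)


key((9,), only_one_arg=True)     # -> 9 (bare argument, compact keys)
key((2, 3), only_one_arg=False)  # -> (2, 3) (tuple of arguments)
```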

## Notes & caveats

* The result of `executor(jobs)` is a `dict` or a generator of key/value pairs.

* The order of the results is guaranteed to be consistent with the order of the
  jobs.

* The argument(s) in the jobs must be hashable. Furthermore, if a job has
  kwargs, a `ValueError` is raised.

* Every job must have a different tuple of arguments; in other words, every
  job must be unique. So something like this is not acceptable:

    ```py
    f = lambda a: 2 * a

    jobs = [tb.delayed(f)(i % 2) for i in range(5)]
    executor = tb.Parallel(...)
    results = executor(jobs)
    ```

    because `f` is in effect called multiple times with value `0`. In
    particular, TurboBroccoli's `Parallel` is not suited for functions with no
    arguments (unless they are executed only once, but that kind of defeats
    the idea of parallelism).

* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  can be passed as kwargs to the constructor.

* TurboBroccoli's `Parallel` *does not* honor the `return_as` argument of
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
  The return value of `Parallel.__call__` is always a `dict`.

* If the outputs of the jobs' function are large, it might be inefficient to
  rewrite every past result to the output file each time a new result is
  generated. Using [embedded
  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html)
  can make writes faster.

    ```py
    def f(i):
        ...
        return tb.EmbeddedDict({"result": something_big, ...})

    jobs = [tb.delayed(f)(i) for i in range(1000)]
    executor = tb.Parallel(...)
    results = executor(jobs)
    ```
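The uniqueness requirement above can be checked up front before submitting jobs. A sketch of the kind of pairwise check that `Parallel.sanity_check` performs (pure Python, using `itertools.combinations` as the module source does):

```python
from itertools import combinations


def find_duplicate_jobs(all_args: list) -> list:
    """Return index pairs of jobs that share the same argument tuple."""
    return [
        (i, j)
        for i, j in combinations(range(len(all_args)), 2)
        if all_args[i] == all_args[j]
    ]


find_duplicate_jobs([(0,), (1,), (0,)])        # -> [(0, 2)]: jobs 0 and 2 collide
find_duplicate_jobs([(i,) for i in range(5)])  # -> []: all jobs unique
```

The helper name `find_duplicate_jobs` is illustrative; the real `sanity_check` raises a `ValueError` as soon as the first colliding pair is found.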
    
  1"""
  2Guarded parallel calls
  3
  4## Usage
  5
  6It works in a way that is similar to
  7[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
  8for example
  9
 10```py
 11from math import sqrt
 12import turbo_broccoli as tb
 13
 14# Note the use of `tb.delayed` instead of `joblib.delayed`.
 15#          ↓
 16jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
 17executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
 18results = executor(jobs)
 19```
 20
 21gives
 22
 23```py
 24{0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0, 5: 5.0}
 25```
 26
 27however, only the calls for which the corresponding entry in `out/foo.json`
 28does not exist will actually be executed, the others will simply be loaded.
 29Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result, not
 30just a `list` of results.
 31
 32If the function in the jobs take more than one argument, simply drop the
 33`only_one_arg=True` argument:
 34
 35```py
 36from math import sqrt
 37import turbo_broccoli as tb
 38
 39f = lambda a, b: a * b
 40
 41jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
 42executor = tb.Parallel("foo/bar.json", n_jobs=2)
 43results = executor(jobs)
 44```
 45
 46gives
 47
 48```py
 49{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
 50```
 51
 52## Notes & caveats
 53
 54* The result of `executor(jobs)` is a dict or a generator of key/value pairs.
 55
 56* The order of the results is guaranteed to be consistent with the order of the
 57  jobs.
 58
 59* The argument(s) in the jobs must be hashable. Furthermore, if a job has
 60  kwargs, a `ValueError` is raised.
 61
 62* Every job must have a different tuple of argument, or in other words, every
 63  job must be unique. So something like this is not acceptable:
 64    ```py
 65    f = lambda a: 2 * a
 66
 67    jobs = [tb.delayed(f)(i % 2) for i in range(5)]
 68    executor = tb.Parallel(...)
 69    results = executor(jobs)
 70    ```
 71    because `f` is in effect called multiple times with value `0`. In
 72    particular, TurboBroccoli's `Parallel` is not suited for functions with no
 73    arguments (unless if they are executed only once but that kind of defeats
 74    the idea of parallelism).
 75
 76* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
 77  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
 78  can be passed as kwargs to the constructor.
 79
 80* TurboBroccoli's `Parallel` *does not* honor the `return_as` argument of
 81  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
 82  The return value of `Parallel.__call__` is always a `dict`.
 83
 84* If the output of the job's function are large, it might be inefficient to
 85  rewrite every past results the output file each time a new result is
 86  generated. Using [embedded
 87  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html),
 88  can make writes faster.
 89
 90    ```py
 91    def f(i):
 92        ...
 93        return tb.EmbeddedDict({"result": something_big, ...})
 94
 95    jobs = [tb.delayed(f)(i) for i in range(1000)]
 96    executor = tb.Parallel(...)
 97    results = executor(jobs)
 98    ```
 99"""
100
101from itertools import combinations
102from pathlib import Path
103from typing import Any, Callable, Generator, Iterable
104
105import joblib
106
107try:
108    from loguru import logger as logging
109except ModuleNotFoundError:
110    import logging  # type: ignore
111
112from .context import Context
113from .turbo_broccoli import load_json, save_json
114
115
116class _DelayedCall:
117
118    function: Callable
119    args: tuple[Any, ...]
120
121    def __call__(self, *args: Any, **kwds: Any) -> "_DelayedCall":
122        if kwds:
123            raise ValueError("Keyword arguments are not supported")
124        self.args = args
125        return self
126
127    def __init__(self, function: Callable) -> None:
128        self.function = function
129
130    def to_joblib_delayed(self) -> Callable:
131        """
132        Returns a `joblib.delayed` object that can be used with
133        `joblib.Parallel`.
134        """
135        return joblib.delayed(self.function)(*self.args)
136
137
138class Parallel:
139    """
140    Guarded analogue to
141    [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
142    See module documentation.
143    """
144
145    context: Context
146    executor: joblib.Parallel
147    only_one_arg: bool
148
149    def __init__(
150        self,
151        output_file: str | Path,
152        context: Context | None = None,
153        only_one_arg: bool = False,
154        **kwargs: Any,
155    ) -> None:
156        """
157        Args:
158            output_file (str | Path):
159            context (Context | None, optional):
160            only_one_arg (bool, optional): If `True`, assumes that every job
161                has exactly one argument. This produces more compact output
162                files.
163            kwargs (Any): Forwarded to
164                [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
165        """
166        self.context = context or Context(output_file)
167        self.executor = joblib.Parallel(**kwargs)
168        self.only_one_arg = only_one_arg
169
170    def __call__(self, jobs: Iterable[_DelayedCall]) -> dict:
171        jobs = list(jobs)
172        self.sanity_check(jobs)
173        return dict(self._execute(jobs))
174
175    # pylint: disable=stop-iteration-return
176    def _execute(
177        self, jobs: Iterable[_DelayedCall]
178    ) -> Generator[tuple[Any, Any], None, None]:
179        """
180        Executes the jobs in parallel and yields the results. Saves to the
181        output file each time a new result (i.e. one that was not already
182        present in the output file) is obtained.
183
184        Args:
185            jobs (Iterable[_DelayedCall]): All the jobs, including those whose
186                results are already in the output file (and therefore shall not
187                be run again)
188        """
189
190        def _key(j: _DelayedCall) -> Any:
191            """What the key of a job should be in the result dict"""
192            return j.args[0] if self.only_one_arg else tuple(j.args)
193
194        job_status = {
195            _key(j): {"job": j, "done": False, "result": None} for j in jobs
196        }
197
198        # Check if some jobs already have their results in the output file
199        assert self.context.file_path is not None  # for typechecking
200        if self.context.file_path.exists():
201            results = load_json(self.context.file_path, self.context)
202            if not isinstance(results, dict):
203                raise RuntimeError(
204                    f"The contents of '{self.context.file_path}' is not a dict"
205                )
206            # Mark the jobs that are already done
207            for k, r in results.items():
208                if k in job_status:
209                    job_status[k]["done"], job_status[k]["result"] = True, r
210        else:
211            results = {}
212
213        new_results_it = iter(
214            self.executor(
215                d["job"].to_joblib_delayed()  # type: ignore
216                for d in job_status.values()
217                if not d["done"]
218            )
219        )
220        # This loops strongly assumes that `jobs`, `loaded_results`,
221        # `job_status`, and `new_results` are ordered in a consistent way
222        for k, s in job_status.items():
223            if s["done"]:
224                yield k, s["result"]
225            else:
226                results[k] = next(new_results_it)
227                save_json(results, self.context.file_path, self.context)
228                yield k, results[k]
229
230        # At this point, new_results should have been fully consumed
231        try:
232            next(new_results_it)
233            raise RuntimeError("The executor returned too many results")
234        except StopIteration:
235            pass
236
237    def sanity_check(self, jobs: list[_DelayedCall]) -> None:
238        """
239        Performs various sanity checks on a list of jobs.
240        """
241        if self.only_one_arg:
242            for i, j in enumerate(jobs):
243                if len(j.args) != 1:
244                    raise ValueError(
245                        f"The only_one_arg option is set to True but job {i} "
246                        f"has {len(j.args)} arguments: {j.args}"
247                    )
248        for i1, i2 in combinations(range(len(jobs)), 2):
249            if jobs[i1].args == jobs[i2].args:
250                raise ValueError(
251                    f"Jobs {i1} and {i2} have the same arguments: "
252                    f"{jobs[i1].args}"
253                )
254
255
256def delayed(function: Callable) -> Callable:
257    """Use this like `joblib.delayed`"""
258    return _DelayedCall(function)