turbo_broccoli.parallel

Guarded parallel calls

Usage

tb.Parallel works much like joblib.Parallel. For example,

from math import sqrt
import turbo_broccoli as tb

# Note the use of `tb.delayed` instead of `joblib.delayed`.
#          ↓
jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)

gives

{0: 0.0, 1: 1.0, 4: 2.0, 9: 3.0, 16: 4.0}

However, only the calls that do not already have an entry in foo/bar.json are actually executed; the others are simply loaded. Note that unlike joblib.Parallel, the result is a dict mapping each argument to its result, not just a list of results.
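
The guard can be pictured with a sketch that uses only the standard library. This is not TurboBroccoli's implementation: `guarded_map` and `traced_sqrt` are hypothetical names, and plain `json` is assumed, so keys are stored as strings and converted by hand.

```python
import json
import tempfile
from math import sqrt
from pathlib import Path


def guarded_map(function, args, output_file):
    """Hypothetical helper: call `function` only on the arguments that have
    no entry in `output_file` yet, and return a dict of arg/result."""
    path = Path(output_file)
    # Plain JSON only allows string keys, hence the str(...) conversions
    cache = json.loads(path.read_text()) if path.exists() else {}
    results = {}
    for a in args:
        if str(a) not in cache:
            cache[str(a)] = function(a)
            # Save after every new result so that an interruption loses at
            # most the call in progress
            path.write_text(json.dumps(cache))
        results[a] = cache[str(a)]
    return results


calls = []


def traced_sqrt(x):
    calls.append(x)  # records which calls were actually executed
    return sqrt(x)


with tempfile.TemporaryDirectory() as tmp:
    out = Path(tmp) / "bar.json"
    first = guarded_map(traced_sqrt, [0, 1, 4], out)
    # Second run: only the new argument 9 triggers a call
    second = guarded_map(traced_sqrt, [0, 1, 4, 9], out)
```

After both runs, `calls` contains each argument exactly once, even though 0, 1 and 4 were requested twice.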

If the function in the jobs takes more than one argument, simply drop the only_one_arg=True argument:

import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)

gives

{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
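
The difference between the two result shapes comes down to how each job is keyed. A minimal sketch of that rule follows; `job_key` is a hypothetical name chosen for illustration, written to mirror the `_key` helper visible in the module source.

```python
def job_key(args, only_one_arg=False):
    """Key of a job in the result dict: the bare argument when
    only_one_arg is set, the full argument tuple otherwise."""
    return args[0] if only_one_arg else tuple(args)


# One argument per job, keyed by the argument itself
single = {job_key((i**2,), only_one_arg=True): i for i in range(3)}

# Two arguments per job, keyed by the (a, b) tuple
pairs = {job_key((i, j)): i * j for i in range(2) for j in range(2)}

# Without only_one_arg, a one-argument job is keyed by a 1-tuple
wrapped = job_key((4,))
```

This is why only_one_arg=True gives more compact keys: `4` instead of `(4,)`.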

Notes & caveats

  • The result of executor(jobs) is a dict that maps each job's argument(s) to its result.

  • The order of the results is guaranteed to be consistent with the order of the jobs.

  • The argument(s) in the jobs must be hashable. Furthermore, if a job has kwargs, a ValueError is raised.

  • Every job must have a different tuple of arguments; in other words, every job must be unique. Something like this is therefore not acceptable:

    f = lambda a: 2 * a
    
    jobs = [tb.delayed(f)(i % 2) for i in range(5)]
    executor = tb.Parallel(...)
    results = executor(jobs)
    

    because f is in effect called multiple times with value 0. In particular, TurboBroccoli's Parallel is not suited for functions with no arguments (unless they are executed only once, which defeats the purpose of parallelism).

  • Beyond the arguments documented in Parallel, joblib.Parallel arguments can be passed as kwargs to the constructor.

  • TurboBroccoli's Parallel does not honor the return_as argument of joblib.Parallel. The return value of Parallel.__call__ is always a dict.

  • If the outputs of the job's function are large, it might be inefficient to rewrite every past result to the output file each time a new result is generated. Using embedded objects can make writes faster.

    def f(i):
        ...
        return tb.EmbeddedDict({"result": something_big, ...})
    
    jobs = [tb.delayed(f)(i) for i in range(1000)]
    executor = tb.Parallel(...)
    results = executor(jobs)
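
The ordering guarantee listed in the notes above can be pictured as a merge: walk the jobs in order and take each result either from the cache or from the stream of freshly computed results. The sketch below is an illustration only; `merge_in_job_order` is a hypothetical name, and it assumes the fresh results arrive in the same order as the jobs that were not cached.

```python
def merge_in_job_order(keys, cached, new_results):
    """Yield (key, result) pairs in job order, taking each result from
    `cached` when present and from the `new_results` iterator otherwise."""
    new_results = iter(new_results)
    for k in keys:
        if k in cached:
            yield k, cached[k]
        else:
            yield k, next(new_results)


keys = [0, 1, 4, 9, 16]       # job arguments, in submission order
cached = {1: 1.0, 9: 3.0}     # results already present in the output file
fresh = [0.0, 2.0, 4.0]       # new results for 0, 4 and 16, in that order
merged = dict(merge_in_job_order(keys, cached, fresh))
```

Because Python dicts preserve insertion order, `merged` lists the results in the order of the jobs, whether they were loaded or computed.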
    


from itertools import combinations
from pathlib import Path
from typing import Any, Callable, Generator, Iterable

import joblib

try:
    from loguru import logger as logging
except ModuleNotFoundError:
    import logging  # type: ignore

from .context import Context
from .turbo_broccoli import load_json, save_json


class _DelayedCall:
    function: Callable
    args: tuple[Any, ...]

    def __call__(self, *args: Any, **kwds: Any) -> "_DelayedCall":
        if kwds:
            raise ValueError("Keyword arguments are not supported")
        self.args = args
        return self

    def __init__(self, function: Callable) -> None:
        self.function = function

    def to_joblib_delayed(self) -> Callable:
        """
        Returns a `joblib.delayed` object that can be used with
        `joblib.Parallel`.
        """
        return joblib.delayed(self.function)(*self.args)


class Parallel:
    """
    Guarded analogue to
    [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
    See module documentation.
    """

    context: Context
    executor: joblib.Parallel
    only_one_arg: bool

    def __init__(
        self,
        output_file: str | Path,
        context: Context | None = None,
        only_one_arg: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            output_file (str | Path):
            context (Context | None, optional):
            only_one_arg (bool, optional): If `True`, assumes that every job
                has exactly one argument. This produces more compact output
                files.
            kwargs (Any): Forwarded to
                [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
        """
        self.context = context or Context(output_file)
        self.executor = joblib.Parallel(**kwargs)
        self.only_one_arg = only_one_arg

    def __call__(self, jobs: Iterable[_DelayedCall]) -> dict:
        jobs = list(jobs)
        self.sanity_check(jobs)
        return dict(self._execute(jobs))

    def _execute(
        self, jobs: Iterable[_DelayedCall]
    ) -> Generator[tuple[Any, Any], None, None]:
        """
        Executes the jobs in parallel and yields the results. Saves to the
        output file each time a new result (i.e. one that was not already
        present in the output file) is obtained.

        Args:
            jobs (Iterable[_DelayedCall]): All the jobs, including those whose
                results are already in the output file (and therefore shall not
                be run again)
        """

        def _key(j: _DelayedCall) -> Any:
            """What the key of a job should be in the result dict"""
            return j.args[0] if self.only_one_arg else tuple(j.args)

        job_status = {
            _key(j): {"job": j, "done": False, "result": None} for j in jobs
        }

        # Check if some jobs already have their results in the output file
        assert self.context.file_path is not None  # for typechecking
        if self.context.file_path.exists():
            results = load_json(self.context.file_path, self.context)
            if not isinstance(results, dict):
                raise RuntimeError(
                    f"The contents of '{self.context.file_path}' is not a dict"
                )
            # Mark the jobs that are already done
            for k, r in results.items():
                if k in job_status:
                    job_status[k]["done"], job_status[k]["result"] = True, r
        else:
            results = {}

        new_results_it = iter(
            self.executor(
                d["job"].to_joblib_delayed()  # type: ignore
                for d in job_status.values()
                if not d["done"]
            )
        )
        # This loop strongly assumes that `jobs`, `loaded_results`,
        # `job_status`, and `new_results` are ordered in a consistent way
        for k, s in job_status.items():
            if s["done"]:
                yield k, s["result"]
            else:
                results[k] = next(new_results_it)
                save_json(results, self.context.file_path, self.context)
                yield k, results[k]

        # At this point, new_results should have been fully consumed
        try:
            next(new_results_it)
            raise RuntimeError("The executor returned too many results")
        except StopIteration:
            pass

    def sanity_check(self, jobs: list[_DelayedCall]) -> None:
        """
        Performs various sanity checks on a list of jobs.
        """
        if self.only_one_arg:
            for i, j in enumerate(jobs):
                if len(j.args) != 1:
                    raise ValueError(
                        f"The only_one_arg option is set to True but job {i} "
                        f"has {len(j.args)} arguments: {j.args}"
                    )
        for i1, i2 in combinations(range(len(jobs)), 2):
            if jobs[i1].args == jobs[i2].args:
                raise ValueError(
                    f"Jobs {i1} and {i2} have the same arguments: "
                    f"{jobs[i1].args}"
                )


def delayed(function: Callable) -> Callable:
    """Use this like `joblib.delayed`"""
    return _DelayedCall(function)

class Parallel:

Guarded analogue to joblib.Parallel. See module documentation.

Parallel(output_file: str | pathlib.Path, context: turbo_broccoli.context.Context | None = None, only_one_arg: bool = False, **kwargs: Any)

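Args:
    output_file (str | Path): Path of the file in which results are stored. Used to build a default Context when context is not given.
    context (Context | None, optional): Context to use. If None, one is created from output_file.
    only_one_arg (bool, optional): If True, assumes that every job has exactly one argument. This produces more compact output files.
    kwargs (Any): Forwarded to joblib.Parallel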

executor: joblib.parallel.Parallel
only_one_arg: bool
def sanity_check(self, jobs: list[turbo_broccoli.parallel._DelayedCall]) -> None:

Performs various sanity checks on a list of jobs.
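
A standalone sketch of the two checks, using only the standard library, may help picture what is rejected. `check_jobs` and `is_valid` are hypothetical names, and jobs are represented here by their bare argument tuples rather than by delayed-call objects.

```python
from itertools import combinations


def check_jobs(all_args, only_one_arg=False):
    """Raise ValueError if a job has the wrong number of arguments or if
    two jobs share the same argument tuple."""
    if only_one_arg:
        for i, args in enumerate(all_args):
            if len(args) != 1:
                raise ValueError(f"Job {i} has {len(args)} arguments: {args}")
    # Pairwise comparison, quadratic in the number of jobs
    for i1, i2 in combinations(range(len(all_args)), 2):
        if all_args[i1] == all_args[i2]:
            raise ValueError(f"Jobs {i1} and {i2} have the same arguments")


def is_valid(all_args, **kwargs):
    """Convenience wrapper that turns the exception into a boolean."""
    try:
        check_jobs(all_args, **kwargs)
        return True
    except ValueError:
        return False


unique = [(i,) for i in range(5)]
repeated = [(i % 2,) for i in range(5)]  # arguments 0, 1, 0, 1, 0
```

Here `unique` passes both checks, while `repeated` is rejected because several jobs share the argument 0.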

def delayed(function: Callable) -> Callable:

Use this like joblib.delayed
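
As a rough picture of what a delayed call is, the sketch below captures a function and its positional arguments without running anything. `DelayedCall` and its `run` method are hypothetical stand-ins written for this illustration; they are not part of the library's public API.

```python
from math import sqrt


class DelayedCall:
    """Hypothetical stand-in for the object returned by a delayed wrapper."""

    def __init__(self, function):
        self.function = function
        self.args = ()

    def __call__(self, *args, **kwargs):
        # Keyword arguments are rejected, as described in the notes
        if kwargs:
            raise ValueError("Keyword arguments are not supported")
        self.args = args  # stored for later, nothing is executed yet
        return self

    def run(self):
        """Execute the captured call (illustration only)."""
        return self.function(*self.args)


job = DelayedCall(sqrt)(16)

try:
    DelayedCall(pow)(2, exp=3)
    kwargs_rejected = False
except ValueError:
    kwargs_rejected = True
```

Storing the arguments as a plain tuple is what makes it possible to use them as dictionary keys later on, provided they are hashable.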