# turbo_broccoli.parallel

Guarded parallel calls

## Usage
It works in a way that is similar to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
for example:
```py
from math import sqrt
import turbo_broccoli as tb

# Note the use of `tb.delayed` instead of `joblib.delayed`.
#          ↓
jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)
```
gives (each key being a job's argument `i**2`)

```py
{0: 0.0, 1: 1.0, 4: 2.0, 9: 3.0, 16: 4.0}
```
however, only the calls for which the corresponding entry in `foo/bar.json`
does not already exist are actually executed; the others are simply loaded.
Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result
pairs, not just a `list` of results.
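Concretely, if the script is re-run with more jobs, only the new calls are
executed; a minimal sketch building on the example above:

```py
from math import sqrt
import turbo_broccoli as tb

# The five entries from the previous run (keys 0, 1, 4, 9, 16) are simply
# loaded from foo/bar.json; only the five new calls are actually executed.
jobs = [tb.delayed(sqrt)(i**2) for i in range(10)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)
```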
If the function in the jobs takes more than one argument, simply drop the
`only_one_arg=True` argument:
```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)
```
gives

```py
{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
```
## Notes & caveats

* The result of `executor(jobs)` is a `dict` of key/value pairs, where each
  key is a job's argument (or tuple of arguments).

* The order of the results is guaranteed to be consistent with the order of
  the jobs.
* The argument(s) of the jobs must be hashable. Furthermore, if a job has
  kwargs, a `ValueError` is raised (see the sketch after this list).

* Every job must have a different tuple of arguments, or in other words,
  every job must be unique. So something like this is not acceptable:

  ```py
  f = lambda a: 2 * a

  jobs = [tb.delayed(f)(i % 2) for i in range(5)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```

  because `f` is in effect called multiple times with value `0`. In
  particular, TurboBroccoli's `Parallel` is not suited for functions that
  take no arguments (unless they are executed only once, but that rather
  defeats the point of parallelism).

* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  can be passed as kwargs to the constructor.
* TurboBroccoli's `Parallel` *does not* honor the `return_as` argument of
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
  The return value of `Parallel.__call__` is always a `dict`.

* If the outputs of the jobs' function are large, it might be inefficient to
  rewrite every past result to the output file each time a new result is
  generated. Using [embedded
  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html)
  can make writes faster:

  ```py
  def f(i):
      ...
      return tb.EmbeddedDict({"result": something_big, ...})

  jobs = [tb.delayed(f)(i) for i in range(1000)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```
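As noted in the list above, keyword arguments are rejected: `_DelayedCall.__call__`
(see the source below) raises a `ValueError` the moment a job is built with
kwargs. A minimal sketch:

```py
import turbo_broccoli as tb

f = lambda a, b: a * b

job = tb.delayed(f)(1, 2)    # fine: positional arguments only
job = tb.delayed(f)(1, b=2)  # raises ValueError("Keyword arguments
                             # are not supported")
```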
1""" 2Guarded parallel calls 3 4## Usage 5 6It works in a way that is similar to 7[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html), 8for example 9 10```py 11from math import sqrt 12import turbo_broccoli as tb 13 14# Note the use of `tb.delayed` instead of `joblib.delayed`. 15# ↓ 16jobs = [tb.delayed(sqrt)(i**2) for i in range(5)] 17executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2) 18results = executor(jobs) 19``` 20 21gives 22 23```py 24{0: 0.0, 1: 1.0, 2: 2.0, 3: 3.0, 4: 4.0, 5: 5.0} 25``` 26 27however, only the calls for which the corresponding entry in `out/foo.json` 28does not exist will actually be executed, the others will simply be loaded. 29Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result, not 30just a `list` of results. 31 32If the function in the jobs take more than one argument, simply drop the 33`only_one_arg=True` argument: 34 35```py 36from math import sqrt 37import turbo_broccoli as tb 38 39f = lambda a, b: a * b 40 41jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)] 42executor = tb.Parallel("foo/bar.json", n_jobs=2) 43results = executor(jobs) 44``` 45 46gives 47 48```py 49{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16} 50``` 51 52## Notes & caveats 53 54* The result of `executor(jobs)` is a dict or a generator of key/value pairs. 55 56* The order of the results is guaranteed to be consistent with the order of the 57 jobs. 58 59* The argument(s) in the jobs must be hashable. Furthermore, if a job has 60 kwargs, a `ValueError` is raised. 61 62* Every job must have a different tuple of argument, or in other words, every 63 job must be unique. So something like this is not acceptable: 64 ```py 65 f = lambda a: 2 * a 66 67 jobs = [tb.delayed(f)(i % 2) for i in range(5)] 68 executor = tb.Parallel(...) 69 results = executor(jobs) 70 ``` 71 because `f` is in effect called multiple times with value `0`. In 72 particular, TurboBroccoli's `Parallel` is not suited for functions with no 73 arguments (unless if they are executed only once but that kind of defeats 74 the idea of parallelism). 75 76* Beyond the arguments documented in `Parallel`, [`joblib.Parallel` 77 arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) 78 can be passed as kwargs to the constructor. 79 80* TurboBroccoli's `Parallel` *does not* honor the `return_as` argument of 81 [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html). 82 The return value of `Parallel.__call__` is always a `dict`. 83 84* If the output of the job's function are large, it might be inefficient to 85 rewrite every past results the output file each time a new result is 86 generated. Using [embedded 87 objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html), 88 can make writes faster. 89 90 ```py 91 def f(i): 92 ... 93 return tb.EmbeddedDict({"result": something_big, ...}) 94 95 jobs = [tb.delayed(f)(i) for i in range(1000)] 96 executor = tb.Parallel(...) 97 results = executor(jobs) 98 ``` 99""" 100 101from itertools import combinations 102from pathlib import Path 103from typing import Any, Callable, Generator, Iterable 104 105import joblib 106 107try: 108 from loguru import logger as logging 109except ModuleNotFoundError: 110 import logging # type: ignore 111 112from .context import Context 113from .turbo_broccoli import load_json, save_json 114 115 116class _DelayedCall: 117 function: Callable 118 args: tuple[Any, ...] 
119 120 def __call__(self, *args: Any, **kwds: Any) -> "_DelayedCall": 121 if kwds: 122 raise ValueError("Keyword arguments are not supported") 123 self.args = args 124 return self 125 126 def __init__(self, function: Callable) -> None: 127 self.function = function 128 129 def to_joblib_delayed(self) -> Callable: 130 """ 131 Returns a `joblib.delayed` object that can be used with 132 `joblib.Parallel`. 133 """ 134 return joblib.delayed(self.function)(*self.args) 135 136 137class Parallel: 138 """ 139 Guarded analogue to 140 [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html). 141 See module documentation. 142 """ 143 144 context: Context 145 executor: joblib.Parallel 146 only_one_arg: bool 147 148 def __init__( 149 self, 150 output_file: str | Path, 151 context: Context | None = None, 152 only_one_arg: bool = False, 153 **kwargs: Any, 154 ) -> None: 155 """ 156 Args: 157 output_file (str | Path): 158 context (Context | None, optional): 159 only_one_arg (bool, optional): If `True`, assumes that every job 160 has exactly one argument. This produces more compact output 161 files. 162 kwargs (Any): Forwarded to 163 [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html) 164 """ 165 self.context = context or Context(output_file) 166 self.executor = joblib.Parallel(**kwargs) 167 self.only_one_arg = only_one_arg 168 169 def __call__(self, jobs: Iterable[_DelayedCall]) -> dict: 170 jobs = list(jobs) 171 self.sanity_check(jobs) 172 return dict(self._execute(jobs)) 173 174 def _execute( 175 self, jobs: Iterable[_DelayedCall] 176 ) -> Generator[tuple[Any, Any], None, None]: 177 """ 178 Executes the jobs in parallel and yields the results. Saves to the 179 output file each time a new result (i.e. one that was not already 180 present in the output file) is obtained. 
181 182 Args: 183 jobs (Iterable[_DelayedCall]): All the jobs, including those whose 184 results are already in the output file (and therefore shall not 185 be run again) 186 """ 187 188 def _key(j: _DelayedCall) -> Any: 189 """What the key of a job should be in the result dict""" 190 return j.args[0] if self.only_one_arg else tuple(j.args) 191 192 job_status = { 193 _key(j): {"job": j, "done": False, "result": None} for j in jobs 194 } 195 196 # Check if some jobs already have their results in the output file 197 assert self.context.file_path is not None # for typechecking 198 if self.context.file_path.exists(): 199 results = load_json(self.context.file_path, self.context) 200 if not isinstance(results, dict): 201 raise RuntimeError( 202 f"The contents of '{self.context.file_path}' is not a dict" 203 ) 204 # Mark the jobs that are already done 205 for k, r in results.items(): 206 if k in job_status: 207 job_status[k]["done"], job_status[k]["result"] = True, r 208 else: 209 results = {} 210 211 new_results_it = iter( 212 self.executor( 213 d["job"].to_joblib_delayed() # type: ignore 214 for d in job_status.values() 215 if not d["done"] 216 ) 217 ) 218 # This loops strongly assumes that `jobs`, `loaded_results`, 219 # `job_status`, and `new_results` are ordered in a consistent way 220 for k, s in job_status.items(): 221 if s["done"]: 222 yield k, s["result"] 223 else: 224 results[k] = next(new_results_it) 225 save_json(results, self.context.file_path, self.context) 226 yield k, results[k] 227 228 # At this point, new_results should have been fully consumed 229 try: 230 next(new_results_it) 231 raise RuntimeError("The executor returned too many results") 232 except StopIteration: 233 pass 234 235 def sanity_check(self, jobs: list[_DelayedCall]) -> None: 236 """ 237 Performs various sanity checks on a list of jobs. 238 """ 239 if self.only_one_arg: 240 for i, j in enumerate(jobs): 241 if len(j.args) != 1: 242 raise ValueError( 243 f"The only_one_arg option is set to True but job {i} " 244 f"has {len(j.args)} arguments: {j.args}" 245 ) 246 for i1, i2 in combinations(range(len(jobs)), 2): 247 if jobs[i1].args == jobs[i2].args: 248 raise ValueError( 249 f"Jobs {i1} and {i2} have the same arguments: " 250 f"{jobs[i1].args}" 251 ) 252 253 254def delayed(function: Callable) -> Callable: 255 """Use this like `joblib.delayed`""" 256 return _DelayedCall(function)
### `class Parallel`

Guarded analogue to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
See module documentation.
### `Parallel.__init__(output_file, context=None, only_one_arg=False, **kwargs)`

Args:

* `output_file` (`str | Path`):
* `context` (`Context | None`, optional):
* `only_one_arg` (`bool`, optional): If `True`, assumes that every job has
  exactly one argument. This produces more compact output files.
* `kwargs` (`Any`): Forwarded to
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
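For instance, standard `joblib.Parallel` options can be forwarded through
`kwargs`; a minimal sketch (the `backend` and `verbose` values are only
illustrative):

```py
import turbo_broccoli as tb

# n_jobs, backend and verbose are forwarded verbatim to joblib.Parallel
executor = tb.Parallel(
    "foo/bar.json",
    only_one_arg=True,
    n_jobs=4,
    backend="loky",
    verbose=1,
)
```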
### `Parallel.sanity_check(jobs)`

Performs various sanity checks on a list of jobs: if `only_one_arg` is set,
every job must have exactly one argument, and no two jobs may have the same
arguments; a `ValueError` is raised otherwise.
### `delayed(function)`

Use this like `joblib.delayed`.
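A minimal sketch, mirroring the module examples above (the output file
`foo/pow.json` is only illustrative):

```py
import turbo_broccoli as tb

# Only positional arguments are supported; the key of this job in the
# result dict is its argument tuple, (2, 10).
job = tb.delayed(pow)(2, 10)
executor = tb.Parallel("foo/pow.json", n_jobs=2)
results = executor([job])  # {(2, 10): 1024}
```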