# turbo_broccoli.parallel
Guarded parallel calls

## Usage
It works in a way that is similar to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
for example:
```py
from math import sqrt
import turbo_broccoli as tb

# Note the use of `tb.delayed` instead of `joblib.delayed`.
#         ↓
jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)
```
gives

```py
{0: 0.0, 1: 1.0, 4: 2.0, 9: 3.0, 16: 4.0}
```
However, only the calls for which the corresponding entry in `foo/bar.json`
does not exist will actually be executed; the others will simply be loaded.
Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result
pairs, not just a `list` of results.
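As a minimal sketch of this guarded behaviour (assuming the example above has
already been run, so that `foo/bar.json` holds the results for
`i = 0, ..., 4`):

```py
# Rerunning with a larger job list only executes the missing calls; the
# entries already in foo/bar.json are loaded instead of being recomputed.
jobs = [tb.delayed(sqrt)(i**2) for i in range(10)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)  # sqrt actually runs only for i = 5, ..., 9
```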
If the jobs' function takes more than one argument, simply drop the
`only_one_arg=True` argument:
```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)
```
gives

```py
{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
```
## Notes & caveats
* The result of `executor(jobs)` is a `dict` of key/value pairs.

* The order of the results is guaranteed to be consistent with the order of
  the jobs.

* The argument(s) in the jobs must be hashable. Furthermore, if a job has
  kwargs, a `ValueError` is raised (see the sketch after this list).

* Every job must have a different tuple of arguments, or in other words, every
  job must be unique. So something like this is not acceptable:

  ```py
  f = lambda a: 2 * a

  jobs = [tb.delayed(f)(i % 2) for i in range(5)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```

  because `f` is in effect called multiple times with value `0`. In
  particular, TurboBroccoli's `Parallel` is not suited for functions with no
  arguments (unless they are executed only once, but that kind of defeats the
  idea of parallelism).

* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  can be passed as kwargs to the constructor.

* TurboBroccoli's `Parallel` *does not* honor the `return_as` argument of
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
  The return value of `Parallel.__call__` is always a `dict`.

* If the outputs of the jobs' function are large, it might be inefficient to
  rewrite every past result to the output file each time a new result is
  generated. Using [embedded
  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html)
  can make writes faster:

  ```py
  def f(i):
      ...
      return tb.EmbeddedDict({"result": something_big, ...})

  jobs = [tb.delayed(f)(i) for i in range(1000)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```
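Since keyword arguments are not supported, here is a minimal sketch of what
the failure looks like (the error message is the one raised in the module
source below):

```py
import turbo_broccoli as tb

f = lambda a, b: a * b
job = tb.delayed(f)(a=1, b=2)
# ValueError: Keyword arguments are not supported
```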
### Source

````py
"""
Guarded parallel calls

## Usage

It works in a way that is similar to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html),
for example:

```py
from math import sqrt
import turbo_broccoli as tb

# Note the use of `tb.delayed` instead of `joblib.delayed`.
#         ↓
jobs = [tb.delayed(sqrt)(i**2) for i in range(5)]
executor = tb.Parallel("foo/bar.json", only_one_arg=True, n_jobs=2)
results = executor(jobs)
```

gives

```py
{0: 0.0, 1: 1.0, 4: 2.0, 9: 3.0, 16: 4.0}
```

However, only the calls for which the corresponding entry in `foo/bar.json`
does not exist will actually be executed; the others will simply be loaded.
Note that unlike `joblib.Parallel`, the result is a `dict` of arg/result
pairs, not just a `list` of results.

If the jobs' function takes more than one argument, simply drop the
`only_one_arg=True` argument:

```py
import turbo_broccoli as tb

f = lambda a, b: a * b

jobs = [tb.delayed(f)(i, j) for i in range(5) for j in range(5)]
executor = tb.Parallel("foo/bar.json", n_jobs=2)
results = executor(jobs)
```

gives

```py
{(0, 0): 0, (0, 1): 0, ..., (4, 4): 16}
```

## Notes & caveats

* The result of `executor(jobs)` is a `dict` of key/value pairs.

* The order of the results is guaranteed to be consistent with the order of
  the jobs.

* The argument(s) in the jobs must be hashable. Furthermore, if a job has
  kwargs, a `ValueError` is raised.

* Every job must have a different tuple of arguments, or in other words, every
  job must be unique. So something like this is not acceptable:
  ```py
  f = lambda a: 2 * a

  jobs = [tb.delayed(f)(i % 2) for i in range(5)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```
  because `f` is in effect called multiple times with value `0`. In
  particular, TurboBroccoli's `Parallel` is not suited for functions with no
  arguments (unless they are executed only once, but that kind of defeats the
  idea of parallelism).

* Beyond the arguments documented in `Parallel`, [`joblib.Parallel`
  arguments](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
  can be passed as kwargs to the constructor.

* TurboBroccoli's `Parallel` *does not* honor the `return_as` argument of
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
  The return value of `Parallel.__call__` is always a `dict`.

* If the outputs of the jobs' function are large, it might be inefficient to
  rewrite every past result to the output file each time a new result is
  generated. Using [embedded
  objects](https://altaris.github.io/turbo-broccoli/turbo_broccoli/custom/embedded.html)
  can make writes faster:

  ```py
  def f(i):
      ...
      return tb.EmbeddedDict({"result": something_big, ...})

  jobs = [tb.delayed(f)(i) for i in range(1000)]
  executor = tb.Parallel(...)
  results = executor(jobs)
  ```
"""

from itertools import combinations
from pathlib import Path
from typing import Any, Callable, Generator, Iterable

import joblib

try:
    from loguru import logger as logging
except ModuleNotFoundError:
    import logging  # type: ignore

from .context import Context
from .turbo_broccoli import load_json, save_json


class _DelayedCall:

    function: Callable
    args: tuple[Any, ...]

    def __call__(self, *args: Any, **kwds: Any) -> "_DelayedCall":
        if kwds:
            raise ValueError("Keyword arguments are not supported")
        self.args = args
        return self

    def __init__(self, function: Callable) -> None:
        self.function = function

    def to_joblib_delayed(self) -> Callable:
        """
        Returns a `joblib.delayed` object that can be used with
        `joblib.Parallel`.
        """
        return joblib.delayed(self.function)(*self.args)


class Parallel:
    """
    Guarded analogue to
    [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
    See module documentation.
    """

    context: Context
    executor: joblib.Parallel
    only_one_arg: bool

    def __init__(
        self,
        output_file: str | Path,
        context: Context | None = None,
        only_one_arg: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Args:
            output_file (str | Path):
            context (Context | None, optional):
            only_one_arg (bool, optional): If `True`, assumes that every job
                has exactly one argument. This produces more compact output
                files.
            kwargs (Any): Forwarded to
                [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
        """
        self.context = context or Context(output_file)
        self.executor = joblib.Parallel(**kwargs)
        self.only_one_arg = only_one_arg

    def __call__(self, jobs: Iterable[_DelayedCall]) -> dict:
        jobs = list(jobs)
        self.sanity_check(jobs)
        return dict(self._execute(jobs))

    # pylint: disable=stop-iteration-return
    def _execute(
        self, jobs: Iterable[_DelayedCall]
    ) -> Generator[tuple[Any, Any], None, None]:
        """
        Executes the jobs in parallel and yields the results. Saves to the
        output file each time a new result (i.e. one that was not already
        present in the output file) is obtained.

        Args:
            jobs (Iterable[_DelayedCall]): All the jobs, including those whose
                results are already in the output file (and therefore shall
                not be run again)
        """

        def _key(j: _DelayedCall) -> Any:
            """What the key of a job should be in the result dict"""
            return j.args[0] if self.only_one_arg else tuple(j.args)

        job_status = {
            _key(j): {"job": j, "done": False, "result": None} for j in jobs
        }

        # Check if some jobs already have their results in the output file
        assert self.context.file_path is not None  # for typechecking
        if self.context.file_path.exists():
            results = load_json(self.context.file_path, self.context)
            if not isinstance(results, dict):
                raise RuntimeError(
                    f"The contents of '{self.context.file_path}' is not a dict"
                )
            # Mark the jobs that are already done
            for k, r in results.items():
                if k in job_status:
                    job_status[k]["done"], job_status[k]["result"] = True, r
        else:
            results = {}

        new_results_it = iter(
            self.executor(
                d["job"].to_joblib_delayed()  # type: ignore
                for d in job_status.values()
                if not d["done"]
            )
        )
        # This loop strongly assumes that `jobs`, `loaded_results`,
        # `job_status`, and `new_results` are ordered in a consistent way
        for k, s in job_status.items():
            if s["done"]:
                yield k, s["result"]
            else:
                results[k] = next(new_results_it)
                save_json(results, self.context.file_path, self.context)
                yield k, results[k]

        # At this point, new_results_it should have been fully consumed
        try:
            next(new_results_it)
            raise RuntimeError("The executor returned too many results")
        except StopIteration:
            pass

    def sanity_check(self, jobs: list[_DelayedCall]) -> None:
        """
        Performs various sanity checks on a list of jobs.
        """
        if self.only_one_arg:
            for i, j in enumerate(jobs):
                if len(j.args) != 1:
                    raise ValueError(
                        f"The only_one_arg option is set to True but job {i} "
                        f"has {len(j.args)} arguments: {j.args}"
                    )
        for i1, i2 in combinations(range(len(jobs)), 2):
            if jobs[i1].args == jobs[i2].args:
                raise ValueError(
                    f"Jobs {i1} and {i2} have the same arguments: "
                    f"{jobs[i1].args}"
                )


def delayed(function: Callable) -> Callable:
    """Use this like `joblib.delayed`"""
    return _DelayedCall(function)
````
### `class Parallel`

Guarded analogue to
[`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html).
See module documentation.
#### `Parallel(output_file: str | Path, context: Context | None = None, only_one_arg: bool = False, **kwargs: Any)`

Args:

* `output_file` (`str | Path`):
* `context` (`Context | None`, optional):
* `only_one_arg` (`bool`, optional): If `True`, assumes that every job has
  exactly one argument. This produces more compact output files.
* `kwargs` (`Any`): Forwarded to
  [`joblib.Parallel`](https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html)
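For instance, a short sketch of forwarding standard `joblib.Parallel` options
(`n_jobs`, `verbose` and `backend` are regular `joblib` parameters, not
TurboBroccoli-specific):

```py
import turbo_broccoli as tb

# n_jobs, verbose, backend, etc. are passed through to joblib.Parallel
executor = tb.Parallel("foo/bar.json", n_jobs=4, verbose=10, backend="loky")
```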
#### `sanity_check(self, jobs: list[_DelayedCall]) -> None`

Performs various sanity checks on a list of jobs.
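For instance, a minimal sketch of a failing check (the error message follows
the format used in the source above; `f` is a stand-in function):

```py
import turbo_broccoli as tb

f = lambda a: 2 * a
jobs = [tb.delayed(f)(0), tb.delayed(f)(0)]  # same argument tuple twice
executor = tb.Parallel("foo/bar.json")
executor(jobs)  # ValueError: Jobs 0 and 1 have the same arguments: (0,)
```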
### `delayed(function: Callable) -> Callable`

Use this like `joblib.delayed`.
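A minimal usage sketch: calling the wrapper records the positional arguments
but does not execute anything yet.

```py
from math import sqrt
import turbo_broccoli as tb

job = tb.delayed(sqrt)(16)  # nothing is executed at this point
# `job` can now be passed to tb.Parallel, which runs sqrt(16) only if the
# result is not already in the output file
```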