webber.queue
v0.2: Experimental implementation. Not ready for use in production contexts. Helper class for queue-based DAG implementations.
1""" 2v0.2: Experimental implementation. Not ready for use in production contexts. 3Helper class for queue-based DAG implementations. 4""" 5import typing as _T 6import queue as _q 7import traceback as _traceback 8import concurrent.futures as _futures 9import webber.xcoms as _xcoms 10 11__all__ = [] 12 13def _worker(work: _T.Callable, args: _T.Tuple[_T.Any, ...] | _T.List[_T.Any], kwargs: _T.Dict[str, _T.Any], 14 promises: _T.Dict[str, _T.Any] | None = None, print_exc: bool = False, 15 parent_id: str | None = None, parent_process: _futures.Future[_T.Any] | None = None, 16 in_queue: _q.LifoQueue[_T.Any] | None = None, 17 halt_condition: _T.Callable[[_T.Any], bool] | None = None, iter_limit: int | None = None, 18 out_queue: _q.LifoQueue[_T.Any] | None = None) -> None: 19 20 try: 21 # Handle mutable default 22 if promises is None: 23 promises = {} 24 25 args = list(args) 26 27 for i in range(len(args)): 28 if isinstance(args[i], _xcoms.Promise): 29 if args[i].key in promises.keys(): 30 args[i] = promises[args[i].key] 31 32 for k, v in kwargs.items(): 33 if isinstance(v, _xcoms.Promise): 34 if v.key in promises.keys(): 35 kwargs[k] = promises[v.key] 36 37 iter_count = 0 38 39 while iter_limit is None or (iter_count < iter_limit): 40 41 # For child processes, get latest value from the queue. 42 # If none are available and parent process is complete, break. 43 if in_queue is not None: 44 try: 45 output = in_queue.get_nowait() 46 except Exception as e: 47 if not in_queue.empty(): 48 raise e 49 elif parent_process is not None and parent_process.done(): 50 break 51 continue 52 53 for a in range(len(args)): 54 if isinstance(args[a], _xcoms.Promise): 55 if args[a].key == parent_id: 56 args[a] = output 57 58 for k, v in kwargs.items(): 59 if isinstance(v, _xcoms.Promise): 60 if v.key == parent_id: 61 kwargs[k] = output 62 63 # Execute unit of work, and push output to queue, if given. 64 x = work(*args, **kwargs) 65 if out_queue is not None: 66 out_queue.put(x) 67 68 iter_count += 1 69 70 # Check halt conditions for root process (output-based lambda or iteration limit). 71 if halt_condition is not None and halt_condition(x): 72 break 73 74 except Exception as e: 75 if print_exc: 76 _traceback.print_exc() 77 print('Exception during runtime, ending process...') 78 raise e