webber.queue
v0.2: Experimental implementation. Not ready for use in production contexts. Helper class for queue-based DAG implementations.
"""
v0.2: Experimental implementation. Not ready for use in production contexts.
Helper class for queue-based DAG implementations.
"""
import typing as _T
import queue as _q
import traceback as _traceback
import concurrent.futures as _futures
import webber.xcoms as _xcoms

__all__ = []


def _worker(work: _T.Callable, args, kwargs: dict, promises: _T.Optional[dict] = None, print_exc: bool = False,
            parent_id: _T.Optional[str] = None, parent_process: _T.Optional[_futures.Future] = None,
            in_queue: _T.Optional[_q.LifoQueue] = None, halt_condition: _T.Optional[_T.Callable] = None,
            iter_limit: _T.Optional[int] = None, out_queue: _T.Optional[_q.LifoQueue] = None):
    """Repeatedly run ``work``, optionally fed by ``in_queue`` and feeding ``out_queue``.

    Before the loop starts, every ``Promise`` found in ``args`` / ``kwargs`` whose
    key is present in ``promises`` is replaced by the corresponding value. The
    caller's ``args`` and ``kwargs`` containers are never modified; the function
    works on private copies.

    Each iteration then:

    1. If ``in_queue`` is given, takes one value from it and binds that value to
       every argument that was a ``Promise`` keyed by ``parent_id``. When the
       queue is empty the worker polls again (without counting an iteration)
       until ``parent_process.done()`` is true and the queue is still empty, at
       which point the loop ends.
    2. Calls ``work(*args, **kwargs)`` and, if ``out_queue`` is given, puts the
       result on it.
    3. Stops if ``halt_condition`` is given and returns a truthy value for the
       result, or once ``iter_limit`` calls to ``work`` have been made.

    Args:
        work: Callable executed on every iteration.
        args: Positional arguments for ``work``; may contain ``Promise`` objects.
        kwargs: Keyword arguments for ``work``; values may be ``Promise`` objects.
        promises: Mapping of promise key to resolved value. Defaults to empty.
        print_exc: When true, print the traceback before re-raising.
        parent_id: Promise key whose value is supplied through ``in_queue``.
        parent_process: Future of the producer; required when ``in_queue`` is
            given, since its ``done()`` state decides when to stop polling.
        in_queue: Queue to read inputs from. ``None`` means no queued input.
        halt_condition: Predicate applied to each result; truthy ends the loop.
        iter_limit: Maximum number of ``work`` calls. ``None`` means unlimited.
        out_queue: Queue that receives each result. ``None`` disables output.

    Returns:
        None. Results are only delivered through ``out_queue``.

    Raises:
        Exception: Any exception raised during setup or by ``work`` is re-raised
            unchanged.
    """
    try:
        resolved = {} if promises is None else promises

        def _resolve(value):
            # Swap a Promise for its value when one is already available.
            if isinstance(value, _xcoms.Promise) and value.key in resolved:
                return resolved[value.key]
            return value

        def _awaits_parent(value):
            return isinstance(value, _xcoms.Promise) and value.key == parent_id

        # Build private copies so the caller's list/dict are left untouched.
        args = [_resolve(value) for value in args]
        kwargs = {name: _resolve(value) for name, value in kwargs.items()}

        # Remember where the parent's output belongs *before* the first
        # substitution. Once a Promise has been overwritten it can no longer be
        # recognised, so without this only the first queued value would ever
        # reach ``work``.
        parent_arg_slots = [i for i, value in enumerate(args) if _awaits_parent(value)]
        parent_kwarg_slots = [name for name, value in kwargs.items() if _awaits_parent(value)]

        iter_count = 0

        while iter_limit is None or iter_count < iter_limit:

            # For child processes, get the latest value from the queue.
            # NOTE: this polls without blocking, so an idle child spins until
            # the parent either produces a value or completes.
            if in_queue is not None:
                try:
                    output = in_queue.get_nowait()
                except _q.Empty:
                    if not parent_process.done():
                        continue
                    # The parent may have enqueued its last value between the
                    # failed get and the done() check; look once more so that
                    # value is not dropped.
                    try:
                        output = in_queue.get_nowait()
                    except _q.Empty:
                        break

                for i in parent_arg_slots:
                    args[i] = output
                for name in parent_kwarg_slots:
                    kwargs[name] = output

            # Execute unit of work, and push output to queue, if given.
            x = work(*args, **kwargs)
            if out_queue is not None:
                out_queue.put(x)

            iter_count += 1

            # Check halt condition for root process (output-based predicate);
            # the iteration limit is enforced by the loop condition.
            if halt_condition is not None and halt_condition(x):
                break

    except Exception:
        # NOTE(review): indentation was lost in the source this was recovered
        # from; the message is assumed to be gated by print_exc together with
        # the traceback -- confirm against the original file.
        if print_exc:
            _traceback.print_exc()
            print('Exception during runtime, ending process...')
        # Bare raise keeps the original traceback intact.
        raise