webber.queue

v0.2: Experimental implementation. Not ready for use in production contexts. Helper class for queue-based DAG implementations.

 1"""
 2v0.2: Experimental implementation. Not ready for use in production contexts.
 3Helper class for queue-based DAG implementations.
 4"""
 5import typing as _T
 6import queue as _q
 7import traceback as _traceback
 8import concurrent.futures as _futures
 9import webber.xcoms as _xcoms
10
11__all__ = []
12
13def _worker(work: _T.Callable, args: _T.Tuple[_T.Any, ...] | _T.List[_T.Any], kwargs: _T.Dict[str, _T.Any],
14           promises: _T.Dict[str, _T.Any] | None = None, print_exc: bool = False,
15           parent_id: str | None = None, parent_process: _futures.Future[_T.Any] | None = None,
16           in_queue: _q.LifoQueue[_T.Any] | None = None,
17           halt_condition: _T.Callable[[_T.Any], bool] | None = None, iter_limit: int | None = None,
18           out_queue: _q.LifoQueue[_T.Any] | None = None) -> None:
19
20    try:
21        # Handle mutable default
22        if promises is None:
23            promises = {}
24
25        args = list(args)
26
27        for i in range(len(args)):
28            if isinstance(args[i], _xcoms.Promise):
29                if args[i].key in promises.keys():
30                    args[i] = promises[args[i].key]
31        
32        for k, v in kwargs.items():
33            if isinstance(v, _xcoms.Promise):
34                if v.key in promises.keys():
35                    kwargs[k] = promises[v.key]
36
37        iter_count = 0
38
39        while iter_limit is None or (iter_count < iter_limit):
40            
41            # For child processes, get latest value from the queue.
42            # If none are available and parent process is complete, break.
43            if in_queue is not None:
44                try:
45                    output = in_queue.get_nowait()
46                except Exception as e:
47                    if not in_queue.empty():
48                        raise e
49                    elif parent_process is not None and parent_process.done():
50                        break
51                    continue
52                    
53                for a in range(len(args)):
54                    if isinstance(args[a], _xcoms.Promise):
55                        if args[a].key == parent_id:
56                            args[a] = output
57                
58                for k, v in kwargs.items():
59                    if isinstance(v, _xcoms.Promise):
60                        if v.key == parent_id:
61                            kwargs[k] = output
62
63            # Execute unit of work, and push output to queue, if given.
64            x = work(*args, **kwargs)
65            if out_queue is not None:
66                out_queue.put(x)
67            
68            iter_count += 1
69            
70            # Check halt conditions for root process (output-based lambda or iteration limit).
71            if halt_condition is not None and halt_condition(x):
72                break
73
74    except Exception as e:
75        if print_exc:
76            _traceback.print_exc()
77        print('Exception during runtime, ending process...')
78        raise e