webber.queue

v0.2: Experimental implementation. Not ready for use in production contexts. Helper class for queue-based DAG implementations.

 1"""
 2v0.2: Experimental implementation. Not ready for use in production contexts.
 3Helper class for queue-based DAG implementations.
 4"""
 5import typing as _T
 6import queue as _q
 7import traceback as _traceback
 8import concurrent.futures as _futures
 9import webber.xcoms as _xcoms
10
# Empty export list: a star-import of this module brings in no names.
__all__ = []
12
def _worker(work: _T.Callable, args, kwargs: dict, promises: dict = None, print_exc = False,
            parent_id: str = None, parent_process: _futures.Future = None, in_queue: _q.LifoQueue = None,
            halt_condition: _T.Callable = None, iter_limit: int = None, out_queue: _q.LifoQueue = None):
    """Run ``work`` in a loop, optionally fed by a parent's queue.

    Before the loop starts, every ``Promise`` found in ``args`` / ``kwargs``
    whose key is present in ``promises`` is replaced by the mapped value.

    When ``in_queue`` is given, each iteration first takes one value from it
    and substitutes that value for every remaining ``Promise`` whose key equals
    ``parent_id``; the loop ends once the queue is empty and
    ``parent_process`` reports ``done()``. Without ``in_queue`` the loop runs
    until ``iter_limit`` iterations have completed or ``halt_condition``
    returns a truthy value for the latest result (with neither set, it does
    not terminate on its own).

    Args:
        work: Callable executed once per iteration as ``work(*args, **kwargs)``.
        args: Positional arguments for ``work``; copied, never mutated.
        kwargs: Keyword arguments for ``work``; copied, never mutated.
        promises: Mapping of promise key to resolved value. ``None`` is
            treated as an empty mapping.
        print_exc: If truthy, print the traceback of a failure before
            re-raising it.
        parent_id: Promise key whose placeholders receive values taken from
            ``in_queue``.
        parent_process: Object whose ``done()`` tells whether the producer
            has finished. Only consulted when ``in_queue`` is given.
        in_queue: Queue to read input values from, or ``None``.
        halt_condition: Optional predicate applied to each result of
            ``work``; a truthy return stops the loop.
        iter_limit: Maximum number of executions of ``work``, or ``None``
            for no limit.
        out_queue: Queue that receives each result of ``work``, or ``None``.

    Raises:
        Exception: Any exception raised while running is reported on stdout
            (with traceback if ``print_exc``) and then re-raised unchanged.
    """
    try:
        if promises is None:
            promises = {}

        # Private copies: the substitutions below must not leak into the
        # containers owned by the caller.
        args = list(args)
        kwargs = dict(kwargs)

        for i, arg in enumerate(args):
            if isinstance(arg, _xcoms.Promise) and arg.key in promises:
                args[i] = promises[arg.key]

        for name, value in kwargs.items():
            if isinstance(value, _xcoms.Promise) and value.key in promises:
                kwargs[name] = promises[value.key]

        # Remember where the parent's placeholders sit. Once a placeholder is
        # overwritten it is no longer a Promise, so re-detecting it on every
        # iteration would pin `work` to the first value ever received.
        arg_slots = []
        kwarg_slots = []
        if in_queue is not None:
            arg_slots = [
                i for i, arg in enumerate(args)
                if isinstance(arg, _xcoms.Promise) and arg.key == parent_id
            ]
            kwarg_slots = [
                name for name, value in kwargs.items()
                if isinstance(value, _xcoms.Promise) and value.key == parent_id
            ]

        iter_count = 0

        while iter_limit is None or iter_count < iter_limit:

            # For child processes, get latest value from the queue.
            # If none are available and parent process is complete, break.
            if in_queue is not None:
                try:
                    output = in_queue.get_nowait()
                except _q.Empty:
                    # NOTE(review): this polls without blocking, so the loop
                    # spins while the parent is still producing.
                    if not parent_process.done():
                        continue
                    # The parent may have enqueued its last value and finished
                    # after the failed poll above; look once more before
                    # giving up so that value is not dropped.
                    try:
                        output = in_queue.get_nowait()
                    except _q.Empty:
                        break

                for i in arg_slots:
                    args[i] = output

                for name in kwarg_slots:
                    kwargs[name] = output

            # Execute unit of work, and push output to queue, if given.
            x = work(*args, **kwargs)
            if out_queue is not None:
                out_queue.put(x)

            iter_count += 1

            # Check the output-based halt condition; the iteration limit is
            # enforced by the loop condition itself.
            if halt_condition is not None and halt_condition(x):
                break

    except Exception:
        if print_exc:
            _traceback.print_exc()
        print('Exception during runtime, ending process...')
        raise